Real-time Airflow Alerts to Google Chat

Kashif Sohail
Jun 6, 2023

Integrate Airflow with Google Chat to send task status alerts via callbacks

Introduction

Apache Airflow is a powerful workflow orchestration platform used for managing complex data pipelines. Monitoring the status of tasks and receiving timely alerts about their success and failure is crucial for maintaining the health and reliability of your workflows. In Airflow, success and failure callbacks provide a mechanism to perform custom actions based on the outcome of individual tasks. You attach a function through an operator's on_success_callback or on_failure_callback argument (or set it for every task in a DAG through default_args), and Airflow calls it with the task's context dictionary. Success callbacks run when a task completes successfully, letting you perform follow-up actions such as notifications or bookkeeping.

Failure callbacks, on the other hand, let you specify actions to take when a task fails, such as sending notifications, logging errors, or running error-handling procedures. Used together, success and failure callbacks let your workflows react to task outcomes automatically, which makes them more robust and lets you respond to problems proactively. Google Chat is a collaborative messaging platform that third-party applications can integrate with through its API.

Incoming webhooks let you send asynchronous messages into Google Chat from applications that aren’t Chat apps. For example, you can configure a monitoring application to notify on-call personnel on Google Chat when a server goes down. In this article, we will explore how to send Airflow task failure and success alerts to Google Chat using webhooks.
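At its simplest, an incoming webhook accepts an HTTP POST whose JSON body contains a text field. The following is a minimal sketch using only the Python standard library, not the approach used later in this article (which relies on requests). WEBHOOK_URL is a placeholder, and the helper names are hypothetical.

```python
import json
import urllib.request

# Hypothetical placeholder: replace with the webhook URL copied from your space.
WEBHOOK_URL = "https://chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN"


def build_text_message_request(url: str, text: str) -> urllib.request.Request:
    """Build, but do not send, a POST request carrying a plain-text Chat message."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json; charset=UTF-8"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 10.0) -> int:
    """Send the request and return the HTTP status (urlopen raises HTTPError on 4xx/5xx)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Once WEBHOOK_URL holds a real URL, send(build_text_message_request(WEBHOOK_URL, "Airflow alert test")) should post the message and return the HTTP status. Keeping request construction separate from sending makes the payload easy to inspect without network access.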

Prerequisites:

Before proceeding, ensure you have the following in place:

  1. An instance of Apache Airflow installed and configured (the callbacks read logical_date from the task context, which requires Airflow 2.2 or later).
  2. A Google account with access to Google Chat.
  3. Basic knowledge of Python and Airflow concepts.

Create a Google Chat Space and Webhook

  1. Log in to your Google account and navigate to Google Chat.
  2. Create a new space or select an existing one where you want to receive the Airflow alerts.
  3. Click the space name, open apps and integrations, and click “Manage webhooks”.
  4. Provide a name for the webhook and, optionally, an avatar URL, then click “Save”.
  5. Copy the generated webhook URL; you will need it in the next step.
Create a webhook in Google Chat Space — step by step

Learn more about Google Chat webhooks here.

Adding Webhook details to airflow

As a best practice, store credentials securely in an Airflow connection rather than in your DAG code. To do this:

  1. Open the Airflow web UI.
  2. Go to Admin > Connections.
  3. Add a new connection.
  4. In the Host field, paste the webhook URL up to (but excluding) the key, and put the key in the Password field. The callback code simply concatenates host and password, so together they must reproduce the full webhook URL.

For the purpose of this tutorial, a connection named ‘gchat_webhook’ has already been added.
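Because _get_webhook_url simply concatenates the connection's host and password, the two fields must rejoin into the exact webhook URL. Here is a minimal sketch of one way to split it, assuming you cut the URL right after "key=" so that the key and the token both land in the masked password field. As an alternative to the UI, Airflow can also read a connection from an AIRFLOW_CONN_<CONN_ID> environment variable, and since Airflow 2.3 that value may be JSON. The conn_type "http" is an assumption (the callback only reads host and password), and the helper names are hypothetical.

```python
import json


def split_webhook_url(url: str, marker: str = "key=") -> tuple[str, str]:
    """Split a webhook URL so that host + password == url.

    Everything up to and including the marker goes into the connection's
    host; the remainder (the key and the rest of the query string) goes
    into the password field.
    """
    index = url.find(marker)
    if index == -1:
        raise ValueError(f"{marker!r} not found in webhook URL")
    cut = index + len(marker)
    return url[:cut], url[cut:]


def connection_env_var(conn_id: str, url: str) -> tuple[str, str]:
    """Return (name, value) defining an Airflow connection via an environment variable."""
    host, password = split_webhook_url(url)
    # conn_type "http" is an assumption; only host and password are used by the callbacks.
    value = json.dumps({"conn_type": "http", "host": host, "password": password})
    return f"AIRFLOW_CONN_{conn_id.upper()}", value
```

Exporting the returned name and value in the environment of your Airflow components defines the gchat_webhook connection without going through the web UI.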

Implement the Google Chat Webhook Callback

  1. In your Airflow project, create a new Python file, e.g., google_chat_callback.py.
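  2. Import the necessary modules and define the ID of the connection created earlier:
```python
import traceback

import requests
# airflow.hooks.base replaces the deprecated airflow.hooks.base_hook path in Airflow 2.
from airflow.hooks.base import BaseHook

# ID of the Airflow connection that stores the webhook URL.
GCHAT_CONNECTION = "gchat_webhook"
```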

3. Next, create a function named _get_webhook_url that retrieves the webhook URL and credentials from the Airflow connection. It takes an optional parameter, thread_ref, which we will use to post exception details as a reply in a thread, keeping the space uncluttered.

```python
def _get_webhook_url(connection_id: str, thread_ref: str = "") -> str:
    """
    Retrieves the webhook URL for the specified connection ID.

    Args:
        connection_id (str): The connection ID.
        thread_ref (str): The optional thread reference.

    Returns:
        str: The constructed URL.
    """
    gchat_connection = BaseHook.get_connection(connection_id)
    # host + password must rejoin into the full webhook URL; thread_ref adds
    # optional threading parameters to its query string.
    full_url = f"{gchat_connection.host}{gchat_connection.password}{thread_ref}"
    return full_url
```
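The failure callback later builds its thread reference with an f-string and sanitizes the run ID by hand. An alternative sketch uses urllib.parse.urlencode so that characters such as ":" and "+" are percent-encoded rather than left raw in the URL. The helper name build_thread_ref is hypothetical; threadKey and messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD are the same URL parameters the article uses.

```python
from urllib.parse import urlencode


def build_thread_ref(thread_key: str) -> str:
    """Return a query-string suffix that posts into (or starts) the thread `thread_key`.

    The leading "&" assumes the base webhook URL already has a query string,
    which it does (it contains ?key=...&token=...).
    """
    params = {
        "threadKey": thread_key,
        "messageReplyOption": "REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD",
    }
    return "&" + urlencode(params)
```

The result could be passed as the thread_ref argument of _get_webhook_url. If you are unsure which characters Google Chat accepts in a thread key, you can keep the author's replacement of "+" and ":" as well; encoding an already-sanitized key is harmless.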

4. Create a separate function that sends the HTTP POST request:

```python
def _make_http_request(body, full_url):
    """
    Sends an HTTP POST request with the provided body to the given URL.

    Args:
        body (dict): The request body.
        full_url (str): The URL to send the request to.
    """
    r = requests.post(
        url=full_url,
        json=body,
        headers={"Content-type": "application/json"},
        timeout=30,  # avoid hanging the callback if Google Chat does not respond
    )
    print(r.status_code, r.ok)
```

5. With these helpers in place, we can send messages to a Google Chat space. The next step is to construct the message itself. To keep our alerts interactive and consistent, we use the card UI.

Google Chat apps can create card messages with defined layouts, interactive UI elements, and rich media. Cards are useful for presenting detailed information, collecting user input, and guiding users to take specific actions. Google's card-message guide explains how to create them both synchronously (in real-time response to events such as receiving a message or joining a space) and asynchronously (sending messages without a prompt, which is what a webhook alert does).

Learn more about card messages here.

Anatomy of a card message. source: developers.google.com
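Both callbacks in this article build the same cardsV2 skeleton: one header and one section of widgets. Here is a sketch of small helpers that produce that structure, mirroring the JSON the callbacks use. The function names are hypothetical, and the output is only a Python dict, so nothing is sent.

```python
def build_card_message(card_id: str, title: str, subtitle: str,
                       image_url: str, widgets: list[dict]) -> dict:
    """Return a cardsV2 message body with one header and one section of widgets."""
    return {
        "cardsV2": [{
            "cardId": card_id,
            "card": {
                "header": {"title": title, "subtitle": subtitle, "imageUrl": image_url},
                "sections": [{"widgets": widgets}],
            },
        }]
    }


def text_widget(html_text: str) -> dict:
    """A textParagraph widget; card text accepts a subset of HTML formatting."""
    return {"textParagraph": {"text": html_text}}


def link_buttons(links: list[tuple[str, str]]) -> dict:
    """A buttonList widget with one openLink button per (label, url) pair."""
    return {"buttonList": {"buttons": [
        {"text": label, "onClick": {"openLink": {"url": url}}} for label, url in links
    ]}}
```

A callback could then build its body as build_card_message("createCardMessage", title, subtitle, icon_url, [text_widget(...), link_buttons([("View Logs", log_url)])]) and pass it to _make_http_request.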

Sample notification cards for success and failure.


6. The success callback task_success_alert(context) sends a card notification when a task completes successfully. Using the card UI, I designed a message that puts the key details up front: the task ID is shown in the header, the DAG ID in the subtitle, and an optional image (here, a check mark) makes the status recognizable at a glance.

To personalize the message further, you can pick a different icon from icons8. The card also includes a “View Logs” button that opens the completed task's logs in the Airflow UI with a single click.

The card body also includes the execution time (the run's logical date) and the task duration. The TaskInstance documentation lists other attributes you can add to the card as needed.
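To add more TaskInstance details, one option is to generate textParagraph widgets from (label, attribute) pairs. In this sketch, try_number, run_id and hostname are TaskInstance attributes, but a SimpleNamespace stands in for a real TaskInstance so the code runs without Airflow, and detail_widgets is a hypothetical helper. Values are HTML-escaped because card text is interpreted as a subset of HTML.

```python
import html
from types import SimpleNamespace


def detail_widgets(ti, fields):
    """Build one textParagraph widget per (label, attribute_name) pair.

    Missing or None attributes are shown as "n/a"; values are HTML-escaped
    so characters such as "<" cannot break the card's formatting.
    """
    widgets = []
    for label, attr in fields:
        value = getattr(ti, attr, None)
        shown = "n/a" if value is None else html.escape(str(value))
        widgets.append({"textParagraph": {"text": f"<b>{label}:</b> {shown}"}})
    return widgets


# Stand-in for a real TaskInstance so the sketch runs outside Airflow.
fake_ti = SimpleNamespace(
    try_number=2, run_id="manual__2023-06-06T10:00:00+00:00", hostname=None
)
```

Inside a callback, the returned list could be added to the section's widgets, passing context["task_instance"] instead of the stand-in.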

```python
def task_success_alert(context):
    """
    Sends an alert to Google Chat in case of task success.

    Args:
        context (dict): Context object containing information about the task instance.
    """
    print("task_success_alert()")
    ti = context.get("task_instance")

    body = {
        "cardsV2": [{
            "cardId": "createCardMessage",
            "card": {
                "header": {
                    "title": f"{ti.task_id} execution successful",
                    "subtitle": ti.dag_id,
                    "imageUrl": "https://img.icons8.com/fluency/48/checkmark.png",
                },
                "sections": [
                    {
                        "widgets": [
                            {
                                "textParagraph": {
                                    "text": f"<b>Execution Time:</b> <time>{context.get('logical_date')}</time>",
                                }
                            },
                            {
                                "textParagraph": {
                                    "text": f"<b>Task Duration: </b> {ti.duration}s",
                                }
                            },
                            {
                                "buttonList": {
                                    "buttons": [
                                        {
                                            "text": "View Logs",
                                            "onClick": {"openLink": {"url": ti.log_url}},
                                        },
                                    ]
                                }
                            },
                        ]
                    }
                ],
            },
        }]
    }
    full_url = _get_webhook_url(GCHAT_CONNECTION)
    print("sending alert card")
    _make_http_request(body, full_url)
```

7. The task_fail_alert(context) function follows the same structure as the success alert, but adds a summary of the exception to the card. To keep the space tidy, the full exception details are posted as a reply in a thread started by the card. Alongside the same “View Logs” button, the card has a second button, “Mark Success”, which links to the task instance's mark-success URL in Airflow for quick resolution.

```python
def task_fail_alert(context):
    """
    Sends an alert to Google Chat in case of task failure.

    Args:
        context (dict): Context object containing information about the task instance.
    """
    ti = context.get("task_instance")

    # Form a thread key from the DAG ID and run ID. The card starts (or joins) that
    # thread in the space, and the exception details are posted as a reply in it.
    run_id = f"{ti.dag_id}-{ti.run_id}".replace("+", "-").replace(":", "-")

    print("task_fail_alert()")
    exception = context.get("exception")
    formatted_exception = str(exception)
    # The context may hold an exception object, a string, or None; only real
    # exceptions carry a traceback. Arguments are passed positionally because
    # the `etype` keyword no longer exists in Python 3.10+.
    if isinstance(exception, BaseException):
        formatted_exception = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )

    # Form a card to present the alert.
    body = {
        "cardsV2": [{
            "cardId": "createCardMessage",
            "card": {
                "header": {
                    "title": f"{ti.task_id} failed after {ti.prev_attempted_tries} tries",
                    "subtitle": ti.dag_id,
                    "imageUrl": "https://img.icons8.com/fluency/48/delete-sign.png",
                },
                "sections": [
                    {
                        "widgets": [
                            {
                                "textParagraph": {
                                    "text": f"<b>Execution Time:</b> <time>{context.get('logical_date')}</time>",
                                }
                            },
                            {
                                "textParagraph": {
                                    "text": f"<b>Task Duration: </b> {ti.duration}s",
                                }
                            },
                            {
                                "textParagraph": {
                                    # Short summary only; the full traceback goes to the thread.
                                    "text": f"<b>Exception:</b> <i>{str(exception)[:150]}</i>",
                                }
                            },
                            {
                                "buttonList": {
                                    "buttons": [
                                        {
                                            "text": "View Logs",
                                            "onClick": {"openLink": {"url": ti.log_url}},
                                        },
                                        {
                                            "text": "Mark Success",
                                            "onClick": {"openLink": {"url": ti.mark_success_url}},
                                        },
                                    ]
                                }
                            },
                        ]
                    }
                ],
            },
        }]
    }

    thread_ref = f"&threadKey={run_id}&messageReplyOption=REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD"
    full_url = _get_webhook_url(GCHAT_CONNECTION, thread_ref)
    print("sending alert card")
    _make_http_request(body, full_url)

    print("sending exception as a thread")
    # <users/all> mentions everyone in the space; the triple backticks
    # render the traceback as a monospace block.
    body = {"text": "<users/all>\n```" + formatted_exception + "```"}
    _make_http_request(body, full_url)
```
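The failure alert needs two renderings of the exception: a short summary for the card and the full traceback for the threaded reply. Here is a standalone sketch of that split (describe_exception is a hypothetical name). It passes the arguments of traceback.format_exception positionally, which works on all Python 3 versions, whereas the etype keyword was removed in Python 3.10. Because the context's exception may also be a string or None, only real exception objects are formatted as tracebacks.

```python
import traceback


def describe_exception(exception, summary_limit=150):
    """Return (summary, details) for an exception taken from an Airflow context.

    `exception` may be an exception object, a plain string, or None; only
    real exceptions get a formatted traceback.
    """
    summary = str(exception)[:summary_limit]
    if isinstance(exception, BaseException):
        details = "".join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        )
    else:
        details = str(exception)
    return summary, details
```

Keeping this logic in one function makes it easy to unit-test without Airflow, since it only needs an exception object.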

See the complete code in my GitHub repository.

Conclusion

Integrating Google Chat with Airflow brings task outcomes into the place where your team already communicates. Success and failure cards arrive as soon as tasks finish, with one-click links to the logs, and the full traceback of each failure stays in its own thread, so issues can be spotted and addressed promptly. The steps in this article cover the whole setup: creating the webhook, storing it in an Airflow connection, and implementing the success and failure callbacks.


Kashif Sohail

Data Engineer with more than 7 years of experience having exposure to fintech, contact center, music streaming, and ride-hail/delivery industries.