Мониторинг Apache Airflow через Slack

Автор Категория ,
Мониторинг Apache Airflow через Slack

В этой статье для разработчиков Data Flow, инженеров данных и администраторов Apache AirFlow рассмотрим, как организовать мониторинг этого batch-оркестратора через популярный корпоративный мессенджер Slack. Хотя по умолчанию Airflow имеет встроенную возможность отправлять оповещения по электронной почте, это не самый оперативный способ сообщить о критичной проблеме, к примеру, когда DAG с важными бизнес-задачами вышел из строя. Эффективнее получать оповещения там, где за ними следит вся команда дата-инженеров, например, в Slack.

Зачем следить за Airflow через Slack и как это организовать

Если Slack уже используется в качестве корпоративного мессенджера и средства командного взаимодействия, можно организовать в его каналах мониторинг конвейеров обработки данных на Apache AirFlow, независимо от их количества. Для этого требуется совсем немного пререквизитов:

  • запущенный Airflow и базовые знания о том, как написать DAG с помощью PythonOperator;
  • рабочая область Slack с правами администратора или хотя бы возможность запрашивать разрешения для создания веб-перехватчиков в мессенджере;
  • выделенный канал мессенджера.

Теперь рассмотрим по шагам, как организовать мониторинг рабочих процессов в AirFlow через Slack, выполнив такую последовательность действий:

  1. создать приложение Slack и активировать веб-хуки или перехватчики;
  2. подключить Slack Webhook к Airflow;
  3. включить в DAG AirFlow код для отправки сообщений на канал Slack.

Каждый из этих шагов мы подробнее рассмотрим далее.

Начало работы в Slack API и веб-хуки

В Slack входящие веб-перехватчики или хуки (hooks) — это простой способ по желанию публиковать сообщения из приложений в канал мессенджера, используя определенный URL-адрес. Создание входящего веб-перехватчика дает уникальный URL-адрес, куда можно отправлять полезную нагрузку JSON с текстом сообщения и некоторыми параметрами. Это полезно, когда действия, которые могут привести к публикации сообщения, происходят в удаленном сервисе. Например, в системе отслеживания проблем нужно опубликовать сообщение в канале о создании или устранении ошибки. Для этого пригодится входящий веб-перехватчик, вызываемый из самой системы отслеживания проблем.

Создать новое приложение Slack можно прямо в его веб-API по адресу: https://api.slack.com/apps?new_app=1. Задав в соответствующих полях имя приложения и рабочее пространство Slack, можно создать приложение и перейти к веб-хукам. Их следует активировать, т.е. разрешить использование входящих перехватчиков в новом только что созданном приложении и добавить новый веб-перехватчик в рабочую область. Это может делать только пользователь с правами администратора выбранного рабочего пространства Slack. Поэтому следует сперва запросить разрешение на данную операцию. Также нужно указать канал, на который Airflow будет отправлять оповещения и сообщения. Далее следует скопировать отображаемый URL-адрес Webhook и перейти в пользовательский интерфейс администратора Airflow.

Подключение Slack Webhook к Airflow

В пользовательском интерфейсе администратора Airflow нужно создать новое подключение и с типом Slack Webhook. Идентификатор подключение (Conn Id) может быть назван любым именем, а в поле Хост следует вставить URL-адрес Webhook. Выбранный тип подключения Slack включает интеграцию AirFlow с этим мессенджером. При этом можно обеспечить безопасный доступ, выполнив аутентификацию в мессенджер с помощью токена Slack API. При указании подключения в переменной среды следует указывать его с использованием синтаксиса URI, но все компоненты URI должны быть закодированы в URL. Например,

export AIRFLOW_CONN_SLACK_DEFAULT=’slack://:token@’

Включение в DAG AirFlow кода для отправки сообщений на канал Slack

Напомним, направленный ациклический граф (DAG) в Airflow представляет собой цепочку задач, которая отражает их зависимости и отношения. DAG определены в Python-скриптах и помогают планировщику определить, какие задания выполнять. Есть два варианта мониторинга задач Airflow через Slack:

  • оповещения о задачах, когда в мессенджер отправляются сообщение при сбое задачи;
  • Slack Message Tasks, когда в мессенджер отправляются сообщения о ходе выполнения batch-конвейера. Это может быть в начале или в конце DAG, а также между входящими в него задачами.

Оба варианта используют SlackWebhookOperator, который обычно применяется для создания отчетов и предупреждений путем планирования входящих сообщений в каналы Slack при выполнении некоторого условия срабатывания. Этот оператор позволяет отправлять сообщения в мессенджер с помощью входящих веб-перехватчиков. Оператор принимает как токен веб-перехватчика Slack напрямую, так и соединение с его токеном. Если оба указаны, http_conn_id будет использоваться как base_url, а webhook_token будет использоваться как конечная точка, относительный путь URL-адреса. Каждый токен веб-перехватчика Slack можно предварительно настроить для использования определенного канала, имени пользователя и значка. Можно переопределить эти значения, задав следующие параметры:

  • http_conn_id (str) — подключение, где в дополнительном поле указан токен Slack webhook;
  • webhook_token (str) – токен веб-хука Slack;
  • message (str) — сообщение, которое следует отправить в корпоративный мессенджер;
  • attachments (list) — вложения для отправки в Slack в виде списка словарей;
  • blocks (list) — блоки для отправки в мессенджер в виде списка словарей;
  • channel (str) – канал мессенджера, куда будет отправляться сообщение;
  • username (str) — имя пользователя для публикации сообщения в мессенджере;
  • icon_emoji (str) — эмоджи в качестве значка для пользователя, публикующего сообщения;
  • icon_url (str) — строка URL-адреса изображения значка вместо иконки по умолчанию;
  • link_names (bool) — логическая переменная, которая показывает необходимость находить и связывать каналы и имена пользователей в сообщении;
  • proxy (str) — прокси-сервер для вызова Slack webhook;
  • extra_options (dict) – дополнительные параметры для http-хука

По умолчанию SlackWebhookOperator имеет следующие поля:

template_fields = [‘webhook_token’, ‘message’, ‘attachments’, ‘blocks’, ‘channel’, ‘username’, ‘proxy’, ‘extra_options’]

Больше деталей про администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/geekculture/airflow-monitoring-via-slack-c692cd6ee85a
  2. https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/connections/slack.html
  3. https://api.slack.com/messaging/webhooks
  4. https://www.mikulskibartosz.name/send-cusomized-slack-notification-when-airflow-task-fails/
  5. https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
  6. https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/slack_webhook_operator/index.html