Как создать собственный сенсор Apache Airflow: пример

Автор Категория ,
Как создать собственный сенсор Apache Airflow: пример

Сегодня заглянем под капот особых операторов Apache AirFlow, разберемся с режимами работы датчиков, а также рассмотрим, как создать собственный сенсор. Краткий ликбез по разработке своего sensor’а с лучшими практиками настройки и использования в DAG’ах AirFlow.

Что такое сенсор: краткий ликбез по AirFlow

Сенсоры или датчики AirFlow — это особый тип операторов, которые предназначены для выполнения ровно одной задачи — ожидания, когда что-то произойдет, например, загрузится файл, будет получен ответ от внешней системы и пр. Когда датчики работают, они проверяют, выполняется ли определенное условие, прежде чем они будут помечены как успешные, и позволят выполнять последующие задачи. При правильном использовании сенсоры могут стать отличным инструментом для того, чтобы сделать DAG’и управляемыми событиями. Как и операторы, Airflow имеет большой набор предварительно созданных датчиков, которые можно использовать как в ядре Airflow, так и через реестр провайдеров.

Датчики имеют три различных режима работы:

  • poke (по умолчанию): датчик занимает рабочий слот на все время работы;
  • reschedule – датчик занимает рабочий слот только во время проверки и спит в течение установленного времени между проверками;
  • smart sensor или интеллектуальный датчик – особый тип операторов, цель которого в ожидании определенного триггера, например, успешного завершения внешней задачи. Smart Sensor появился в Airflow 2.0 в 2020 году как функция раннего доступа, о чем мы писали здесь и здесь.

Режимы poke и reschedule можно настроить непосредственно при создании экземпляра сенсора в зависимости от временной задержки. Для ежесекундной проверки подходит poke, а при работе в минутном диапазоне – reschedule. Кроме режимов работы, для настройки сенсоров также используются следующие параметры:

  • poke_interval – для режима poke это время в секундах, в течение которого датчик ожидает, прежде чем снова проверить условие, по умолчанию 30 секунд;
  • exponential_backoff – значение True для этого параметра создает экспоненциально более длительное время ожидания между нажатиями в режиме poke;
  • timeout – максимальное количество времени в секундах, в течение которого датчик должен проверять условие. Если условие не выполнено при достижении этого временного периода, задача не выполняется.
  • soft_fail – значение True для этого параметра помечает задачу как пропущенную, если условие не выполняется по истечении времени ожидания.

Помимо этих параметров, разные типы датчиков имеют разные детали реализации.

На практике чаще всего используются следующие сенсоры:

  • S3KeySensor, который ожидает, пока файл попадет в корзину AWS Этот датчик полезен, когда нужно, чтобы DAG обрабатывал файлы из S3 по мере их поступления;
  • DateTimeSensor ожидает прохождения указанной даты и времени. Полезно, когда нужно выполнять задачи из одного DAG в разное время;
  • ExternalTaskSensor ожидает завершения задачи Airflow. Пригодится, если нужно реализовать зависимости между DAG в одной и той же среде Airflow.
  • HttpSensor ожидает доступности API;
  • SqlSensor ожидает появления данных в таблице SQL. Пригодится, если нужно, чтобы DAG обрабатывал данные по мере их поступления в базу данных.
  • PythonSensor ожидает, пока вызываемый объект Python вернет значение True. Этот датчик полезен, если нужно реализовать сложные условия в пользовательском DAG.

Но если имеющихся в Airflow датчиков не хватает и нет подходящего решения в открытом реестре провайдеров от Astronomer, о котором мы писали здесь, можно создать собственный сенсор. Как это сделать, рассмотрим далее.

Как создать свой датчик: пошаговый guide и лучшие практики

Сперва необходимо наследовать класс BaseSensorOperator. Он имеет следующий синтаксис:

classairflow.sensors.base.BaseSensorOperator(*, poke_interval: float = 60, timeout: float = 60 * 60 * 24 * 7, soft_fail: bool = False, mode: str = 'poke', exponential_backoff: bool = False, **kwargs)[source]

Операторы пользовательского сенсора являются производными от этого класса и наследуют эти атрибуты. В базовом классе есть метод poke, который следует перезаписать в соответствии с потребностями. В результате эта функция должна вернуть значение True или False. При значении True сенсор сработает, при False продолжит ожидание.

Предположим, требуется написать SqlSensor, поддерживающий Snowflake – популярное аналитическое SaaS-хранилище. Следующий фрагмент кода показывает, как создать его самостоятельно:

"""
Parameters:
 sql: str, sql to execute
Sensor class for snowflake
"""
from airflow.sensors.base_sensor_operator import BaseSensorOperatorclass SnowflakeSqlSensor(BaseSensorOperator):
 def __init__(self, sql *kwargs):
  self.sql = sql
  super().__init__(*kwargs)def get_connection():
  ## Create connection to your snowflake database
  conn = 'conn'
  return conndef poke(self, context):
  self.snowflake_conn = self.get_connection()  response = self.snowflake_conn.execute(self.sql).fetchall()  if not response:
     return False
  else:
     if str(response[0][0]) in ('0', '',):
        return False
     else:
        return True

Также следует создать соединение с учетной записью Snowflake в методе get_connection или написать хук – точку подключения AirFlow к сторонним сервисам и библиотекам.

Для эффективного использования сенсоров рекомендуется применять следующие советы, чтобы избежать проблем с производительностью:

  • всегда определять значимый параметр тайм-аута. Значение по умолчанию для этого параметра равно неделю, что зачастую слишком много. При реализации собственного сенсора стоит внимательно рассмотреть предполагаемый вариант использования и ожидаемое время ожидания, чтобы определить оптимальный тайм-аут.
  • применять режиме reschedule для долго работающих датчиков, чтобы они не занимали постоянно рабочий слот. Это поможет избежать взаимоблокировок в Airflow, когда датчики занимают все доступные рабочие слоты.
  • Для короткого интервала опроса (poke_interval), т.е. менее 5 минут лучше использовать режим poke, т.к. reschedule в этом случае может привести к перегрузке планировщика.
  • выбирать poke_interval на основе варианта использования. Например, нет необходимости проверять условие каждые 30 секунд (по умолчанию), если общее время ожидания порядка 30 минут.

Как использовать внешний датчик для управления зависимостями задач из разынх DAG, читайте в нашей следующей статье. А освоить администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
  2. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html
  3. https://www.astronomer.io/guides/what-is-a-sensor
  4. https://medium.com/@soniagoyal/how-to-create-a-custom-airflow-sensor-bc5293bd96f3
  5. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/base/index.html#module-airflow.sensors.base