Apache Airflow 2.2.0: что нового?

Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, обновления airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

В октябре прошлого года вышел крупный релиз Apache AirFlow 2.2.0. Разбираем его главные фичи, которые больше всего интересны с точки зрения инженерии данных: пользовательские расписания и декораторы, отложенные задачи, а также валидация параметров DAG по JSON-схеме.

Краткий обзор обновлений AirFlow 2.2.0

Хотя последней версией популярного batch-планировщика задач Apache Airflow на февраль 2022 года является 2.2.3, она наследует наиболее важные изменения с ветки 2.2.0. Поэтому именно этот релиз мы рассмотрим подробнее: он содержит более 600 коммитов с версии 2.1.4 и включает 30 новых функций, 84 улучшения, 85 исправлений ошибок и множество внутренних изменений и изменений документации.

В частности, теперь возможно тестирование подключений из пользовательского интерфейса, что пригодится при проверке валидности учетных записей. Также в UI добавлено отображение информации о запуске следующего DAG, в т.ч. когда он фактически начнется. Автономный режим позволяет запускать все компоненты Airflow напрямую без Docker, что отлично подходит для локальной разработки. Далее рассмотрим другие наиболее интересные обновления AirFlow 2.2.0.

Пользовательские расписания

Airflow исторически использовал выражения cron и timedeltas для представления того, когда запускать DAG. Это работало во многих случаях использования, но не во всех. Например, автоматически запускать ежедневно по будним дням, а не по выходным, было невозможно. Чтобы обеспечить большую гибкость планирования, определение времени запуска DAG теперь выполняется с помощью расписаний. Обратная совместимость сохранена: выражения cron и timedeltas по-прежнему полностью поддерживаются, однако расписания можно подключать, настраивая их по собственным потребностям. В частности, можно написать свое расписание, чтобы запланировать запуск DAG.

Вместо execute_date добавлена ​​новая концепция data_interval — период данных, с которыми должна работать задача. Теперь доступны следующие параметры:

  • logical_date (аналогично execution_date);
  • data_interval_start (аналогично execution_date for cron);
  • data_interval_end (аналогично next_execution_date).

При разработке собственных расписаний стоит помнить, что они должны быть идемпотентными и быстрыми, поскольку используются в планировщике для создания запусков DAG (DagRun). Подробнее о том, как планировать запуск DAG по расписанию, читайте в нашей новой статье.

Отложенные задачи: операторы и триггеры

Стандартные операторы и датчики занимают полный рабочий слот на все время работы, даже если они простаивают. Например, если есть только 100 рабочих слотов, доступных для запуска задач, и 100 DAG, ожидающих датчика, который в данный момент работает, но простаивает, нельзя запускать что-либо еще, даже если весь кластер Airflow по существу простаивает. Режим reschedule для сенсоров частично решает эту проблему, позволяя им работать только через фиксированные интервалы, но он негибок: только время является причиной возобновления работы. Обойти это помогут отложенные операторы (Deferrable Operators). Отложенный оператор написан с возможностью приостановить себя и освободить worker’а, когда он должен ждать и передать работу по его возобновлению работы триггеру. Будучи приостановленным, отложенный оператор не занимает рабочий слот, и кластер будет тратить гораздо меньше ресурсов на бездействующих операторов или сенсоров.

Триггер — это процесс-демон, запускающий цикл обработки событий asyncio, небольшой асинхронный фрагмент Python-кода для совместного запуска в одном процессе. Поскольку отложенные операторы и триггеры основаны на более поздних функциях asyncio, они работают только на Python 3.7 или выше.

Например, экземпляр задачи (работающий оператор) доходит до точки, где ему приходится ждать, и приостанавливается с помощью триггера, привязанного к событию, которое должно его возобновить. Это освобождает worker AirFlow для запуска чего-то другого. Новый экземпляр триггера регистрируется внутри Airflow и подхватывается процессом триггера. Триггер запускается до тех пор, пока не сработает, после чего его исходная задача перепланируется. Планировщик ставит задачу в очередь для возобновления на рабочем узле.

Отложенные задачи позволяют операторам или датчикам откладывать себя до тех пор, пока не пройдет легкая асинхронная проверка, после чего они могут возобновить выполнение. Самое главное, это приводит к тому, что рабочий слот и, в первую очередь, любые используемые им ресурсы возвращаются в Airflow. Это позволяет намного дешевле и проще выполнять мониторинг задания во внешней системе или наблюдение за событием. Использование отложенных операторов удобно и дает множество преимуществ для разработчика DAG, но их написание требует больше усилий.

Airflow 2.2.0 поставляется с двумя отложенными датчиками, DateTimeSensorAsync и TimeDeltaSensorAsync, которые заменяют ранее существовавший сенсор. Пока нельзя использовать возможность отсрочки из пользовательских функций PythonOperator или TaskFlow Python, отложенные сенсоры доступны только традиционным операторам класса.

Пользовательские декораторы @task и @task.docker

В Airflow 2.2.0 провайдерам можно создавать собственные декораторы @task в интерфейсе TaskFlow, и они будут отображаются как часть дизайна @task.____. Напомним, декоратор @task.docker позволяет запускать функцию в Docker-контейнере. Airflow обрабатывает получение кода в контейнере и возврат XCom, абстрагируя разработчика функции от деталей реализации. Это особенно полезно, когда есть конфликтующие зависимости между самим Airflow и задачами, которые нужно запустить.

Эта фича расширяет набор вариантов использования API TaskFlow, не ограничивая разработчика набором существующих пакетов и системных библиотекам worker’а Airflow. Чтобы использовать Docker-образ с API TaskFlow, следует изменить декоратор на @task.docker и добавить нужные аргументы для правильного выполнения задачи. При этом Docker-образ должен иметь установленный работающий Python и принимать команду bash в качестве аргумента команды.

Валидация параметров DAG

Также теперь можно проверять параметры DAG, передав объект Param для каждого параметра. Этот объект поддерживает полные спецификации проверки JSON-схемы. Вообще Params — это концепция Airflow по предоставлению конфигурации времени выполнения для задач, когда DAG запускается вручную. Возможность обновления параметров при запуске DAG зависит от флага core.dag_run_conf_overrides_params. Если этот флаг равен False, параметры будут вести себя как константы. Пока это работает только с DAG, запускаемыми вручную, но в перспективе будет доступно и для других функций, связанных с параметрами.

Param использует json-schema для определения свойств и выполнения проверки, поэтому можно использовать полные спецификации json-schema для определения конструкции этих объектов. Если DAG использует обязательное значение параметра, т. е. объект Param без значения по умолчанию или NULL в качестве разрешенного типа, расписание этого DAG должно иметь значение None. Но при определении такого параметра на уровне задачи, Airflow не может его ограничить. Поэтому задача не будет выполняться.

Текущие правила безопасности AirFlow не позволяют использовать объекты Param, полученные из пользовательских классов. Но в будущем разработчики планируют создать систему регистрации для пользовательских классов Param, как и для Operator ExtraLinks. Что еще появилось нового в свежем выпуске AirFlow от 30 апреля 2022 года, мы рассказываем здесь.

Узнайте все про администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://airflow.apache.org/blog/airflow-2.2.0/
  2. https://pypi.org/project/apache-airflow/2.2.3/#history
  3. https://json-schema.org/draft/2020-12/json-schema-validation.html
  4. https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html
Поиск по сайту