3 новинки для DAG в Apache AirFlow 2.0

Автор Категория ,
3 новинки для DAG в Apache AirFlow 2.0

В поддержку наших полностью обновленных авторских курсов для инженеров данных по Apache AirFlow, сегодня рассмотрим новые способы определения DAG, которые были добавлены в релизе 2.0. Читайте далее, что под капотом TaskFlow API, как поместить задачи в TaskGroup, чем dag_policy отличается от task_policy и почему все это упрощает работу инженера Big Data.

Самые важные обновления версии 2.0 касательно DAG

Напомним, основной сущностью планировщика рабочих задач Apache AirFlow является направленный ациклический граф (DAG, Directed Acyclic Graph) – набор задач (task), которые выстроены в определенную последовательность, и выполняются по расписанию. Из 10 главных обновлений Apache AirFlow, выпущенных в мажорной версии 2.0, которая вышла в конце 2020 года, о чем мы писали здесь, по крайней мере 3 непосредственно связаны с DAG [1]:

  • TaskFlow API, который устраняет многие ограничения операторов Python, управления зависимостями между задачами и доступа к значениям XCom при создании цепочек рабочих задач;
  • группы задач (TaskGroups) – конструкция пользовательского веб-интерфейса, которая не влияет на поведение задач при их выполнении, но создание, управление и просмотр больших вложенных файлов DAG;
  • детализация кластерных политик – теперь пользователь может определять task_policy и dag_policy, каждая из которых выполняется в определенное время.

Что представляет собой каждое из этих нововведений, мы подробнее рассмотрим далее.

Что под капотом TaskFlow API: расширение XCom и не только

TaskFlow API состоит из трех функций [1]:

  • XComArg – слой поверх уже существующего XCom, который упрощает доступ и передачу информации между задачами через атрибут оператора output, который возвращает экземпляр XComArg. Этот простой способ позволяет получить доступ к ссылке на XCom, возвращаемой задачей, и использовать ее в других задачах, вместо сложного и многоступенчатого процесса определения отношений порядка и связей между данными в шаблонах jinja.
  • декоратор @task, который позволяет пользователям преобразовывать любую функцию Python в экземпляр задачи с помощью PythonOperator. В результате вызова декорированной задачи возвращается XComArg, который можно передать оператору или другой функции, также декорирующей задачу. Идентификаторы задач генерируются с использованием имени функции, позволяя пользователям вызывать ее несколько раз для создания множества задач одного типа.
  • настраиваемый XCom-бэкенд, который упрощает сохранение промежуточных данных между задачами, позволяя пользователям определять класс для сериализации вывода задачи и десериализации ввода задачи. Это снимает ограничения размера и формата передачи данных через таблицу XCom в предыдущих релизах AirFlow. Теперь при передаче больших данных от одной задачи к другой не нужно хранить их во внешнем хранилище, например, в облачных корзинах GCS или S3. В AirFlow0 каждая задача реализует логику загрузки и выгрузки в своем начале и в конце. Для использования Custom XCom backend следует настроить конфигурационный файл airflow.cfg.

Удобное управление задачами DAG c TaskGroups

TaskGroup упрощает создание, управление и просмотр больших DAG в пользовательском веб-интерфейсе Airflow. Напомним, ранее во фреймворке отсутствовала возможность переключать задачи, которые создают группу DAG [1]. В релизе 2.0 TaskGroup позволяют инкапсулировать вложенные задачи в графическом представлении, визуально организовав их в иерархические группы. Это существенно упрощает работу дата-инженера, облегчаю работу с объемными цепочками задач. В отличие от SubDagOperator, TaskGroup – это концепция группировки пользовательского интерфейса. Задачи в TaskGroups находятся в той же исходной группе DAG и соответствуют заранее заданным конфигурациям.

TaskGroups Apache AirFlow 2.0, обучение AirFlow, курсы дата-инженеров
Вложенные группы задач в веб-интерфейсе Apache AirFlow 2.0

По умолчанию дочерние задачи и группы задач имеют свой task_id и group_id с префиксом group_id их родительской TaskGroup. Это гарантирует уникальность их идентификаторов (group_id и task_id) во всей группе DAG. Чтобы отключить префикс, следует задать prefix_group_id=False при создании TaskGroup. Это дает пользователю полный контроль над фактическими group_id и task_id, которые должны быть уникальными во всей группе DAG. Отключение параметра prefix_group_id пригодится для помещения задач из существующих DAG в TaskGroup без изменения их task_id.

Отношения зависимости могут применяться ко всем задачам в TaskGroup. Например, следующий код помещает task1 и task2 в TaskGroup group1, а затем помещает обе задачи вверх от task3 [2]:

with TaskGroup(“group1”) as group1:

    task1 = DummyOperator(task_id=”task1″)

    task2 = DummyOperator(task_id=”task2″)

task3 = DummyOperator(task_id=”task3″)

group1 >> task3

Новые кластерные политики Apache AirFlow

В Airflow политики кластера предоставляют интерфейс для работы с каждой задачей или DAG во время его загрузки, а также непосредственно перед выполнением задачи. В фреймворка версии 2.0 пользователи могут делать следующее [2]:

  • установить аргументы по умолчанию для каждого DAG и задачи;
  • проверить соответствие DAG и задачи требуемым стандартам;
  • настроить логику маршрутизации задачи в очередь.

Чтобы использовать эти политики, необходимо определить в пользовательских настройках airflow_local_settings следующие функции [2]:

  • dag_policy, которая на входе принимает аргумент dag типа DAG, позволяя пользователям определять политику уровня DAG, применяемую для каждого DAG во время загрузки;
  • task_policy, которая на входе принимает аргумент задачи типа BaseOperator, позволяя пользователям определять политику на уровне задач, применяемую для каждой задачи во время загрузки DAG;
  • task_instance_mutation_hook, которая на входе принимает аргумент task_instance типа TaskInstance, позволяя пользователям определять политику уровня задачи, применяемую непосредственно перед выполнением задачи.

Для DAG и политик задач можно активировать исключение AirflowClusterPolicyViolation, чтобы предотвратить импорт DAG или выполнение задачи, не соответствующей пользовательской проверке. Политика кластера имеет приоритет над атрибутами задач, определенными в DAG, поэтому следует целостно настраивать те параметры, которые повторяются, например, доступность (SLA) задачи (task.sla).

Наконец, отметим еще одно важное обновление Apache AirFlow 2.0, которое упрощает дата-инженерам работу с DAG. Раньше при использовании TriggerDagRunOperator или ExternalTaskSensor было не просто переходить от одного DAG к другому, на который ссылается оператор или датчик. В предыдущих версиях фреймворка эта проблема решалась через ручное создание пользовательских ссылок. В релизе 2.0 TriggerDagRunOperator и ExternalTaskSensor поставляются с уже встроенными ссылками на операторы, которые можно просмотреть в пользовательском веб-интерфейсе. Это значительно упрощает навигацию между связанными DAG [1].

Завтра мы продолжим разговор про полезные фишки Apache AirFlow 2.0 для дата-инженера и рассмотрим, как создать свой собственный провайдер перехода фреймворка с монолитной структуры к пакетной организации независимых друг от друга компонентов. 

Как эффективно использовать все эти нововведения Apache AirFlow 2.0 на практике при разработке конвейеров аналитики больших данных с Hadoop и Spark, вы узнаете на авторских курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/apache-airflow/airflow-2-0-dag-authoring-redesigned-651edc397178
  2. https://airflow.apache.org/docs/apache-airflow/stable/concepts.html