Airflow и TaskFlow: композиция операторов и задач с TaskGroup

Автор Категория ,
Airflow и TaskFlow: композиция операторов и задач с TaskGroup

В предыдущей статье мы рассмотрели TaskFlow API, повившийся в Apache Airflow 2.0. Сегодня поговорим о способах задания операторов, отличных от PythonOperator, а также о способе группировки задач TaskGroup. Читайте в этой статье: как сформировать BashOperator, используя TaskFlow API, когда следует использовать TaskGroup, в чем преимущества TaskGroup перед SubDag.

Используем Bash Operator в связке с TaskFlow API

TaskFlow API очевидно использует операторы Python. Однако и внутри задекорированных функций можно создавать все доступные операторы. Покажем это на примере.

Допустим, есть 2 задачи, которые возвращают небольшие строки, причем 2-я задача модифицирует строку 1-й задачи. После этого вызывается BashOperator, который выводит модифицированную строку в стандартный вывод. Первые две задачи сформируем в виде функции под декоратором task, а третью создадим внутри задокерированной функции dag. Итак, наш код на Python с композицией нескольких задач Airflow выглядит следующим образом:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'romank',
    'start_date': days_ago(0),
}

@task
def t1():
    return 'first'

@task
def t2(data):
    return data + ', second'

@dag(default_args=default_args, schedule_interval=None)
def res():
    data1 = t1()
    data2 = t2(data1)
    bop = BashOperator(
        task_id='echo_task',
        bash_command=f'echo {data2}'
    )
    data2 >> bop

dag_res = res()

Как можно заметить, data2 является как возвращаемым значением функции t2, так и самой задачей. Поэтому, помимо того, что data2 входит в аргумент команды echo, она также является предыдущей задачей по отношению к BashOperator.

Здесь оператор Bash был приведен в качестве примера, но его можно заменить на любой другой или использовать несколько. Таким образом, мы объединили Taskflow API, который использует явную кросс-коммуникацию (XCom), и традиционный способ задания операторов Airflow.

Объединяем графы с TaskGroup

В Airflow 2.0 появилась возможность представлять графы в виде отдельных задач. Достигается это с помощью TaskGroup, который используется в качестве контекстного менеджера [1]. Зачем это нужно? Для соблюдения принципа модульности и для визуального объединения задач. Также оно помогает избежать повторения кода, если подграф используется несколько раз или вовсе экспортирован в другой граф.

Расширим пример. Объединим задачи, которые модифицируют строки, в отдельные группы. Их результаты объединим в задаче merge_data, которая в свою очередь пойдет на вход оператору Bash. Код на Python для разбиения задач Airflow на группы выглядит так:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

default_args = { 'owner': 'romank', 'start_date': days_ago(0), }

@task
def t1():
    return 'first'

@task
def t2():
    return 'second'

@task
def transform_task_1(data):
    return data + ' changed'

@task
def transform_task_2(data):
    return data + ' modified'

@task
def merge_data(data_1, data_2):
    return data_1 + ', ' + data_2

@dag(default_args=default_args, schedule_interval=None)
def dag_with_taskgroup():
    with TaskGroup('section-1') as section_1:
        data = t1()
        data_1 = transform_task_1(data)
    with TaskGroup('section-2') as section_2:
        data = t2()
        data_2 = transform_task_2(data)
    result = merge_data(data_1, data_2)
    bop = BashOperator(
        task_id='echo_task',
        bash_command=f'echo {result}'
    )
    result >> bop

dag = dag_with_taskgroup()

Сам граф расположен на рисунке ниже, при нажатии на группу, она раскрывается. Задача merge_data зависит от групп section-1 и section-2, поскольку ожидает их результатов.

Группировка задач TaskGroup
Сгруппированные задачи

Стоит ли пользоваться TaskGroup

TaskGroup появился вместо SubDag. И он появился не зря. SubDag медлителен, нестабильный, сложен в использовании и приводит к странным последствиям; разработчики хотят его даже удалить в Airflow 3.0 [2]. Поэтому TaskGroup является альтернативой сложным манипуляциям с SubDag’ами. Группы задач из TaskGroup находятся внутри того же самого графа, отсюда все параметры и конфигурации также доступны через него.

Несмотря на то, что группировка задач выглядит удобно, не стоит её применять в и так уже разросшимся DAG’е, увеличивая тем самым его сложность. Представьте себе, что у вас собираются данных с разных API, систем ERP, запущенных скриптов и проч., тогда вливать это всё в единый конвейер будет не самой разумной идеей. Ведь такого “монстра” сложно будет поддерживать. В данном случае лучше всего придерживаться принципа декомпозиции: разделяй и властвуй. Поэтому применять TaskGroup стоит с излишней осторожностью. Если кажется, что задачи можно скомпоновать в отдельный граф, сделайте это. Тогда такой граф можно тестировать отдельно, а после его можно импортировать в другие графы. В нашей статье вы также узнаете о лучших практиках создания Big Data конвейеров данных в Apache Airflow.

 

О конструировании графов, создании групп задач в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#taskgroups
  2. https://lists.apache.org/thread.html/ra52746f9c8274469d343b5f0251199de776e75ab75ded6830886fb6a%40%3Cdev.airflow.apache.org%3E?noscript