Начало работы с Apache Airflow

Автор Категория ,
Начало работы с Apache Airflow

В прошлой статье мы рассмотрели установку Apache Airflow на свой компьютер. Данная платформа предназначена для планирования задач, например, выполнения скриптов Bash и Python в заданное время, в заданной последовательности. Сегодня на примере выполнения двух Bash-команд расскажем, как создать свой первый граф. Читайте в этой статье: связи между задачами, создание графа (DAG), запуск задним числом (backfill).

Граф, задачи и их соединения

Последовательность выполнения задач (tasks) задается через DAG (направленный ациклический граф). Apache Airflow предоставляет специальный объект DAG, который реализует протокол контекстного менеджера, т.е. в коде встречается в виде with DAG().

Связь между задачами формируются с помощью операторов побитового сдвига. Например, при задании t3 >> t2 >> t1 сначала выполнится задача t3, затем t2, затем t3. Параллельное выполнение также можно задать. Например, в следующем коде на Python:

t1 >> t2 >> t4
t1 >> t3 >> t4

# Что означает:
#    t2
#   /  \
# t1    t4
#   \  /
#    t3

— задачи t2 и t3 будут выполняться параллельно.

По умолчанию задача в графе начнет выполнение только в том случае, если предыдущие задачи получили успешный статус. В предыдущем примере t4 начнет выполнение, когда успешно закончатся t2 и t3, а они в свою очередь выполнятся после успеха t1. Однако это поведение всегда можно изменить [1].

Создаем свой первый DAG в Apache Airflow

Напишем DAG, который будет состоять из двух последовательных задач. Первая задача будет выполнять команду echo, а вторая — date. Обе команды должны быть поданы командному интерпретатору Bash.

Импорт модулей

Итак, сначала импортируем необходимые модули. Для нашего примера понадобятся всего 3 импорта:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

Определение свойств задач

Теперь нам нужно задать свойства для каждой задачи. Это делается с помощью словаря (dict). Мы зададим всего 3 свойства:

default_args = {
    'owner': 'romank',
    'start_date': days_ago(0),
    'depends_on_past': False,
}

Владельцем является romank. Аргумент start_date обозначает формальную дату начала задачи. Она формальная в том смысле, что может иметь любое значение (например, 2019-25-07) при этом запустить задачу можно с текущего момента. Мы поставили ей значение функции days_ago(0), т.е. равное текущему дню. Кроме days_ago, можно использовать datetime, например, datetime.datetime(2019, 7, 25).

Аргумент depends_on_past, равный False, при запуске задним числом (backfill) дает запустить DAG в не зависимости от статуса предыдущего запуска. При флаге True граф не запустится задним числом, если предыдущий имеет статус отличный от успешного.

Создание самого DAG в Apache Airflow

Наконец, нужно объявить сам DAG. В него нужно передать уникальный идентификатор (dag_id), словарь аргументов, интервал запуска и дополнительные параметры. В самом графе перечисляются задачи в виде операторов, а также определяется связь между ними. Задачами могут являться BashOperator — команда Bash, PythonOperator — объект, реализующий протокол callable, т.е. любая Python-функция. Этими двумя операторами дело не ограничивается, но в этой статье мы рассмотрим только BashOperator‘ы, которые выполняются последовательно. Итак, наш граф будет определяться следующим образом:

with DAG(
    'mydag',
    default_args=default_args,
    schedule_interval='@once',
    catchup=False
) as dag:

    t1 = BashOperator(
        task_id='echo_hi',
        bash_command='echo "Hello"',
    )

    t2 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t1 >> t2

Мы присвоили графу идентификатор mydag, который запустится сам лишь один раз (@once). Интервал запуска можно установить через timedelta (см. таб. ниже). Параметр catchup позволяет рассматривать формальную дату начала задачи (start_date) как текущую. Этот параметр важен. Рассмотрим такую ситуацию: формальная дата начала равна 2021-08-10, интервал запуска — ежедневный @daily, а текущая дата — 2021-08-15, тогда при catchup=True граф запустится 5 раз, т.е. “нагонит” текущую дату. При флаге False, граф выполнится только один раз (и будет запускаться впредь ежедневно, если не остановлен).

Интервал Значение
None Не планировать, запустить вручную
@once Единожды
@hourly / *td(hours=1) Ежечасно
@daily / *td(days=1) Ежедневно
*td = timedelta  

Таким образом, граф запустится один раз, и выполнятся команды echo, а затем date.

Где это все отслеживать

Если вы подняли контейнер, то у вас запущен веб-сервер, который доступен по адресу localhost:8080. Найдите идентификатор графа, снимите его с паузы и нажмите на него. Вы увидите сам граф с задачами. Нажав, на задачу можете просмотреть логи, где указан результат команды.

В нашем видеоролике вы узнаете немного о веб-сервере.

Выполнение задним числом в Apache Airflow

В словаре аргументов мы поставили формальную дату запуска, но если вам или коллеге требуется другая дата? Чтобы не менять код, вы можете запустить граф задним числом (backfill), т.е. расставить другую дату начала и конца, не меняя кода. При этом логи и записи в веб-сервере будут также записаны. Кроме того, мы недаром поставили depends_on_past=False, который работает в этом режиме: если запуск одного графа провалится, последующие все равно запустятся.

Итак, для запуска задним числом применяется следующая команда:

./airflow.sh dags backfill mydag –start-date 2021-08-18 –end-date 2021-08-20

Дату начала и/или конца подставьте свои. Вышеприведенный код вы можете найти в нашем репозитории.

 

В следующей статье рассмотрим механизм передачи сообщений, который появился в Airflow 2.0 — TaskFlow API. А ещё больше подробностей о работе в Apache Airflow с реальными примерами вы узнаете на специализированных курсах в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров данных, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

  1. AIRF: Курс Apache AirFlow
  2. Курс Data pipeline на Apache AirFlow и Arenadata Hadoop

Источники

  1. Документация: управление потоками