Apache Airflow

В этой статье я бы хотел рассказать об основных концепциях Airflow и как с ним работать.

Что такое Airflow?

Airflow – это open-source оркестаратор для управления процессами загрузки и обработки данных. Если у вас есть большое количество задач, запускаемых на cron, особенно, если между ними есть зависимости, то Airflow может вам сильно помочь.

Основные его преимущества – это несложная инсталяция и первые шаги, хорошая визуализация, а также возможность автоматически создавать большое число задач и широкие возможности кастомизации.

Основной объект Airflow – это направленный ацикличный граф (DAG). Узлы DAG – это task (задачи, которые выполняют основную работу). Между task’ами есть связи. Как следует из определения, циклов в зависимостях быть не может.

Пример визуализации DAG
Рис.1 (Пример визуализации DAG)

DAG в Airflow может состоять из множества веток, различных ветвлений и т.п. Также можно устанавливать зависимости не только внутри одного DAG, но и между несколькими DAG’ами. Часть задач можно пропускать (skip) в зависимости от условий, статусов завершения предыдущих задач и т.п.

Также для задач можно выставлять различные приоритеты (priority_weight).

Основные концепции

DAG состоит из task и связей между ними.

Каждый task – это, по сути, экземпляр Operator класса с заданным списком параметров (для каждого оператора они разные).

Когда DAG запускается, Airflow создает экземпляр DAG Run.

Когда в контексте DAG запускается task, создается task Instance, которые и выполняет различные действия с данными.

Более подробно об основных концепциях Airflow можно почтить в официальной документации.

Установка Airflow

Airflow написан на Python 2.7, но уже достаточно давно в Production решениях используется Python 3. В целом проблем это не вызывает, но есть некоторые библиотеки, несовместимые с Python 3 (например MySQLdb нужно будет заменить на pymysql).

Установка библиотек Airflow:

sudo apt update

sudo apt install python3

sudo apt install python3-pip

pip3 install —upgrade setuptools

export SLUGIFY_USES_TEXT_UNIDECODE=yes

sudo pip3 install apache-airflow

Настройка окружения:

mkdir airflow

AIRFLOW_HOME=~/airflow

airflow initdb

Запуск сервисов airflow

airflow scheduler

airflow webserver

airflow worker

Также можно настроить запуск сервисов Airflow через systemd (пример из репозитория Airflow) или docker (https://github.com/puckel/docker-airflow).

После запуска webserver будет доступен по ссылке http://localhost:8080/admin/

Создадим свой первый DAG

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# Python функция, которая у нас будет использоваться в дальнейшем
def print_hello():
    return ‘Hello world!’
# Создадим экземпляр DAG (контекст для наших task)
dag = DAG(‘hello_world’, description=‘Simple tutorial DAG’,
          schedule_interval=‘0 12 * * *’, # расписание
          start_date=datetime(2017, 3, 20), catchup=False) # дата начала работы DAG

dummy_operator = DummyOperator(task_id=dummy_task, retries=3, dag=dag) # Пустой оператор, ничего не делает

hello_operator = PythonOperator(task_id=hello_task, python_callable=print_hello, dag=dag) # Оператор, который вызывает нашу функцию print_hello

dummy_operator >> hello_operator # Установим зависимости между task’ами

Запуск DAG

Сначала нужно положить  *.py файлик в директорию dags_folder. Найти её можно в ~/airflow/airflow.cfg

Airflow сам увидит DAG и подтянет его в web-интерфейс.

Настройка WEB интерфейса Apache AirFlow
Рис.2 DAG в web-интерфейсе

Для того, чтобы DAG стал активен, нужно нажать на переключатель on/off (стрелка 1). Тогда scheduler увидит, что этот DAG нужно запустить.

Если нажать на кнопку 2, то можно будет увидеть структуру DAG. Выглядеть в случае нашего примера hello_world будет так:

DAG в Apache AirFlow
Рис. 3 Пример структуры DAG

Зеленая рамка вокруг первого task означает, что он завершился успешно. Серая вокруг второго, что он стоит в очереди на выполнение.

Более подробно почитать про визуализацию можно в официальной документации.

После успешного завершения hello_task, DAG перейдет в статус success.

Заключение

В этой статье я рассказал об основных особенностях в airflow и о том, как начать с ним работать. В следующих статьях я расскажу о том, как генерировать большое количество тасков автоматически, писать собственные операторы, использовать airflow connections для хранения паролей и многое другое.