15 советов по работе с DAG в Apache AirFlow: лучшие практики дата-инженера

Автор Категория ,
15 советов по работе с DAG в Apache AirFlow: лучшие практики дата-инженера

Практическое обучение дата-инженеров – это не просто курсы по основам Big Data, а полезные рекомендации с реальными примерами. Поэтому сегодня рассмотрим, как работать с DAG в Apache AirFlow еще эффективнее с помощью параметров конфигурации, плагинов, меток, шаблонов, переменных и еще 10 различных инструментов.

15 лучших практики для DAG в Apache AirFlow

Напомним, конвейер обработки данных (data pipeline) в Airflow – это просто Python-скрипт, который определяет объект DAG (Directed Acyclic Graph). DAG AirFlow – это цепочка задач для запланированного запуска по расписанию в виде направленного ациклического графа. Проектируя DAG, инженер данных как разработчик Data Flow, определяет набор операторов, с помощью которых будут выполняться входящие в граф задачи. Поэтому знание лучших практик работы с DAG очень полезно пользователю Apache AirFlow. В этом случае наиболее простыми, но весьма эффективными рекомендациями будут следующие [1]:

  • рассматривайте DAG как файл конфигурации;
  • Используйте систему плагинов;
  • не выполняйте обработку данных в файлах DAG;
  • делегируйте операторам вызовы API-интерфейсов или подключения к базам данных;
  • делайте DAG-файлы и задачи идемпотентными;
  • используйте одну переменную для каждого DAG;
  • маркируйте DAG;
  • не злоупотребляйте XCom;
  • используйте промежуточное хранилище между задачами;
  • контролируйте доступ на уровне DAG;
  • используйте возможности шаблонов Jinja;
  • установите повторные попытки выполнения задач на уровне DAG;
  • определите согласованную структуру файлов;
  • выберите последовательный метод для зависимостей задач;
  • имейте стратегию уведомлений о сбоях.

Что именно представляет собой каждая рекомендуемая практика, мы рассмотрим далее.

DAG как файл конфигурации

Планировщик Airflow сканирует и компилирует файлы DAG при каждом такте. Это занимает довольно много ресурсов для тяжеловесных файлов с большим количеством верхнеуровневого кода. Поэтому целесообразно делать DAG максимально понятными с т.н. «чистым кодом», что они были похожи на файлы конфигурации. Для этого пригодится YAML/JSON-определение рабочего процесса (workflow), чтобы затем на его основе создавать DAG. Такой подход даст как минимум следующие преимущества:

  • DAG, которые создаются автоматически (программным способом), будут согласованными и воспроизводимыми в любое время;
  • доступность для пользователей, не работающих с Python.

Более того, блоки кода, не связанные с конфигурацией, можно отделить от определения DAG и использовать атрибут template_searchpath для их добавления. Например, если требуется выполнить некоторый SQL-запрос, подключившись к источнику данных, эта команда SQL должна быть загружена из файла. А расположение этого файла следует указать в template_searchpath. Аналогичное эмпирическое правило подходит и для запросов Hive (.hql).

Используйте в систему плагинов Airflow

Организуйте качественный репозиторий плагинов и поддерживайте его для создания пользовательских плагинов. Создавайте плагин по единому (универсальному) образцу, чтобы его можно было многократно использовать в разных сценариях использования. Это позволит управлять версиями плагинов, а также поддерживать порядок в рабочих процессах с помощью параметров их конфигурации, а не логики реализации. Вставляйте операции внутри метода выполнения, а не при инициализации класса.

Не выполняйте обработку данных в файлах DAG

Поскольку файлы Directed Acyclic Graph представляют собой Python-скрипты, может возникнуть соблазн использовать pandas или аналогичные библиотеки обработки данных. Однако, не стоит это делать: помните, что Airflow – это оркестратор рабочих процессов, а не среда их исполнения. Все вычисления должны выполняться в специализированной целевой системе.

Делегируйте операторам API-вызовы и подключения к базам данных

Вызов API или соединение с БД, выполненное на верхнем уровне кода в файлах DAG, перегружает веб-сервер. Эти вызовы, определенные вне оператора, вызываются при каждом такте. Поэтому рекомендуется передать их оператору util/common.

Сделайте DAG-файлы и задачи идемпотентными

DAG должен выдавать одни и те же данные при каждом запуске, например, чтение из раздела и запись в него, должны быть неизменными. Чтобы избежать внезапных ошибок, создавайте и удаляйте сами разделы, не трогая DAG.

Используйте одну переменную для каждого DAG

Каждый раз при обращении к переменным Directed Acyclic Graph, создается соединение с базой данных, чтобы считать метаданные. Это может перегрузить СУБД, особенно при нескольких DAG, каждый из которых вызывает более одной переменной. Поэтому лучше использовать одну переменную для каждого графа с объектом JSON в рамках единого соединения. А, проанализировать этот JSON, можно получить требуемую пару ключ-значение.

Маркируйте DAG

Наличие тегов помогает фильтровать и группировать Directed Acyclic Graph. Поэтому стоит маркировать цепочки задач в соответствии с системой тегов, характерной для вашей инфраструктуры. Например, теги могут базироваться на проекте, категории приложения и прочих особенностях экосистемы data pipeline’ов, принятых в компании. Также это может помочь в управлении множеством взаимозависимых DAG: об этой проблеме и способах ее решения читайте в нашей новой статье.

Не злоупотребляйте XCom

Напомним, XCom (cross-communication) в Apache AirFlow используется как канал обмена данными между задачами в одном DAG, с помощью пары ключ-значение и названием задачи-отправителя. XCom создаётся в операторе Python на основании возвращаемого им значения или вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, чтобы его через функцию xcom_pull приняла следующая задача в другом Python-операторе или из шаблона jinja внутри любой предобработанной строки. При этом данные хранятся в серверной СУБД метаданных, хотя в Airflow 2.0 сама операция XCom скрыта внутри Python-оператора и полностью абстрагируется от разработчика DAG. Подробнее об этом и других новинках нового релиза Apache AirFlow мы писали здесь. Тем не менее, несмотря на отмеченное улучшение с инкапсуляцией XCom, при передаче большого количества данных между задачами или слишком частого выполнения этой процедуры, серверная СУБД с метаданными будет перегружена.

Используйте промежуточное хранилище между задачами

Чтобы избежать вышеуказанной проблемы с XCom и организовать быстрый обмен большим объемом данных между задачами, имеет смысл сохранить их в промежуточной хранилище. И передавать последующей задаче ссылку на эти данные, не пересылая их самих.

Используйте возможности шаблонов Jinja

Airflow использует возможности Jinja Templating и предоставляет разработчику Data Flow готовый набор встроенных параметров и макросов, позволяя также самостоятельно определять их и создавать новые шаблоны. Напомним, Jinja – это язык шаблонов для Python-разработчиков, похожий на шаблоны Django. Он быстрый, популярный и безопасный благодаря дополнительной изолированной среде выполнения шаблонов (песочнице) и автоматической системе экранирования HTML для предотвращения XSS [2].

Многие операторы AirFlow поддерживают template_fields – кортежный объект (tuple), который определяет, какие поля будут преобразованы.

class PythonOperator(BaseOperator):

template_fields = (‘templates_dict’, ‘op_args’, ‘op_kwargs’)

При создании пользовательского оператора достаточно переопределить атрибут template_fields [1]:

class CustomBashOperator(BaseOperator):

template_fields = (‘file_name’, ‘command’, ‘dest_host’)

В этом примере поля «file_name», «command», «dest_host» доступны для создания шаблонов Jinja.

Контролируйте доступ на уровне DAG

Мы уже упоминали, что в Apache AirFlow 2.0 добавлены новые кластерные политики, которые предоставляют интерфейс для работы с каждой задачей или DAG во время его загрузки, а также непосредственно перед выполнением задачи. Поэтому следует использовать эти возможности по максимуму, определив в пользовательских настройках airflow_local_settings политики dag_policy, task_policy и task_instance_mutation_hook. Разумеется, предварительно следует создать настраиваемую роль – пользователя Linux, который будет выполнять разрешенные действия с графами и задачами.

Еще 5 полезных практик в работе с AirFlow

  • используйте статическую дату начала start_date, чтобы корректно определить расписание запуска DAG;
  • при глобальных структурных изменениях переименуйте файл с цепочкой задач, создав его новую версию, чтобы сохранить всю историю. При этом в частности и при создании любых файлов вообще придерживайтесь согласованной и понятной структуры их именования и хранения.
  • придерживайтесь последовательного метода организации зависимостей между задачами;
  • разработайте и внедрите стратегию уведомления о сбоях;
  • установите повторные попытки выполнения отказавших задач на уровне DAG.

Как использовать все эти рекомендации на практике, чтобы эффективно применять Apache AirFlow для простой разработки сложных конвейеров аналитики больших данных с Hadoop и Spark, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/swlh/airflow-dag-best-practices-716ac95b82d1
  2. https://jinja.palletsprojects.com/en/2.11.x/
  3. https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html