Динамическая генерация DAG в Apache Airflow: 5 способов автоматизации рутинных задач

Автор Категория ,
Динамическая генерация DAG в Apache Airflow: 5 способов автоматизации рутинных задач

Сегодня рассмотрим, как упростить работу дата-инженера в Apache AirFlow, автоматизировав процесс создания DAG’ов из одного или нескольких Python-файлов. На практических примерах разберем достоинства и недостатки 5 способов динамической генерации, а также особенности масштабирования Big Data pipeline’ов.

Что такое динамическая генерация DAG в Apache Airflow и зачем она нужна

В статье про лучшие практики разработки Big Data pipeline’ов в Apache Airflow мы упоминали про DRY-принцип (Don’t Repeat Yourself). Например, многие ELT-процессы с разными источниками и приемниками данных часто содержат большое количество одинакового Python-кода. Чтобы каждый раз не писать это вручную, целесообразно генерировать это Python-код автоматически в виде DAG (Directed Acyclic Graph) – направленный ациклический граф задач по обработке данных [1].

Airflow выполняет весь код Python в папке DAG_FOLDER и загружает все объекты DAG, которые появляются в функции globals(). Напомним, Python-функция globals()возвращает переменные, которые пользователь может устанавливать, и получать значения, как в словаре. После установки значений можно вызывать переменные, как будто они были созданы в обычном режиме [2].

Таким образом, самый простой способ создать DAG – записать его как статический файл Python. Но создание DAG вручную не всегда целесообразно. К примеру, когда есть сотни или тысячи DAG’ов, которые делают похожие вещи, меняя лишь некоторые параметры. Или нужен набор DAG для загрузки таблиц, которые могут меняться. В этих и многих других случаях поможет динамическое создание DAG.

Поскольку все в Airflow – это код, можно динамически создавать DAG, используя только Python. Создавав объект DAG в функции globals() с помощью Python-кода в папке DAG_FOLDER, Airflow загрузит его. Динамическая генерация Directed Acyclic Graph возможна несколькими способами, которые можно сгруппировать по количеству используемых Python-файлов, что мы и рассмотрим далее [3].

3 способа динамической генерации DAG из одного файла

Итак, первым способом динамического создания Directed Acyclic Graph является – это наличие одного файла Python, который генерирует DAG на основе входных параметров, например, списка API или таблиц как в ETL/ELT-конвейере со множеством источников или приемников данных. Это требует создания множества Directed Acyclic Graphs по одному шаблону. Преимуществами такого метода являются следующие [3]:

  • простая реализация;
  • возможность принимать входные параметры из разных источников;
  • быстрота – добавление DAG-файлов происходит практически мгновенно, т.к. для этого нужно лишь изменить входные параметры.

Однако, этот способ имеет и недостатки:

  • сам файл Directed Acyclic Graph при этом не создается;
  • необходимо иметь файл Python в папке DAG_FOLDER и генерация кода будет выполняться при каждом heartbeat-сигнале планировщика, что может вызвать проблемы с производительностью при большом количестве DAG’ов или подключении к внешней системе, например к СУБД.

Однофайловый способ реализуется по-разному в зависимости от того, какие входные параметры используются для создания DAG [3]:

  • метод Create_DAG – пользовательская функция Python, которая будет генерировать DAG на основе входного параметра. Входные параметры могут поступать из любого источника, к которому имеет доступ скрипт Python. При необходимости можно установить простой цикл для генерации уникальных параметров и передачи их в глобальную область видимости, зарегистрировав их как действительные DAG в планировщике Airflow.
  • генерация DAG из переменных. Входные параметры не обязательно должны существовать в самом файле Directed Acyclic Graph, их значения можно установить в объекте Variable. Чтобы получить это значение, нужно импортировать класс Variable и передать его в соответствующую область. Чтобы интерпретатор регистрировал этот файл как действительный независимо от того, существует ли переменная, default_var устанавливается в 3.
  • генерация DAG из подключений – отличный вариант, если каждый DAG подключается к СУБД или API. Поскольку дата-инженеру все равно придется настраивать эти подключения, создание Directed Acyclic Graph из этого источника позволит избежать избыточной работы. Для реализации этого способа следует извлечь соединения из базы метаданных Airflow, создав экземпляр Session и запросив таблицу Connection. Можно отфильтровать этот запрос, чтобы он извлекал только те соединения, которые соответствуют определенным критериям. Необходимо обратиться к библиотеке Models, чтобы работать с классом Connection, а доступ к классу Session нужен, чтобы запрашивать текущий сеанс базы данных.

Многофайловые методы

Также для динамической генерации Directed Acyclic Graph можно использовать несколько файлов Python, чтобы для каждого созданного DAG в папке DAG_FOLDER был отдельный Python-файл. Это особенно полезно в production, когда Python-скрипт генерирует файлы DAG при выполнении рабочего процесса CI/CD. DAG создаются во время сборки CI/CD, а затем развертываются в Airflow. Также может быть другой Directed Acyclic Graph, который периодически запускает этот скрипт генерации.

Главные преимущества такого метода:

  • лучшая масштабируемость по сравнению с однофайловыми способами, т.к. не нужно анализировать код в DAG_FOLDER и код генерации DAG не выполняется при каждом heartbeat-сигнале планировщика;
  • файлы DAG создаются явным образом перед развертыванием в Airflow и есть полная видимость их кода.

Основным недостатком этого метода является сложность настройки. Кроме того, изменения в существующих DAG или дополнительные Directed Acyclic Graphs создаются только после запуска скрипта, что может требовать полноценного развертывания.

Реализовать метод динамической генерации DAG из нескольких файлов помогут следующие способы:

  • создание DAG из файлов конфигурации JSON с помощью Python-скрипта. Этот вариант актуален для дата-аналитиков, которым необходимо запланировать разные SQL-запросы по одним источникам данных, выполняющиеся в разное время. Для этого сперва нужно создать файл-шаблон со структурой DAG и переменными, информация о которых будет динамически генерироваться, например, dag_id, scheduletoreplace и querytoreplace. Затем нужно создать папку dag-config с файлом конфигурации JSON для каждого DAG. Файл конфигурации определяет необходимые параметры: dag_id, scheduletoreplace и querytoreplace. Наконец, создать Python-скрипт, который будет создавать файлы DAG на основе шаблона и файлов конфигурации, просматривая каждый файл конфигурации в папке dag-config/ и перезаписывает значения параметры. Чтобы сгенерировать файлы DAG, можно запустить этот сценарий ad-hoc как часть рабочего процесса CI/CD или создать еще один DAG для периодического запуска скрипта. Этот простой пример с генерацией DAG по одному шаблону можно расширить, добавив динамические входы для задач, зависимостей, различных операторов и пр.
  • Фабрика DAG’ов (DAG-factory) – Python-библиотека с открытым исходным кодом для динамической генерации DAG-файлов Airflow из файлов YAML. Разработанная сообществом энтузиастов, она доступна для свободного скачивания с Github [4]. Чтобы использовать dag-factory, следует установить этот пакет в среде Airflow и создать файлы конфигурации YAML для генерации Directed Acyclic Graph. Затем можно динамически создать Directed Acyclic Graph, вызвав метод dag-factory.generate_dags() в Python-скрипте. Как эта идея реализуется на практике, читайте в нашей следующей статье.

Динамическая генерация DAG’ов и масштабируемость AirFlow

В зависимости от количества DAG’ов, конфигурации Airflow и инфраструктуры, в которой он развернут, динамическое создание Directed Acyclic Graph может вызвать проблемы с производительностью при масштабировании. Поэтому дата-инженеру рекомендуется учесть следующие аспекты [3]:

  • любой код в папке DAG_FOLDER будет выполняться при каждом heartbeat-сигнале планировщика. Поэтому однофайловые методы генерации DAG, вероятней всего, вызовут проблемы с производительностью при масштабировании.
  • если время синтаксического анализа Directed Acyclic Graph, т.е. кода в папке DAG_FOLDER, больше интервала heartbeat-сигнала планировщика, Sheduler может заблокироваться, и задачи не будут выполняться.

Решить эти проблемы с производительностью поможет обновление до Airflow версии 2.0, в котором поддерживается высокая доступность планировщика. Но все же дата-инженеру потребуется дополнительно оптимизировать конвейеры в зависимости от рабочего масштаба. Не существует универсального способа реализации или масштабирования динамически генерируемых DAG, но гибкость Airflow позволяет найти решение, которое подойдет для конкретного сценария [3].

Больше подробностей про использование Apache AirFlow для разработки сложных конвейеров аналитики больших данных с Hadoop и Spark вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/data-engineers-shouldnt-write-airflow-dags-b885d57737ce
  2. https://flavio-mtps.medium.com/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e
  3. https://www.astronomer.io/guides/dynamically-generating-dags
  4. https://github.com/ajbosco/dag-factory