Как построить логически сложный ETL-конвейер: ветвления DAG в Apache AirFlow

сложные конвейеры обработки данных DAG AirFlow, условная логика выполнения задач в AirFlow, AirFlow DAG операторы ветвления, AirFlow обучение примеры курсы, AirFlow для дата-инженера, обучение инженер данных AirFlow, AirFlow конвейер обработки данных примеры курсы обучение, data pipeline AirFlow, Школа Больших Данных Учебный Центр Коммерсант

Сегодня в рамках обучения дата-инженеров разберем, как организовать логическое ветвление рабочего процесса в Apache AirFlow с помощью операторов. Какие операторы позволяют организовать условную логику в DAG, чем BranchPythonOperator отличается от ShortCircuitOperator, как запустить задачу в зависимости от времени и/или дня недели, а также результата выполнения SQL-запроса.

Условная логика в DAG: 5 операторов AirFlow

При проектировании сложных конвейеров данных дата-инженер может столкнуться со случаями, когда после выполнения определенной задачи DAG разделяется на несколько вариантов, выбор одного из которых зависит от предыдущего результата или других условий. Например, требуется запускать задачи только в определенные дни или инициировать запуск DAG, только если предыдущее выполнение было успешным. В Airflow есть несколько вариантов построения условной логики DAG с помощью готовых операторов ветвления:

  • BranchPythonOperator на основе PythonOperator, который принимает функцию Python в качестве входных данных и возвращает список идентификаторов задач, с которыми DAG будет работать на основе некоторой логики. BranchPythonOperator отлично подходит для организации условной логики, если ее можно легко реализовать в простой функции Python. Важно, что с BranchPythonOperator вызываемый Python-объект должен возвращать хотя бы один идентификатор задачи для любой выбранной ветки. Этот оператор не может ничего возвращать. Все остальные «ветки» или непосредственно нижестоящие задачи помечаются как пропущенные, чтобы эти пути не могли продвигаться вперед. Пропущенные состояния распространяются вниз по течению, чтобы позволить заполнить состояние DAG и сделать вывод о состоянии его выполнения. Если по какому-то пути ветвления не нужно выполнять никаких действий, следует использовать EmptyOperator (бывший DummyOperator) в этой ветке. Этот пустой оператор практически ничего не делает, и его можно использовать для группировки задач в DAG. Задача оценивается планировщиком, но никогда не обрабатывается исполнителем.

  • ShortCircuitOperator, который также принимает вызываемый объект Python, возвращая True или False в зависимости от логики. Если возвращено значение True, DAG продолжит работу, а если возвращено значение False, все нижестоящие задачи будут пропущены. ShortCircuitOperator лучше всего использовать, когда известно, что часть DAG запускается лишь время от времени. Например, DAG может выполняться ежедневно, но некоторые задачи должны выполняться только по воскресеньям. Или DAG управляет моделью машинного обучения, и задачи, которые публикуют модель, должны выполняться только в том случае, если после обучения достигается определенная точность. Этот тип логики также может быть реализован с помощью BranchPythonOperator, который требует возврата идентификатора задачи. ShortCircuitOperator больше подходит для случаев, когда условная логика эквивалентна «запускать или нет», а не «запускать одно или другое».
  • BranchSQLOperator — активирует ветку в зависимости от того, возвращает ли SQL-запрос True или False.
  • BranchDayOfWeekOperator — активирует ветку в зависимости от того, равен ли текущий день недели заданному параметру week_day.
  • BranchDateTimeOperator — активирует ветку в зависимости от того, попадает ли текущее время в промежуток между target_lower и target_upper.

Последние 3 оператора принимают параметры follow_task_ids_if_true и follow_task_ids_if_false, которые предоставляют список задач для включения в ветвь на основе логики, возвращаемой оператором.

Предположим, в DAG должна выполняться одна из четырех ветвей. Организовать это можно с помощью BranchPythonOperator. При этом если есть подчиненные задачи, которые должны выполняться независимо от того, какая ветвь выбрана, например задача соединения, необходимо соответствующим образом обновить правило триггера. Правило триггера по умолчанию в Airflow — ALL_SUCCESS, что означает, что если вышестоящие задачи пропущены, нижестоящая задача не запустится. Изменить это поможет установка триггера на правило NONE_FAILED_MIN_ONE_SUCCESS, чтобы указать, что задача должна выполняться до тех пор, пока одна вышестоящая задача выполнена успешно и ни одна из задач не завершилась сбоем.

Другим вариантом реализации условной логики в DAG является ShortCircuitOperator. Этот оператор также принимает вызываемый объект Python, который возвращает True или False в зависимости от логики, реализованной для вашего варианта использования. Если возвращено значение True, DAG продолжит работу, а если возвращено значение False, все нижестоящие задачи будут пропущены. Предположим, в DAG есть две задачи, одна из которых всегда возвращает True, а другая — False. После запуска DAG с ShortCircuitOperator, задачи после оператора условия True, будут выполняться, а другие задачи будут пропущены. Подробнее про ShortCircuitOperator читайте в нашей новой статье.

DAG AirFlow Branching operators
Ветвления DAG с разными операторами AirFlow

Читайте в нашей новой статье, как автоматизировать реорганизацию DAG, используя JSON, YAML-файл или другую плоскую структуру данных для хранения динамической конфигурации рабочего процесса

Больше подробностей про администрирование и эксплуатацию Apache AirFlow для эффективной организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.astronomer.io/guides/airflow-branch-operator/
  2. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html
Поиск по сайту