Ускорение загрузки и парсинга DAG-файлов в Apache AirFlow на Kubernetes

AirFlow обучение примеры курсы, AirFlow для дата-инженера, обучение инженер данных AirFlow, AirFlow Kubernetes проблемы и решения, AirFlow Kubernetes конвейер обработки данных примеры курсы обучение, AirFlow Kubernetes примеры курсы обучение, data pipeline AirFlow, Школа Больших Данных Учебный Центр Коммерсант

Специально для обучения дата-инженеров и администраторов кластера тонкостям работы с современными инструментальными средствами оркестрации конвейеров обработки данных, сегодня рассмотрим, почему в Apache AirFlow уходит много времени на парсинг большого количества DAG-файлов и как этого избежать.

Потери времени при парсинге множества DAG-файлов в Apache AirFlow

Apache AirFlow часто используется в проектах аналитики больших данных в качестве мощного и эффективного средства оркестрации конвейеров. Задачи обработки данных в AirFlow объединяются в цепочку в виде направленного ациклического графа (DAG, Directed Acyclic Graph). Однако, в случае большого количества таких DAG-файлов, в каждом из которых описано множество задач, процесс загрузки этих Pyhton-файлов становится слишком долгим. Не дождавшись файла, AirFlow завершает процесс и конвейер терпит сбой. Сократить это время можно, увеличив значение параметра конфигурации dagbag_import_timeout в переменной среды AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT, по умолчанию равное 30.0. Этот параметр задает тайм-аут – время ожидания в секундах, сколько AirFlow будет ждать при импорте импорта DAG-файла.

Получается, достаточно обновлять DAG планировщика раз в несколько минут при ежедневном однократном запуске DAG с учетом пары минут на добавление нового клиента. Но при развертывании Apache AirFlow в кластере Kubernetes процесс загрузки DAG-файлов повторяется для каждого исполнителя Kubernetes. Это связано с тем, что в Airflow есть рабочий процесс быстрого выполнения (мини-планировщик), который может запускать следующую задачу.

После того, как worker завершил текущую задачу, Airflow проверяет, есть ли последующая задача в той же DAG, которая теперь готова к запуску. При наличии такой задачи текущий worker выполняет ее немедленно. Это значительно увеличивает общую эффективность всего конвейера обработки данных, так как планировщику не нужно планировать последующую задачу для нового worker’а. Кроме того, поскольку текущий рабочий процесс уже проанализировал DAG, не нужно тратить время на анализ последующей задачи. Этот механизм очень полезен для линейных DAG, особенно в контексте KubernetesExecutor. Также Airflow 2.0 включает оптимизацию процесса запуска задачи и цикла планировщика, что снижает задержку выполнения задачи.

Тем не менее, каждый worker, которых может быть очень много в течение всего дня, тратит более минуты на загрузку тысяч DAG-файлов, тогда как действительно актуальным из них является лишь один. Например, имея около 4000 ежедневных DAG с 10 задачами в каждой, потери времени составят несколько часов в день! Эта задержка образуется, поскольку каждое выполнение задачи фактически должно полностью выполнять парсинг DAG-файла, в который она входит. Но когда каждый DAG является изолированным и не зависит от других, нет необходимости анализировать все DAG во время выполнения задачи. Как это сделать, рассмотрим далее.

Как указать точный идентификатор нужной задачи в конкретном DAG

Точечное обращение к конкретному DAG-файлу или отдельной задаче в нем можно реализовать, используя возможности развертывания Apache AirFlow в кластере Kubernetes, о чем мы писали здесь. В частности, если каждая из динамических DAG-цепочек полностью независима от других, т.е. каждый DAG-файл представляет отдельный изолированный клиентский конвейер, цикл может загружать только нужный DAG-файл для исполнителя Kubernetes. Реализовать это можно благодаря внутреннему поведению Airflow, которое передает идентификатор DAG (dag_id) как аргумент для оптимизации времени загрузки. Получить значение этого dag_id можно получить из меток/аннотаций пода Kubernetes.

Однако, этот подход работает лишь для некоторых конфигураций. Помимо Kubernetes Executor Airflow может иметь Celery, Local Executor, CeleryKubernetesExecutor, которые можно запустить, запустив новый интерпретатор, а также могут быть пользовательские исполнители. Поэтому силами сообщества Apache AirFlow в июле 2022 года был выпущен pull-request, который добавляет правильный контекст в виде настройки менеджеров контекста и переменные среды, чтобы указать, какой DAG анализируется в контексте процессора или выполнения задачи. Это позволяет получить DAG_ID и TASK_ID, чтобы ускорить работу конвейеров, сократив количество загружаемых или обрабатываемых данных.

Читайте в нашей новой статье, как организовать совместную работу с DAG-файлами при развертывании Apache AirFlow на Kubernetes.

Освойте администрирование и эксплуатацию Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/apache-airflow/airflows-magic-loop-ec424b05b629
  2. https://www.astronomer.io/blog/airflow-2-scheduler/#single-scheduler-optimization
  3. https://medium.com/apache-airflow/magic-loop-in-airflow-reloaded-3e1bd8fb6671
  4. https://github.com/apache/airflow/pull/25161
Поиск по сайту