Комбо Apache Airflow и NiFi для запланированного запуска ETL-конвейеров: практическая инженерия Big Data

Автор Категория , ,
Комбо Apache Airflow и NiFi для запланированного запуска ETL-конвейеров: практическая инженерия Big Data

Чтобы сделать наши курсы для дата-инженеров еще более полезными, сегодня рассмотрим, как объединить Apache NiFi и Airflow в рамках одного ETL-конвейера обработки данных. Читайте далее, зачем совмещать эти технологии и как сделать это наиболее эффективно, обращаясь к конечным точкам REST API процессоров NiFi из задач DAG-графа AirFlow.

Apache Airflow + NiFi: зачем их совмещать и как это сделать

Apache NiFi и Airflow – очень мощные и популярные инструменты современного дата-инженера, активно используемые в ETL-процессах. Каждый из них решает конкретные проблемы: NiFi отлично справляется с ролью маршрутизатора потоков данных, а AirFlow позволяет запускать пакетные задачи по расписанию в рамках связанной цепочки. Иногда требуется совместить эти технологии, например, чтобы вставить ETL-конвейер NiFi в запланированный DAG-граф Airflow, чтобы это выполнялось следующим образом:

  • Airflow DAG выполняет подготовительную задачу;
  • Airflow запускает процессор в Apache NiFi;
  • NiFi выполняет ETL-процесс;
  • Airflow ожидает окончания ETL-конвейера в NiFi;
  • по завершении ETL-конвейера Airflow запускает выполнение следующей задачи.

Хотя в NiFi есть возможность планировать процессоры по расписанию с CRON, это не считается хорошей практикой, а потому не рекомендуется для production. Предпочтительней использовать Airflow как надежное средство планирования задач и мониторинга их исполнения.

Аналогично, хотя Airflow потенциально может выполнять ETL-процессы, написанные на Python, попытка заменить этим фреймворком Apache NiFi – тоже не лучшая идея, т.к. потоковый маршрутизатор имеет свои преимущества в извлечении, преобразовании и загрузки данных.

Таким образом, используя Airflow в качестве планировщика, а NiFi как back-end инструмент обработки, дата-инженер получает лучшее от обоих технологий: способность Airflow создавать, планировать и отслеживать рабочие процессы с помощью масштабируемых и настраиваемых DAG’ов, а также мощь NiFi для маршрутизации и преобразования данных. Кроме того, совместное использование этих технологий может снизить потребление аппаратных ресурсов. В частности, избежать 100% -ной загрузки ЦП и нехватки оперативной памяти, выполняя задачи с большим объемом данных в экземпляре AWS EC2, который изначально предназначен для планирования задач, а не емких вычислений.

Вышеописанный сценарий взаимодействия NiFi с AirFlow сводится к следующему:

  • DAG AirFlow запускает начальную точку ETL-конвейера NiFi, выступая триггером;
  • AirFlow ожидает завершения ETL-процесса: DAG должен будет получать сигнал, когда NiFi завершит свою часть общего конвейера обработки данных.

Технически интеграция как обмен сигналами и данными между NiFi и AirFlow организуется через REST API, т.е. по протоколу HTTP. REST API обеспечивает программный доступ к управлению экземпляром NiFi в режиме реального времени, позволяя запускать и останавливать процессоры, отслеживать очереди, запрашивать данные о происхождении и пр. Каждая конечная точка включает описание, определения ожидаемых входных и выходных данных, коды потенциальных ответов и авторизации, необходимые для вызова сервиса.

Главным объектом данных в Apache NiFi является потоковый файл, перемещающийся между обработчиками – процессорами, каждый из которых можно настроить на выполнение определенных действий. В нашем примере будут задействованы следующие готовые процессоры:

  • GenerateFlowFile, который создает потоковый файл и выполняет роль начальной точки ETL-конвейера, запускаемого из Airflow;
  • UpdateAttribute, который обновляет атрибуты (метаданные) потокового файла и является конечной точкой конвейера, состояние которого может быть запрошено Airflow.

Между этими двумя процессорами может быть любое количество других, предоставляемых фреймворком или написанных разработчиком DataFlow собственноручно. Можно создать поток данных и блоки процессоров программно через вызовы REST API, а не средствами веб-GUI.

Apache NiFi и AirFlow, курсы дата-инженеров Apache NiFi и AirFlow, обучение Apache NiFi AirFlow
Взаимодействие Apache NiFi и AirFlow через вызовы REST API

DAG Airflow будет состоять из четырех задач, две из которых взаимодействуют с NiFi с помощью собственного Python-кода, включающего вызовы REST API, или использования готовых пакетов типа nipyapi. Например, запрос GET/processors/{id}/config/verification-requests/{requestId}/ при корректно указанных значениях идентификатора процессора (id) и запроса (requestId) вернет JSON-объект с данными о состоянии процессора, включая процент выполнения, текущие запросы и любые сбои. Аналогично работает PUT-метод REST API, позволяя устанавливать значения свойств процессора или других объектов NiFi. Например, вызов PUT/processors/{id}/run-status обновит состояние процессора с идентификатором, значение которого указано в {id}.

Таким образом, задача интеграции двух Big Data фреймворков сводится к настройке процессоров NiFi и разработке DAG в Airflow. Далее потребуется указать операторам AirFlow идентификаторы процессоров NiFi.

 

Настройка блоков ETL-конвейера в Apache NiFi

Изначально стартовый узел (Startnode) ETL-конвейера должен быть выключен, пока работают все последующие процессоры, готовые к запуску с потоковым файлом. Вместо процессора GenerateFlowFile можно использовать процессор GenerateTableFetch или любой другой, который после включения создает потоковые файлы.

Чтобы иметь возможность запускать стартовый узел из Airflow и создавать только один потоковый файл для однократного запуска конвейера, нужно настроить процессор NiFi на стартовом узле следующим образом:

  • Стратегия запуска (Scheduling strategy) – по таймеру (timer driven);
  • Количество одновременно исполняемых задач (concurrent tasks) – 1;
  • Исполнение (Execution) – на основном узле (primary node), чтобы только один узел выполняет процессор;
  • График выполнения (Run Schedule) – 60 секунд, чтобы у Airflow DAG было достаточно времени для остановки процессора перед созданием второго потокового файла NiFi.

На конечном узле (Endnode) ETL-конвейера в процессоре UpdateAttribute следует установить локальное хранилище состояний (Store) и добавить настраиваемое свойство с именем last_tms со значением $ {now ()}.

настройка Apache NiFi, ETL NiFi
Настройка процессоров Apache NiFi в ETL-конвейере

Таким образом каждый раз при прохождении потокового файла через процессор, будет выполняться код now(), а его результат (метка времени) сохранится в свойстве last_tms состояния процессора. Проверить это можно, запустив конвейер вручную, т.е. включить процессор GenerateFlowFile и дождаться создания потокового файла, а затем снова отключить его. Получить доступ к NiFi можно, обратившись к конечной точке REST API /processors/{id}/state методом GET или введя корректный адрес в строке браузера.

 

Настройка Apache Airflow

DAG будет состоять из четырех задач, две из которых (startup_task и waiting_task) взаимодействуют с API Apache NiFi, а первая и последняя используются как заглушки для будущих операций, если их потребуется запланировать или выполнить из Airflow для подготовки или завершения задач.

DAG Apache AirFlow пример
Задачи в DAG-цепочке Apache AirFlow

Инициирующая задача (initial_task) предшествует выполнению ETL-конвейера в NiFi и может включать любой Python-код.

Задача запуска (Startup_task) состоит из трех шагов:

  • включение стартового процессора NiFi – установка GenerateFlowFile в положение RUNNING;
  • ожидание 15 секунд, чтобы дать процессору время для создания потокового файла;
  • останов стартового процессора NiFi – установка GenerateFlowFile в положение STOPPED.

Можно изменить состояние процессора, получив текущее состояние с помощью GET-запроса из /nifi-api/processors/{id}, локально изменив состояние в пользовательском JSON и обратившись к конечной точке /nifi-api/processors/{id}/run_status методом PUT, чтобы установить новое состояние.

Задача ожидания (waiting_task) отвечает за проверку завершенности ETL-конвейера в NiFi, обращаясь к конечной точке nifi-api/processors/{id}/state и анализируя полученный JSON для значения last_tms, пока не появится изменение состояния. Можно делать это в цикле while, проверяя API каждые 60 секунд.

В завершающую задачу (continuation_task) DAG можно вставить любой код, который нужно выполнить после ETL-конвейера NiFi.

Таким образом, DAG AirFlow состоит из вышеуказанных функций, зависимости между которыми следует настроить. Например, если задачи должны выполняться последовательно одна за другой, DAG выглядит как направленная стрела. Также можно дополнительно отредактировать DAG-параметры, а каждой из задач DAG в Python-коде поставить соответствующего оператора или разработать собственный.

В результате описанных действий Airflow запускает ETL-конвейер NiFi, ожидает его завершения, а затем переходит к другой задаче. Есть и другие способы интеграции фреймворков, но это наиболее простой и малозатратный, который не влечет особых накладных расходов. После настройки его легко расширить для других сценариев использования, например, интегрировать конвейеры без изменения общей настройки, добавить/удалить дополнительные блоки в Airflow или NiFi, не мешая соединению между ними. В частности, добавить больше процессоров между стартовым и финишним узлами ETL-конвейера. Также можно изменить содержимое задач DAG-графа в Airflow.

Больше полезных примеров по организации ETL/ELT-процессов c Apache AirFlow и NiFi для эффективной аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a
  2. https://nifi.apache.org/docs/nifi-docs/rest-api/index.html