Программный запуск DAG Apache AirFlow через REST API

запуск DAG по REST API Apache AirFlow, REST API DAG Apache AirFlow, тестирование DAG Apache AirFlow, Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, тестирование DAG airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Сегодня в рамках обучения дата-инженеров рассмотрим, как программно запустить DAG в Apache AirFlow через вызовы REST API. А также повторим основы интеграционного взаимодействия ИС через отправку HTTP-запросов к конечным точкам.

 Как устроен REST API в Apache AirFlow

Напомним, начиная с выпуска 2 Apache Airflow включает стабильный RESTfull API версии 1.0.0 для доступа к своему объекту или запуска DAG через отправку HTTP-запросов к конечным точкам. Это пригодится, например, в задаче интеграционного тестирования DAG, о чем мы писали здесь. Чтобы упростить работу дата-инженера, Apache Airflow поддерживает ряд конечных точек REST API для своих объектов. Большинство из них принимают JSON в качестве входных данных и возвращают ответы в этом же формате. Поэтому надо добавить в запрос следующие заголовки:

Content-type: application/json
Accept: application/json

Напомним, REST как самоописательный стиль интеграции информационных систем, работает по принципу запрос-ответ по протоколу HTTP. Запросы посылаются к конечным точкам (URL), которые предоставляют доступ к ресурсам. В REST API Apache AirFlow термин ресурс относится к одному типу объекта в метаданных фреймворка. API разбивается по соответствующему ресурсу его конечной точки. Имя ресурса обычно имеет множественное число и выражается в стиле camelCase, например, dagRuns. Имена ресурсов используются как часть URL-адресов конечных точек, а также в параметрах и ответах API. Между именем параметра URL и именем поля должна быть согласованность. Имена полей выражаются в стиле snake_case, например:

{
  "name": "string",
  "slots": 0,
  "occupied_slots": 0,
  "used_slots": 0,
  "queued_slots": 0,
  "open_slots": 0
}

Как и любой RESTfull-сервис, Apache AirFlow поддерживает CRUD-операции (Create, Read, Update и Delete) для большинства ресурсов, хотя некоторые конечные точки имеют особое поведение в виде исключений. В общем случае реализация CRUD-операций в REST API выглядит следующим образом:

  • чтобы создать ресурс, надо отправить HTTP-запрос POST с необходимыми метаданными ресурса в теле запроса. Ответ возвращает код ответа 201 Created при успешном использовании метаданных ресурса, включая его внутренний идентификатор, в теле ответа.
  • Для чтения ресурса или для вывода списка ресурсов используется HTTP-запрос GET. Для чтения определенного ресурса в запросе отправляется его идентификатор. Ответ обычно возвращает код ответа 200 OK в случае успеха с метаданными ресурса в теле ответа. Если запрос GET не включает конкретный идентификатор ресурса, он рассматривается как запрос списка. Ответ обычно возвращает код ответа 200 OK в случае успеха с объектом, содержащим список метаданных ресурсов в теле ответа. В параметрах запроса можно задать смещение, после которого надо возвращать объекты (offset) и максимальное количество объектов для выборки (limit), по умолчанию равное 25. Например,
v1/connections?limit=25&offset=25
  • Для обновления ресурса его идентификатор нужно указать в HTTP-запросе PATCH с полями, которые следует изменить в теле запроса. Ответ обычно возвращает код ответа 200 OK в случае успеха с информацией об измененном ресурсе в теле ответа.
  • Удалить ресурс можно, указав его идентификатор в HTTP-запросе DELETE. Ответ обычно возвращает код ответа 204 No Content в случае успеха.

Маска обновления доступна в качестве параметра запроса в конечных точках для PATCH-запросов. Она используется для уведомления API, какие поля нужно обновить. Использование update_mask упрощает обновление объектов, точечно указывая серверу, какие поля в объекте следует обновить, вместо того обновления всех полей. Запрос на обновление игнорирует любые поля, не указанные в маске поля, оставляя их текущими значениями, например:

resource = request.get('/resource/my-id').json()
  resource['my_field'] = 'new-value'
  request.patch('/resource/my-id?update_mask=my_field', data=json.dumps(resource))

Вспомнив основы REST API и особенности его реализации в Apache AirFlow, рассмотрим, как использовать это на практическом примере для запуска DAG.

Программный запуск DAG

Чтобы работать с REST API в Apache AirFlow, следует прежде всего включить его, отредактировав файл конфигурации, так как по умолчанию фреймворк не принимает никаких запросов через этот интерфейс. В файле конфигурации airflow.cfg и следует установить нужный бэкенд аутентификации auth_backends. Например, базовая аутентификация по логину и паролю для пользователей, созданных с помощью входа в систему LDAP или в базе данных метаданных Airflow с использованием пароля, устанавливается следующим образом:

# auth_backends = airflow.api.auth.backend.session
auth_backends = airflow.api.auth.backend.basic_auth

Напомним, Airflow поддерживает несколько методов аутентификации (Kerberos, Basic), и можно добавить свой собственный. Проверить, какой метод аутентификации используется сейчас, поможет следующая команда:

$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth

Далее следует включить совместное использование ресурсов между источниками (Cross-Origin resource Sharing (CORS) – это функция безопасности браузера, которая ограничивает HTTP-запросы, инициированные сценариями, запущенными в браузере. Для этого в конфигурационном файле airflow.cfg следует установить значения параметрам access_control_allow_headers, access_control_allow_methods и access_control_allow_origins в разделе [api], например:

[api]
access_control_allow_headers = origin, content-type, accept
access_control_allow_methods = POST, GET, OPTIONS, DELETE
access_control_allow_origins = https://exampleclientapp1.com https://exampleclientapp2.com

Далее можно протестировать API, перечислив все доступные DAG через GET-запрос на конечной точке /api/v1/dags. При использовании AirFlow на локальном хосте GET-запрос надо направить на конечную точку http://localhost:8080/api/v1/dags, предварительно установив авторизацию.

Затем нужно создать Python-файл DAG и поместить его в папку dags. Идентификатор запуска DAG (dag_run_id) может быть любым (строка или текст). Наконец, нужно получить доступ к конфигурации в коде или Python-функции. Можно получить доступ к переменным, которые установлены в конфигурации, переданной в запросе. Для этого следует написать Provide_context=True в функции PythonOperator и получить к ней доступ с помощью kwargs[‘dag_run’].conf.get(«имя переменной»). Чем Apache AirFlow Отличается от AWS Step functions, читайте в нашей новой статье.

Освойте администрирование и эксплуатацию Apache AirFlow для аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://deeputyagi39.medium.com/how-to-trigger-airflow-dag-using-rest-api-dd40e3f7a30d
  2. https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
  3. https://airflow.apache.org/docs/apache-airflow/stable/security/api.html
Поиск по сайту