Соединения и хуки в Apache Airflow: разбираем на примере SQLite

В прошлый раз мы говорили о способе взаимодействия задач между собой в Apache Airflow. Сегодня поговорим о таких сущностях, как соединение (connections) и хуки (hooks). Читайте в этой статье: что такое хук и соединение, как создать и скачать соединение, а также как подключить базу данных в Airflow.

Что такое связи и хуки в Apache Airflow

Соединение (connection) – это набор параметров (логин, пароль, хост) и некая внешняя система c уникальным именем (conn_id), к которой и происходит подключение. Такой системой может быть базы и хранилища данных, электронная почта, AWS, Google Cloud, Telegram и т.д. Всякий раз, когда вам нужно изнутри Airflow подключиться к БД, то будут использоваться соединения и хуки.

Список соединений доступен в пользовательском интерфейсе во вкладке Admin->Connection. Там же можно добавить новое соединение. Если у вас нет нужного типа соединения, то его следует установить. Для этого используется Airflow Provider, список доступных пакетов находится в документации. А установить их (по крайней мере в Airflow 2.0) можно через обычный pip, например, вот так можно получить соединение с PostgreSQL:

$ pip install apache-airflow-providers-postgres
# или что то же самое:
$ pip install apache-airflow[postgres]

Некоторые типы соединений отсутствуют в списке conn type, например, Telegram. Тогда единственным способом создать новое соединение становится использование CLI, как это показано в документации.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
22 мая, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Соединение на примере SQLite

SQLite — легковесная СУБД, которая будет служить примером организации задач Airflow с базой данных. Создадим БД под названием tmp.db и поместим её в директорию $HOME/airflow. В этой БД создадим таблицу example с двумя полями. Вот так все это выглядит:

$ cd airflow
$ sqlite3 tmp.db
sqlite> TABLE example(id INT, exec_time TEXT);

Ниже на рисунке показано, как создать соединение в пользовательском интерфейсе. Мы указали идентификатор связи, название БД и хост. Если бы это был PostgreSQL, то необходимо было бы добавить порт (например, 5432), логин и пароль (если есть). Термин scheme может отличаться в других БД.

Создание соединения в Apache Airflow
Конфигурация соединения SQLite

Теперь построим такую задачу Airflow, которая будет добавлять значения полей.

Хук — это SQLOperator, который можно использовать в функциях

Хук (hook) предоставляет интерфейс для взаимодействия с внешней системой в пределах одного графа. Например, некоторые задачи требуют доступа к MySQL, и чтобы не устанавливать связь каждый раз в нужной задаче, можно использовать хук. Хуки также позволяют не хранить параметры аутентификации в графе. По сути своей, хук позволяет использовать возможности SQLOperator внутри PythonOperator.

Для баз данных используется единый API, поэтому методы для работы с ними будут одинаковыми. О хуках у нас есть статья. Итак, хук соединения SQLite импортируется и реализуется следующим образом:

from airflow.providers.sqlite.hooks.sqlite import SqliteHook
sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')

Хуки баз данных наследуются от класса DbApiHook, поэтому если вам понадобятся узнать о методах, то лучше заглядывать в него.

Data Pipeline на Apache AirFlow и Arenadata Hadoop

Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Пусть имеются две задачи: первая добавляет значения в таблицу базы данных, а вторая выдает эти значения. Первая задача будет использовать хук, вторая — SQLOperator, который будет иметь то же самый идентификатор соединения. Добавление значений полей в базу данных осуществляется через метод insert_rows. Ниже продемонстрирован код в Python. На это примере можно заметить, что хук представляет собой SQLOperator, который можно использовать внутри функции.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.providers.sqlite.operators.sqlite import SqliteOperator

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

def insert(**kwargs):
    exec_time = kwargs['ts']
    sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
    rows = [(1,  exec_time),]
    fields = ['id', 'exec_time']
    sqlite_hook.insert_rows(
        table='example',
        rows=rows,
        target_fields=fields,
    )

with DAG(
    'conhook',
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:
    insert_into_example = PythonOperator(
        task_id='insert_into_example',
        python_callable=insert,
    )

    print_rows = SqliteOperator(
        task_id='print_rows',
        sqlite_conn_id='sqlite_conn',
        sql='SELECT id, exec_time from example'
    )

    insert_into_example >> print_rows

В некоторых версиях Airflow метод insert_rows не работает для SQLite из-за проблем в исходном коде [1]. Данная проблема связана с тем, что добавляются значения в базу данных SQLite не совсем правильно, поскольку вместо указаний типов данных следует использовать знак ? (подробности см. тут). Если возникают проблемы с БД, то можно выполнить запрос самостоятельно через метод run, например так:

sql = """
INSERT INTO example (id, exec_time)
values (%d, "%s")
"""
sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
sqlite_hook.run(sql % (0, exec_time))

Можно ещё раз убедиться, что новые значения появляются в таблице через интерфейс командной строки, выполнив тот же самый Select.

 

Ещё больше подробностей о соединениях, хуках в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

Смотреть расписание

Источники

  1. https://github.com/apache/airflow/pull/17695
Поиск по сайту