CDC-конвейер для MySQL на Apache NiFi: практический пример

Автор Категория ,
CDC-конвейер для MySQL на Apache NiFi: практический пример

Сегодня разберем типичный для современной дата-инженерии кейс построения конвейера обработки измененных данных на Apache NiFi с учетом безопасности и масштабируемости API-вызовов. Также рассмотрим, зачем использовать Apache NiFi при межсистемной интеграции через API-вызовы и как реализовать CDC-подход к изменениям в СУБД MySQL с помощью процессоров этого популярного ETL-фреймворка.

CDC и интеграция систем через API-вызовы

Захват или отслеживания измененных данных (CDC, Change Data Capture) – подход к избирательному перемещению данных в режиме, близком к реальному времени, из реляционной СУБД в хранилище, озеро или другую базу данных. CDC отслеживает все изменения в источнике (вставка, обновление или удаление), и реплицирует их в целевую систему. Для этого на рынке есть готовые CDC-инструменты, например, Qlik Replicate, StreamSets и пр. Также дата-инженер может реализовать CDC собственными силами, разработав конвейер обработки измененных данных, например, как мы описывали это здесь, здесь и здесь. Аналогично можно разработать потоковый CDC-конвейер на мощном и простом в использовании open-source фреймворке Apache NiFi, который считается одним из самых популярных ETL-инструментов для маршрутизации и обогащения огромных объемов данных. Он имеет наглядный веб-интерфейс, множество коннекторов к различным источникам и приемникам данных, а также способен обрабатывать тысячи потоковых файлов за секунды.

Apache NiFi поддерживает передачу данных по протоколу HTTPS, обеспечивая безопасность API-вызовов ко внешним системам. Поэтому его можно использовать при обмене данными между различными микросервисами. Разумеется, не в качестве общей шины предприятия или очереди сообщений, как Apache Kafka, о чем мы писали здесь и здесь, а как средство логирования межсистемного взаимодействия через вызовы API-методов. Например, непрерывное прослушивание таблицы с логами API-вызовов с помощью NiFi позволяет отслеживать их и собирать статистику о том, сколько REST-запросов к каким системам выполнено успешно и на какую конечную точку. В качестве инструментов визуализации и интеллектуального поиска для этого сценария можно добавить Grafana и ELK-стек (Elasticsearch, Logstash, Kibana), чтобы генерировать предупреждения разработчикам о неудачном вызове API.

Это будет особенно полезно при отслеживании измененных данных из нескольких систем-источников. Чтобы понять, как это работает, далее рассмотрим практический пример реализации CDC средствами Apache NiFi – захват измененных данных из реляционной СУБД MySQL в режиме реального времени. Задачу облегчает наличие встроенного процессора NiFi – CaptureChangeMySQL.

Apache NiFI для инженеров данных

Код курса
NFED
Ближайшая дата курса
27 января, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
36 000 руб.

Реализация CDC-конвейера из готовых процессоров Apache NiFi

Прежде всего необходимо установить MySQL Server и включить ведение бинарного лога в разделе [mysqld] конфигурационного файла my.cnf, который хранится в директории /usr/local/etc/mysql или /etc/mysql. Далее следует установить идентификатор сервера, путь log_bin и включить события binlog на уровне строк, чтобы создать все файлы журналов по пути, указанному в свойствах log_bin. После перезагрузки MySQL-сервера следует подтвердить изменение двоичного лога в shell-оболочке. Запустив сервер Apache NiFi, необходимо загрузить драйвер MySQL и сохранить на нем jar-файл коннектора mysql-connector-java.

Далее необходимо из готовых процессоров NiFi создать CDC-конвейер, который передает поток данных из MySQL в локальный каталог или другой пункт назначения. Разумеется, нужно настроить конфигурации процессоров. Например, для CDC-процессора CaptureChangeMySQL, который отслеживает двоичный лог MySQL и идентифицирует операцию изменения данных, нужно заменить хосты MySQL и расположение драйвера. Также требуется задать имя пользователя с правами администратора или root. Необходимо настроить конфигурации и других процессоров NiFi:

  • EvaluateJsonPath – процессор анализирует входящий поток JSON и фиксирует имя базы данных, имя таблицы и временную метку в формате времени UNIX;
  • ExecuteScript – процессор выполняет специальный сценарий классификации данных в соответствии с операцией, добавляя новый столбец «op» для идентификации операции изменения данных в источнике, например, «I» для вставки (Insert), «U» для обновления (Update) и «D» для удаления (Delete).
  • UpdateRecord – процессор преобразует входную метку времени UNIX в формат «ГГГГ-ММ-ДД ЧЧ: мм: сс» и добавляет ее в качестве нового столбца (cdc_timestamp);
  • ConvertAvroToParquet – процессор, который преобразует данные Avro в формат колоночный Parquet, более эффективный с точки зрения хранения и поиска данных;
  • UpdateAttribute – процессор, который добавляет атрибут с родительским путем для конечного размещения файла в системе приемнике, что будет выполняться в следующем процессоре;
  • PutFile – процессор, который размещает файл в конечный пункт назначения – локальный каталог по пути <parentpath>/<databasename>/<tablename>.
обучение дата-инженеров, инженерия данных, дата-инженер Apache NiFi CDC MySQL курсы примеры обучение
CDC-конвейер отслеживания изменений в MySQL из процессоров Apache NiFi

Таким образом, вся работа по реализации CDC-конвейера в Apache NiFi сводится к построению цепочки из готовых процессорных блоков и настройке их конфигураций. В результате ETL-фреймворк реплицирует все изменения в СУБД MySQL почти в реальном времени, отслеживая все операции с данными. Другой ETL-кейс по загрузке данных в СУБД с помощью Apache NiFi читайте в нашей новой статье.

Администрирование кластера Apache NiFi

Код курса
NIFI
Ближайшая дата курса
27 января, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
36 000 руб.

Узнайте больше практических примеров администрирования и использования Apache NiFi для современной дата-инженерии на специализированных курсах для разработчиков, ИТ-архитекторов, инженеров данных, администраторов, Data Scientist’ов и аналитиков Big Data в нашем лицензированном учебном центре обучения и повышения квалификации в Москве:

Источники

  1. https://nikhil-suthar-bigdata.medium.com/cdc-with-apache-nifi-65020c748ff5
  2. https://moroknin.medium.com/scale-api-calls-from-db-using-apache-nifi-81a006aad90c
  3. https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-cdc-mysql-nar/1.5.0/org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL/