Всего 2 cURL-вызова для потокового обновления данных с Apache Kafka Connect

Автор Категория , ,
Всего 2 cURL-вызова для потокового обновления данных с Apache Kafka Connect

Сегодня в рамках обучения разработчиков распределенных приложений и дата-инженеров рассмотрим практический пример потоковой интеграции данных из 2-х разных источников с Apache Kafka. Читайте далее, как мгновенно передать данные между реляционными СУБД с помощью готовых JDBC-коннекторов через cURL-вызовы к REST API Kafka Connect.

Apache Kafka как средство потоковой интеграции данных

Интеграция данными между разными приложениями и облачными сервисами – одна из самых распространенных ИТ-задач. Apache Kafka может выступать в качестве сервисной шины предприятия (ESB, Enterprise Service Bus), предоставляя все необходимые компоненты для интеграции данных в соответствии с SOA-подходом [1]:

  • набор коннекторов для подключения к разным системам для приема и отправки данных;
  • очередь сообщений для организации промежуточного хранения сообщений в процессе их доставки;
  • платформа связи коннекторов с очередью, которая организует асинхронную передачу информации между источниками и приемниками с гарантированной доставкой сообщений и возможностью их трансформации.

Однако, при построении ESB не стоит встраивать в нее логику интеграции данных или бизнес-логику, чтобы избежать зависимости от связываемых сервисов. Подробнее об этом мы рассказывали здесь на примере кейса компании Авито.

Kafka Connect позволяет реализовать гибкую интеграцию между разными приложениями, предоставляя единое масштабируемое и надежное решение, которое заменяет медленные пакетные задания точечных интеграций между отдельными сервисами на быстрые потоковые обновления по всему ИТ-ландшафту предприятия. Рассмотрим типичный кейс, когда необходимо быстро обновить одну SQL-СУБД информацией из другой базы данных, например, при изменении статуса заказа требуется отразить это во внутренней базе CRM. Это означает, что каждый раз при изменении таблицы STATUS в системе исполнения заказов нужно обновить таблицу ORDER_STATUS в базе CRM. Для простоты предположим, что обе интегрируемые СУБД являются базами данных MySQL [2]. Как связать их с помощью Apache Kafka Connect, рассмотрим далее.

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
25 июля, 2022
Длительность обучения
32 ак.часов
Стоимость обучения
80 000 руб.

Интеграция SQL-СУБД с помощью Kafka Connect: практический пример

В общем случае для рассматриваемого кейса интеграции 2-х SQL-СУБД с помощью работающего кластера Apache Kafka и Kafka Connect требуется выполнить следующие шаги:

  • создать соединение-источник для чтения данных из таблицы STATUS из системы исполнения заказов;
  • создать соединение-приемник для записи данных в таблицу ORDER_STATUS базы данных CRM.

Проще всего это сделать через cURL-вызовы REST API Kafka Connect. Напомним, cURL (Client URL) – это свободно распространяемая утилита, кроссплатформенная служебная программа командной строки. Она позволяет взаимодействовать с разными серверами по множеству протоколов (FTP, FTPS, HTTP, HTTPS, TFTP, SCP, SFTP, Telnet, DICT, LDAP, POP3, IMAP и SMTP) с синтаксисом URL, поддерживая сертификаты HTTPS, методы HTTP POST, HTTP PUT, загрузку на FTP и через формы HTTP. Еще CURL поддерживает различные способы аутентификации: базовый, дайджест, NTLM и Negotiate для HTTP, а также Kerberos для FTP [3].

Таким образом, создание соединения-источника для Kafka Connect будет выглядеть так:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://fulfillmentdbhost:3306/fulfillmentdb",
"connection.user": "fullfilment_user",
"connection.password": "<password>",
"topic.prefix": "order-status-update-",
"mode":"timestamp",
"table.whitelist" : "fulfullmentdb.status",
"timestamp.column.name": "LAST_UPDATED",
"validate.non.null": false
}
}'

Здесь команда cURL указывает Kafka Connect определенный тип исходного коннектора, а именно JdbcSourceConnector для подключения к базе данных MySQL по адресу fillmentdbhost: 3306/fillmentdb с использованием предоставленных учетных данных. Также выполняется настройка коннектора для отслеживания новых записей в таблице STATUS СУБД-источника на основе метки времени в столбце «LAST_UPDATED». В результате каждый раз при записи в таблицу новой строки Kafka Connect автоматически запишет соответствующее новое сообщение в топик Kafka «order-status-update-status». А второй вызов cURL обеспечивает соединение с приемником, который отслеживает изменения в указанном топике Kafka:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_sink_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://crmdbhost:3306/crmdb",
"connection.user": "crm_user",
"connection.password": "<password>",
"topics": "order-status-update-status",
"table.name.format" : "crmdb.order_status"
}
}'

Здесь команда cURL создает коннектор приемника JdbcSinkConnector, который подключается к другой СУБД – crmdbhost: 3306/crmdb. Этот коннектор будет отслеживать топик «order-status-update-status» на предмет новых сообщений и записывать их в таблицу ORDER_STATUS базы данных CRM (crmdb).

Аналогичным образом можно интегрировать любые приложения, для которых Kafka Connect предоставляет готовые коннекторы, или написать собственный. Далее нужно настроить коннекторы источника и приемника данных, и потоковая передача обновлений готова. Кроме того, после настройки коннектора-источника его можно использовать для связи сразу с несколькими приемниками. Например, в рассмотренном кейсе можно запустить третью команду cURL, чтобы записать все обновления статуса заказа в другую систему [2]. В следующей статье мы рассмотрим, что под капотом этих JDBC-коннекторов и разберем source-connctor Kafka Connect От Confluent.

Администрирование кластера Kafka

Код курса
KAFKA
Ближайшая дата курса
11 июля, 2022
Длительность обучения
24 ак.часов
Стоимость обучения
60 000 руб.

Другие примеры интеграции систем с помощью Apache Kafka Connect мы рассматривали здесь и здесь. Освоить на практике эти и другие тонкости администрирования и эксплуатации Apache Kafka для разработки распределенных приложений потоковой аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. http://www.sovtex.ru/products/esb/
  2. https://levelup.gitconnected.com/how-to-use-kafka-connect-to-connect-two-data-sources-on-heroku-49104d693e4b
  3. https://ru.wikipedia.org/wiki/CURL