Копирование сложных структур данных из Kafka в СУБД с SMT и JDBC Sink Connector

Автор Категория ,
Копирование сложных структур данных из Kafka в СУБД с SMT и JDBC Sink Connector

Мы уже рассматривали особенности обработки вложенных структур данных на примере парсинга JSON-файлов с Apache Spark и Hive. Развивая эту тему, сегодня поговорим про перенос записей с вложенными массивами из топиков Apache Kafka в реляционные СУБД с пользовательскими SMT-преобразователями и JDBC-коннектором: кейс для разработчиков.

Проблемы обработки сложных структур данных с JDBC-коннектором Apache Kafka

Для передачи данных из Apache Kafka в реляционные РСУБД можно использовать JDBC-коннектор от Confluent, который потребляет записи из топика Kafka и отправляет их в базу данных с помощью JDBC-драйвера. JDBC Connector (Source and Sink) поддерживает множество баз данных, не требуя специального кода для каждой из них: данные загружаются путем периодического выполнения SQL-запроса и создания выходной записи для каждой строки в результирующем наборе. По умолчанию все таблицы в базе данных копируются, каждая в свой отдельный выходной топик. Сама база данных отслеживается на наличие новых или удаленных таблиц и автоматически адаптируется.

При копировании данных из таблицы коннектор может загружать только новые или измененные строки, указывая, какие столбцы следует использовать для обнаружения новых или измененных данных. Можно добиться идемпотентной записи с помощью операций вставки-обновления (upserts). Также поддерживается автоматическое создание таблиц и ограниченное автоматическое развитие.

Коннектор приемника требует знания схем, поэтому следует использовать подходящий преобразователь, например, AVRO, который поставляется с реестром схем (Schema Registry), или преобразователь JSON с включенными схемами. Ключи записи Kafka могут быть примитивными типами или структурой Connect, а значение записи должно быть структурой Connect. Поля, выбираемые из структур Connect, должны относиться к примитивным типам. Если данные в теме имеют несовместимый формат, может потребоваться реализация пользовательского преобразователя.

На практике этот коннектор отлично работает при соблюдении следующих условий:

  • схема записей, которые необходимо отправить в СУБД, плоская и не содержит массивов или аналогичных сложных структур, таких как вложенные массивы объектов с несколькими уровнями;
  • каждая запись из топика Kafka однозначно сопоставляется с одной строкой в ​​целевой таблице реляционной базы данных.

В реальности эти условия соблюдаются не всегда: некоторые записи имеют сложные схемы данных со вложенными массивами, которые также могут содержать многоуровневые структуры. При их отправке из топика Kafka с помощью JDBC Sink-коннектора можно столкнуться с исключением

org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: ARRAY

Для решения этой проблемы стоит рассмотреть следующие варианты:

  • сгладить схему данных с помощью топологии Kafka Streams, а затем использовать эту упрощенную схему в качестве входных данных для JDBC-коннектора приемника;
  • применить функцию explode() в ksqlDB;
  • написать свой собственный Java-потребитель или коннектор Kafka;
  • использовать JDBC Sink с функцией Flatten, который является расширением существующего JDBC-коннектора от Confluent. При его использовании сопоставления и массивы будут разделены и записаны в отдельные целевые таблицы.

Каждый из перечисленных вариантов имеет свои достоинства и недостатки. В частности, если требуется универсальное решение, которое можно повторно использовать для нескольких сценариев с Kafka Connect и минимальными усилиями на разработку кода, предпочтительным кажентся последнее решение – расширение JDBC Sink-коннектора с функцией Flatten. Однако, для сложных схем AVRO потребуется его тщательная настройка и отладка. Поэтому имеет смысл разработать собственный преобразователь отдельных сообщений (SMT, Single Message Transformation) для применения к сообщениям из топика Kafka по мере их прохождения через платформу Connect. SMT преобразуют входящие сообщения после их создания коннектором источника, но до того, как они будут записаны в Kafka. SMT преобразуют исходящие сообщения перед их отправкой на коннектор приемника. Как это реализовать, рассмотрим далее.

Пользовательский SMT для обработки вложенных структур

Если большинство СУБД-приемников поддерживают работу со строками JSON или строками XML, можно использовать обычный JDBC-коннектор приемника Kafka, добавив специальный SMT, который преобразует исходную сложную схему в более простую, содержащую только одно поле. Содержимое этого поля — исходная схема со всеми данными в строковом представлении JSON или XML. Эта строка помещается в промежуточную таблицу «Ключ/Значение» в целевой БД. Логика на стороне БД с использованием триггеров и хранимых процедур анализирует строку JSON или XML и при необходимости сопоставляет ее с любой реляционной моделью. Исходный код этого коннектора доступен на Github по лицензии Apache 2, а его архитектура выглядит следующим образом:

  • потребитель считывает данные со сложными схемами и вложенными структурами из топика Kafka;
  • SMT-преобразователь с JDBC-коннектором приемника на платформе Kafka упрощает схему данных;
  • плоское представление исходной схемы данных в виде строки JSON или XML записывается в промежуточную таблицу «Ключ/Значение» в целевой реляционной базы;
  • парсинг JSON-строки выполняется с помощью хранимых процедур в целевой СУБД;
  • итоговый результат записывается в нужные таблицы базы-приемника.
курсы Kafka обучение пример, Kafka Connect пример SMT JSON парсинг RDBS
Архитектура решения по переносу записей с вложенными массивами из топиков Apache Kafka в реляционные СУБД с пользовательскими SMT-преобразователями и JDBC-коннектором

Представленное решение имеет следующие преимущества:

  • отсутствие дополнительных компонентов типа Kafka Streams или KSQL в интеграционном конвейере;
  • универсальное решение, которое можно применять для нескольких вариантов использования со сходной структурой и большинства реляционных СУБД;
  • изменения схемы данных в сообщениях не отразятся на стороне Kafka Connect, а затронут только хранимые процедуры СУБД;
  • SMT можно комбинировать с другими подобными преобразователями для фильтрации или предварительного преобразования данных.

Обратной стороной этих достоинств являются следующие недостатки:

  • Рост нагрузки на базу данных и повышенное потребление места на жестком диске из-за промежуточных таблиц, куда записываются огромные блоки данных, которые необходимо обрабатывать на стороне БД. При этом из объемной записи JSON может требоваться только один или несколько атрибутов, а остальная строка не нужна, а лишь занимает место.
  • в некоторых реляционных СУБД работа с JSON и XML может быть сложна и требует от разработчика определенных навыков;
  • изменения схемы данных необходимо учитывать в хранимых процедурах, поэтому знать об этом необходимо заранее;
  • мониторинга Kafka Connect недостаточно, также необходимо следить за выполнением логики обработки данных в самой базе;
  • отсутствие безопасности типов – при работе со строками разработчику придется заботиться о совместимости типов данных и применяемых к ним функций самостоятельно.

Некоторые из этих недостатков можно обойти, дополнив описанное SMT-решение функциями поиска конкретной информации из вложенных массивов в записи подключения, например, JsonPath/XPath, чтобы передавать в реляционные СУБД только те данные, которые действительно необходимы, а не огромные строки JSON целиком.

Больше практических деталей по администрированию и использованию Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Источники

  1. https://medium.com/bearingpoint-technology-advisory/handle-arrays-and-nested-arrays-in-kafka-jdbc-sink-connector-41929ea46301
  2. https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
  3. https://docs.confluent.io/platform/current/connect/transforms/overview.html
  4. https://github.com/an0r0c/kafka-connect-transform-tojsonstring