DSL и Processor API в Apache Kafka Streams для распределенной обработки потоковых данных

Kafka, Big Data, Большие данные, архитектура, обработка данных

Как мы уже писали, в Apache Kafka Streams таблица и поток данных – это базовые и взаимозаменяемые понятия. Сегодня поговорим о том, как работать с этими объектами Big Data с помощью внутренних средств Кафка Стримс, используя готовые методы высокоуровневого языка DSL и низкоуровневый API-интерфейс для распределенной обработки потоковых данных в рамках собственной топологии. Также в эту статью мы включили для вас некоторые примеры программного кода с комментариями.

Как реализовать топологию потоковой обработки в Apache Kafka Streams

Существуют 2 способа определения топологии потоковой обработки Big Data в контексте логической абстракции программного кода, о которой мы рассказывали здесь [1]:

  • высокоуровневый язык Kafka Streams (DSL, Domain Specific Language), реализующий типовые операции преобразования данных (map, filter, join, aggregation);
  • низкоуровневый API-интерфейс обработчика (Processor API), который позволяет разработчику определять и подключать пользовательские обработчики, а также взаимодействовать с хранилищами состояний (state stores).

Именно DSL оперирует следующими высокоуровневыми понятиями:

  • KStream – аналог потока (Stream) в Кафка, абстракция потока записей, каждая из которых представляет собой простую пару ключ-значение в неограниченном наборе данных [2];
  • KTable– аналог таблицы (Table) в Кафка, абстракция потока изменений (changelog stream), где каждая запись считается вставкой (Insert) или обновлением (Update) в зависимости от существования ключа, поскольку любая существующая строка с тем же ключом будет перезаписана [2];
  • GlobalKTable– реплицированные на локальный узел данные с целью сокращения затрат на создание промежуточных топиков и хранение данных в них. Объекты GlobalKTable реплицируют все секции на каждый из узлов приложения Kafka Streams, обеспечивая доступность записи независимо от принадлежности ее ключа к разделу топика Кафка (topic partition) [3].
DSL Kafka Streams, KTable, KStream, GlobalKtable
DSL-методы работы с KTable, KStream и GlobalKtable в Кафка Стримс

Напомним, что в Kafka Streams таблицу можно явно преобразовывать в поток изменений (changelog stream) с помощью функции KTable#toStream(). Для работы со всеми вышеуказанными понятиями высокоуровневый DSL Kafka Streams предоставляет готовые методы с функциональным стилем.

Например, ниже показаны строки кода по применению метода преобразования строчных символов объекта kStream в заглавные и отправке результатов в выходной топик (topic) Кафка [2].

val upperCaseKStream = kStream.mapValues(_.toUpperCase);

upperCaseKStream.to(«OutTopic») //sending data to out topic

Аналогичный код с использованием API-интерфейса обработчика (Processor API) будет существенно длиннее, поскольку в этом случае разработчику необходимо самостоятельно объявлять переменные и инициировать методы. Однако, именно с помощью API-интерфейса обработчика можно реализовать уникальную топологию обработки информационных потоков, самостоятельно определив узлы (source, stream и sink-processors) и сложную логику их программного поведения. Также можно ввести в эту топологию хранилища состояний (state store) и добавить обработчики-источники данных из топиков Kafka для генерации входных потоков в топологию и обработчики-приемники для отправки выходных потоков в нужные топики Кафка [4].

Ниже показан пример реализации такой топологии, где

  • узел обработчика-источника (Source) добавляется в топологию с помощью метода addSource, и в него подается один топик Kafka (source-topic);
  • узел обычного обработчика потока (Process) с ранее предопределенным программистом методом подсчета слов во входном потоке данных (WordCountProcessor) добавляется в качестве нижестоящего узла для обработчика-источника (Source) с помощью метода addProcessor.
  • постоянное хранилище состояний значения ключа создается и связывается с узлом Process с помощью ранее предопределенной переменной countStoreSupplier;
  • наконец, чтобы завершить топологию, метод addSink добавляет узел обработчика-приемника, принимая узел Process в качестве вышестоящего и записывая его в отдельный топик-приемник Кафка. Возможно динамическое определение топика Kafka для сохранения каждой полученной записи от вышестоящего узла.
код Кафка Стримс, Kafka Streams code example, пример Apache Kafka
Определение топологии обработчиков Кафка Стримс через API

Отметим, что средствами DSL также можно определить топологию, однако это будет реализовано не в таком явном и понятном виде, как с помощью Processor API [4]. Таким образом, низкоуровневый API-интерфейс обработчика предоставляет клиенту доступ к потоковым данным для выполнения бизнес-логики на входящем потоке данных и отправки результата. Там, где высокоуровневый DSL позволяет применять готовые методы с функциональным стилем, низкоуровневый процессор API предоставляет гибкость для реализации логики обработки в соответствии с бизнес-потребностями [2]. Примеры DSL-методов по группировке данных в определенном временном промежутке смотрите в нашей следующей статье.

Big Data, Kafka Topic, Kafka Streams
Работа с разными топиками Kafka с помощью Кафка Стримс

Еще больше тонкостей по работе с Apache Kafka Streams API для быстрой обработки больших данных в режиме онлайн на специализированных курсах для руководителей, архитекторов, инженеров, администраторов, аналитиков Big Data и Data Scientist’ов в лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

расписание компьютерные курсы для руководителей, аналитиков, программистов, администраторов и пользователей Internet of Things, Big Data и Machine Learning Смотреть расписание занятий
регистрация на компьютерные курсы для руководителей, аналитиков, программистов, администраторов и пользователей Internet of Things, Big Data и Machine Learning Зарегистрироваться на курс

Источники

  1. https://kafka.apache.org/23/documentation/streams/core-concepts
  2. https://dzone.com/articles/kafka-streams-more-than-just-dumb-storage
  3. https://habr.com/ru/company/piter/blog/457756/
  4. https://docs.confluent.io/current/streams/developer-guide/processor-api.html