7 новых фич Apache Kafka Streams в релизе 2.8.0

Автор Категория ,
7 новых фич Apache Kafka Streams в релизе 2.8.0

Вчера мы говорили про важные обновления Apache Kafka 2.8.0, помимо долгожданного KIP-500, который позволяет избавиться от Zookeeper для синхронизации метаданных в распределенном кластере с помощью встроенного Quorum Controller. Сегодня рассмотрим, какие KIP’ы нового релиза коснулись одного из основных инструментов разработчика Apache Kafka – библиотеки Streams для создания распределенных приложений потоковой аналитики больших данных.

Apache Kafka Streams в релизе 2.8.0: 7 KIP’ов с обновлениями

Напомним, помимо наиболее известного и самого важного KIP-500, релиз Apache Kafka 2.8.0 включает реализацию целых 17 предлагаемых улучшениях платформы (KIP, Kafka Improvement Proposal). 7 из них касаются клиентской библиотеки разработки распределенных приложений потоковой аналитики больших данных Kafka Streams [1]:

  • KIP-696 – обновление конечного автомата состояний потокового приложения и уточнение состояния ошибки (ERROR);
  • KIP-689 – расширение класса StreamJoined для дополнительных конфигураций хранилища;
  • KIP-680 – возможность не добавлять аргументы свойств для класса TopologyTestDriver;
  • KIP-671 – введение специфичного обработчика неперехваченных исключений;
  • KIP-663 – новый API для запуска и остановки потоков Streams;
  • KIP-659 – улучшения классов TimeWindowedDeserializer и TimeWindowedSerde для обработки размеров временного окна;
  • KIP-572 – улучшение таймаутов и повторных попыток.

Что именно представляет собой каждое из реализованных улучшений, мы рассмотрим далее.

KIP-696 и новый граф состояний

Чтобы анализировать состояние приложений в журналах и метриках, а также запускать определяемое пользователем поведение при переходах между состояниями, Kafka Streams предоставляет конечный автомат. Исторически состояние «ERROR» означало, что все потоки завершены. Но с добавлением улучшений устойчивости KIP-663 и KIP-671, о которых мы поговорим далее, отсутствие запущенных потоков больше не указывает на ОШИБКУ и не является окончательным состоянием. KIP-696 обновляет конечный автомат, чтобы уточнить конкретное значение ошибки и теперь ERROR указывает на то, что в приложении Kafka Streams произошла фатальная ОШИБКА.

Дополнительные конфигурации хранилища с KIP-689

Kafka Streams предлагает класс StreamJoined, чтобы установить различные параметры конфигурации для операций соединения. KIP-689 добавляет возможность управлять настройками топиков журнала изменений, которые делают состояние соединения устойчивыми. Конфигурация по умолчанию по-прежнему подходит для большинства приложений, но позволяет настраивать конфигурации внутренних разделов, поддерживающих потоковую обработку.

KIP-680: TopologyTestDriver без аргумента свойств

Kafka Streams включает класс TopologyTestDriver, который поддерживает тестирование целых приложений в быстрой однопоточной детерминированной среде без запуска каких-либо дополнительных компонентов, таких как брокеры или ZooKeeper. Конструктор класса TopologyTestDriver отражает конструктор основной среды выполнения KafkaStreams: он принимает само приложение (топологию) в качестве одного аргумента и конфигурацию в качестве второго аргумента.

Исторически класс TopologyTestDriver применял те же конфигурации, что и KafkaStreams, включая строку подключения брокера и идентификатор приложения, даже если эти это не имело смысло для самой топологии. В версии 2.8.0 эти стандартные конфигурации не требуются, а KIP-680 упрощает общий случай, добавляя новые перегрузки конструктора, которые вообще не требуют аргумента конфигурации. Главный конструктор по-прежнему доступен, поэтому текущие тесты продолжат работать и можно по-прежнему использовать его, чтобы указать дополнительные параметры конфигурации.

Обработка неперехваченных исключений с KIP-671

Kafka Streams инкапсулирует сложную логику, включая пользовательский код и системные операции, в т.ч. ввод-вывод, многопоточность и пр., где могут возникнуть неожиданные исключения. Раньше в Kafka Streams использовался примитивный, но безопасный подход: выброс исключений на верхний уровень, что прерывало соответствующий поток выполнения, а наглядность обеспечивалась с помощью обработчика UncaughtExceptionHandler. Напомним, чтобы поймать любые неожиданные исключения пред запуском приложения Kafka Streams можно установить обработчик java.lang.Thread.UncaughtExceptionHandler, который вызывается всякий раз, когда поток неожиданно завершается [2].

Однако, на практике часто требуется не просто видимость, когда поток неожиданно завершается. KIP-671 добавляет новый обработчик StreamsUncaughtExceptionHandler, который предлагает тот же уровень видимости, а также предоставляет механизм для замены отказавшего потока, если требуется повышенная устойчивость, выключения системы или всех потоков в текущем экземпляре или каждого инстанса в кластере Apache Kafka. Обработчик позволяет выбирать различные действия в зависимости от фактического исключения и полезен для немедленной остановки обработки, чтобы предотвратить повреждение данных. Фактически он отключит все потоки в клиенте и пригодится, когда топик источника/приемника удален или случился сбой сериализации/десериализации данных [3].

KIP-663: API для запуска и остановки потоков Kafka Streams

Приложения Kafka Streams структурированы как кластер экземпляров, каждый из которых имеет определенное количество потоков выполнения. Количество потоков настраивается при запуске. При большой нагрузке можно поэкспериментировать с увеличением или уменьшением количества потоков в экземпляре, чтобы эффективнее использовать системные ресурсы или уменьшить узкие места. Раньше это предполагало остановку, перенастройку и перезапуск каждого экземпляра. KIP-663 добавляет новые методы в интерфейс KafkaStreams, позволяя избирательно добавлять и удалять потоки обработки, не нарушая работу других работающих потоков в том же экземпляре.

Улучшение оконных операций с KIP-659

Kafka Streams предоставляет возможность работать с оконными функциями, позволяя выбирать временные окна входного потока записей для агрегирования и других подобных операций. Например, при вычислении ежечасного количества обновлений для каждого ключа размер временного окна составляет один час. Размер временного окна определяется как часть логики потоковой обработки в Streams DSL. Kafka Streams автоматически настраивает сериализатор и десериализатор для хранения и извлечения этих окон из локального хранилища и топиков Kafka. Поскольку сам размер окна фиксирован и известен для конкретной операции, сериализатор и десериализатор для оптимизации вычислительного пространства сохраняют только временную метку начала окна, т.к. его конец вычисляется через добавление размера к времени начала.

Но на практике иногда необходимо напрямую загрузить сериализованные записи, например, при отладке приложения или проверке промежуточной фазы обработки. Поэтому KIP-659 позволяет вызывающим абонентам напрямую настроить десериализатор TimeWindowedDeserializer с размером окна аналогично тому, как Kafka Streams настраивает свой собственный внутренний десериализатор для тех же данных.

KIP-572 для улучшения таймаутов и повторных попыток в приложениях Kafka Streams

KIP-572 был частично реализован в Apache Kafka 2.7.0 и завершен в 2.8.0. Этот KIP добавляет новое поведение повтора, чтобы заполнить важный пробел в устойчивости при запуске приложений Kafka Streams. Многие функции Kafka Streams полагаются на удаленные вызовы, например, обращения к брокерам, которые, как и любые сетевые операции подвержены произвольным ошибкам и задержкам.

Клиентские библиотеки внутри Kafka Streams имеют свои собственные настройки устойчивости, которые могут сгладить перебои сетевой передачи данных. Но слишком высокая устойчивость клиентов чревата тем, что любой вызов клиентского API может блокироваться на длительное время, что влияет на общую стабильность. приложения. С другой стороны, установка слишком коротких тайм-аутов клиента приведет к сбою приложений при незначительных сбоях сети. KIP-572 добавляет цикл повтора более высокого уровня. Теперь, когда Kafka Streams обнаруживает исключение тайм-аута при обработке конкретной задачи, он будет пытаться выполнить другие, прежде чем повторить попытку выполнения неудачной.

Таким образом, Kafka Streams стал еще более удобным для разработчика распределенных приложений потоковой аналитики больших данных. Освойте практику эффективного использования этого компонента Apache Kafka и тонкости администрирования кластеров на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://blogs.apache.org/kafka/entry/what-s-new-in-apache5
  2. https://kafka.apache.org/10/documentation/streams/developer-guide/write-streams
  3. https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler