5 советов по потоковой аналитике больших данных с Apache Kafka и Spark Streaming

Автор Категория , ,
5 советов по потоковой аналитике больших данных с Apache Kafka и Spark Streaming

В продолжение вчерашнего материала про потоковую аналитику больших данных с Apache Kafka и Spark, сегодня рассмотрим особенности совместного использования этих технологий Big Data. В этой статье мы собрали для вас 5 лучших практик эффективного применения Apache Kafka и Spark Streaming для разработки распределенных приложений аналитики больших данных в режиме реального времени.

Apache Kafka + Spark Streaming для real-time аналитики Big Data

В качестве примера рассмотрим случай обработки пользовательского поведения на сайте, когда нужно анализировать такие события, как просмотр страницы, клики и прочие действия в браузере. Анализируемые события происходят не случайным образом, а следуют друг за другом, поэтому эти потоковые данные следует обрабатывать именно в порядке их возникновения. Для этого удобно использовать специальные технологии Big Data, которые позволяют обрабатывать потоки больших данных «на лету», т.е. в режиме реального времени, например, Apache Kafka и Spark. Обычно взаимодействие Kafka и Spark Streaming устроено следующим образом:

  • исходные данные записываются в топики Apache Kafka;
  • приложение Spark Streaming считывает (потребляет) нужные данные и обрабатывает их согласно бизнес-логике;
  • полученные результаты приложение Spark Streaming отправляет в место назначение – новый топик Kafka, озеро данных на базе Hadoop HDFS, аналитическую СУБД (HBase, Hive, Greenplum и пр.) или BI-систему.

О способах интеграции Apache Kafka и Spark Streaming мы рассказывали здесь. Однако, для эффективного совместного использования этих технологий Big Data стоит помнить про их некоторые особенности. Наиболее полезными для разработчика распределенных приложений можно назвать следующие лучшие практики [1], о каждой из которых мы расскажем далее.

Kafka, Spark, Spark Streaming, real time analytics
Потоковая аналитика больших данных с Apache Kafka и Spark Streaming

Используйте правый ключ раздела (right partition key)

При распространение данных по разделам топика Kafka хранит логи каждого раздела в отдельном каталоге журналов. Поэтому, если приложению требуется, чтобы данные потреблялись в том же порядке, в котором они производятся, важно выбрать правильный ключ раздела. По умолчанию при распространении данных по разделам топика Kafka использует хэш ключа и номера доступных разделов. Это можно изменить, используя настраиваемый разделитель, или сопоставив событие с определенным разделом топика. Например, для топика из 4 разделов каждый из них имеет журналы для подмножества ключей в том же порядке, в котором они помещаются. Хотя несколько источников данных могут писать в топик с разными интервалами, два источника не должны создавать данные для одного ключа, поскольку временные метки и интервалы записи могут отличаться.

Apache Kafka topic partition
Пример разделения топика Apache Kafka на разделы

Помните про масштабирование разделов в зависимости от исполнителей

Больше разделов означает более высокую пропускную способность в Kafka. Тем не менее, в потоковой передаче Spark не создает группы потребителей на основе исполнителей, доступных для работы. ConsumerCoordinator Spark создает группу потребителей для каждого раздела топика и распределяет как задачи всем исполнителям с разделением смещений для каждого пакета. Напомним, несмотря на позиционирование потоковой обработки данных, Spark Streaming работает в микро-пакетном режиме (micro-batch). К примеру, Spark создает равное количество задач, которые обрабатываются всеми доступными исполнителями параллельно. Большое количество разделов увеличивает время обработки каждого пакета, что приводит к общей задержке в конвейере потоковой передачи. Kafka позволяет увеличивать количество разделов топика после его создания. Однако, если необходимо уменьшить этот масштаб, топик придется удалить и создать заново.

 

Не используйте shuffle-функции

Они перемешивают/перераспределяют данные. Apache Spark перераспределяет свои структуры данных (Dataframe и RDD), если к ним применяется сортировка по столбцу (Sort by column) или sortBy. Это по умолчанию изменяет порядок сообщений, когда они перетасовываются в разные разделы. Можно использовать repartitionAndSortWithinPartitions, что поддерживает настраиваемый разделитель для сортировки данных, сопоставляя их для секционирования в отсортированном порядке. В частности, вторичная сортировка в Apache Spark позволяет упорядочивать данные по значению в дополнение к сортировке по ключу на этапе сокращения задания Map-Reduce. Например, сортировка результатов по дню и времени, а также по идентификатору пользователя (естественный ключ) поможет определить тенденции пользовательского поведения. Дополнительная сортировка по дню и времени – это показательный пример вторичной сортировки [2].

Регулируйте объем сообщений и интервал пакетной обработки

Мы уже упоминали про микропакетной подход в потоковой обработке данных с помощью Spark Streaming. По умолчанию Spark выбирает все доступные сообщения для каждого раздела топика при работе с каждым пакетом. Это приведет к сбою Spark-задания, если в одном пакете имеется объем данных, превышающий размер памяти исполнителя. Для приложений потоковой обработки Big Data, которые работают в высоконагруженной production-среде, рекомендуется установить параметр spark.streaming.backpressure внутреннего механизма обратного давления, чтобы регулировать объем данных для каждого пакета. Свойство spark.streaming.backpressure.enabled, отключенное по умолчанию отключено (False), позволяет Spark Streaming управлять скоростью приема на основе текущих задержек планирования пакетов и времени обработки. Это позволяет всей системе потоковой обработке Big Data принимать данные с такой скоростью, с какой она может обрабатывать, динамически устанавливая максимальную скорость приемников. Эта скорость ограничена значениями spark.streaming.receiver.maxRate и spark.streaming.kafka.maxRatePerPartition, если они заданы. Параметр spark.streaming.receiver.maxRate устанавливает максимальную скорость (количество записей в секунду) для получателя данных, фактически позволяя каждому потоку потреблять наибольшее количество записей в секунду. Параметр spark.streaming.kafka.maxRatePerPartition задает максимальную скорость (количество записей в секунду), с которой данные будут считываться из каждого раздела Kafka при использовании нового API прямого потока Kafka [3].

Рекомендуется устанавливать интервал между пакетами Spark-приложения как среднее время обработки первых нескольких пакетов, чтобы избежать скопления следующих пакетов в очередь. Хотя организация очередей не оказывает прямого влияния на работающее приложение Spark Streaming, общая задержка конвейера потоковой обработки данных может возрасти или не быть корректно обработанной [1].

Сохраняйте смещения разделов для обработанных данных

Чтобы перезапустить их с нужного места. Даже если потоковое приложение работает долго, бизнес-логика постоянно меняется. Поэтому в реальном production-случае, помимо незапланированных сбоев заданий, возможны остановки текущего приложения и новые запуски с другой бизнес-логикой. Чтобы не потерять данные в таких ситуациях, следует корректно завершить работу приложения и перезапустить его с сохраненными смещениями. При этом можно воспользоваться механизмом плавного завершения Spark-заданий graceful shutdown, о котором мы рассказывали здесь. Стоит помнить, что в Apache Kafka семантика доставки сообщений в случае сбоя задания зависит от того, как и когда хранятся смещения. В Apache Spark операции вывода данных выполняются как минимум один раз (at-least-once). Поэтому, если нужен эквивалент строго однократной семантики (exactly-once), следует сохранять смещения после идемпотентного вывода или в атомарной транзакции вместе с ним. Благодаря интеграции Apache Kafka и Spark Streaming есть 3 варианта хранения смещений в порядке увеличения надежности и сложности реализации [4]:

  • контрольные точки (checkpoints), которые включаются в Apache Spark. Это самый простой в реализации вариант, но с недостатками. Операция вывода должна быть идемпотентной, т.к. выводы будут повторяться. Поэтому это не подойдет для транзакций. Более того, восстановление не получится, если код приложения изменился. Для плановых обновлений это можно смягчить, запустив новый код одновременно со старым, т.к. выходные идемпотентные данные не будут конфликтовать. Но в случае незапланированных сбоев, требующих изменения кода, возможна потеря данных, если не будет другого способа определить известные начальные смещения.
  • собственно Kafka, которая имеет API фиксации смещения, сохраняющий их в специальном топике. По умолчанию новый потребитель будет периодически автоматически фиксировать смещения. Однако, сообщения, успешно считанные потребителем, возможно, еще не привели к операции вывода Spark, что в результате дает неопределенную семантику доставки. Можно зафиксировать смещения в Kafka после сохранения вывода, используя commitAsync API. Преимущество этого способа по сравнению с контрольными точками в том, что Kafka является надежным хранилищем независимо от изменений в коде приложения. Однако Kafka не является транзакционным хранилищем, поэтому выходные данные должны быть идемпотентными. Пример подобного кода на Java выглядит следующим образом

stream.foreachRDD(rdd -> {

  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed

  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

});

  • внешнее хранилище, поддерживающее транзакции. Сохранение смещений в той же транзакции, что и результаты, обеспечивает их синхронизацию даже в случаях сбоя. При возникновении повторяющихся или пропущенных диапазонов смещения, откат транзакции предотвратит влияние дублирования или потери сообщений на результаты. Это эквивалентно строго однократной семантике доставки сообщений. Такую тактику также можно использовать даже для выходных данных, полученных в результате агрегирования, которое обычно трудно сделать идемпотентным. Кроме того, по сравнению с контрольными точками и сохранением смещений в самой Kafka, подход с использованием внешнего источника не зависит от изменений кода, а также имеет большую гибкость изменения смещений в случае сбоя [1].

Освоить все эти и другие тонкости практического администрирования кластеров Apache Kafka и разработки приложений потоковой аналитики больших данных со Spark Streaming вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/@sathishjayaram/points-to-remember-while-processing-streaming-timeseries-data-in-order-using-kafka-and-spark-38cdf787a304
  2. http://codingjunkie.net/spark-secondary-sort/
  3. https://spark.apache.org/docs/latest/configuration.html#spark-streaming
  4. https://spark.apache.org/docs/2.3.1/streaming-kafka-0-10-integration.html#storing-offsets