Непростая аналитика больших данных в реальном времени: 3 способа перезапуска заданий Spark Structured Streaming по метке времени Apache Kafka

курсы Apache Kafka, Обучение Kafka, курсы Apache Spark Strucrured Streaming, обучениеApache Kafka, Обучение Apache Kafka и Spark Strucrured Streaming, курсы по большим данным, обучение Big Data, аналитика больших данных в реальном времени обучение на примерах, Big Data, Kafka, архитектура, Большие данные, обработка данных, Spark,

Совместное использование Apache Kafka и Spark очень часто встречается в потоковой аналитике больших данных, например, в прогнозировании пользовательского поведения, о чем мы рассказывали вчера. Однако, временные метки (timestamp) в приложении Spark Structured Streaming могут отличаться от времени события в топике Kafka. Читайте далее, почему это случается и какие подходы к обработке Big Data применять в этом случае.

Как течет время в потоковой передаче или еще раз об интеграции Apache Kafka и Spark

Итак, рассмотрим сценарий, когда приложение Spark Structured Streaming считывает данные из топиков Apache Kafka, и происходит один из следующих случаев [1]:

  • изменяется конфигурация заданий источника потоковой передачи, например, параметры maxOffsetsPerTrigger, Trigger time, Watermark и пр.;
  • приложение обновилось и нужно отменить предыдущие состояния;
  • обнаружена ошибка в коде и требуется повторно запустить задание с определенной отметки времени;
  • в один из топиков попали поврежденные или смешанные данные;
  • нужно запустить задание Spark Structured Streaming с определенной отметки времени.

Во всех этих случаях нельзя просто перезапустить текущее или запустить новое задание Spark Structured Streaming с существующей контрольной точки (checkpoint) – понадобится создать новое место для хранения информации для восстановления после сбоев. Поэтому следует обеспечить плавное и последовательное хранение смещений Kafka по определенным меткам времени (timestamp).

Напомним, в Apache Kafka события хранятся в топиках (topic), каждый из которых разбит на разделы (partition). Любая запись в разделе имеет смещение (offset), определяющее порядок внутри раздела. В версиях Spark 2.х задание Structured Streaming полагается на контрольную точку для реализации строго однократной доставки (exactly once) сообщений в случае микропакетной передачи или хотя бы однократной для непрерывной обработки данных. Об особенностях реализации строго однократной доставки в Spark Structured Streaming мы недавно рассказывали здесь.

Каждый запрос-триггер Spark Structured Streaming будет сохранять смещения в каталог смещения в местоположении контрольной точки, определенном в параметре checkpointLocation или spark.sql.streaming.checkpointLocation. Драйвер StreamExecution проверяет и вычисляет, какие смещения уже были обработаны, и потребляет новые записи по этим смещениям и другим конфигурациям, например, maxOffsetsPerTrigger.

exactly once, Spark SQL, Apache Spark Structured Streaming
Механизм контрольных точек в Apache Spark Structured Streaming для строго однократной (exactly once) доставки данных

3 способа перезапуска заданий Spark Structured Streaming

Таким образом, обновить и/или перезапустить задание Spark Structured Streaming с новой метки времени можно следующими способами [1]:

  • считать последнее смещение контрольной точки из HDFS с помощью команды Hdfs dfs -ls /checkpointLocation/offsets и скопировать его в параметр скопируйте смещения в параметр startOffsets структуры readStream;
  • получить данные о минимальном смещении, отметка времени которого больше, чем нужный timestamp, используя метод consumer.offsets_for_times() библиотеки Kafka-Python, о которой мы рассказывали здесь. При этом идет перебор топиков Apache Kafka с учетом нужной временной метки, которая устанавливается для каждого раздела, после чего вызывается метод offsets_for_times() для получения смещений. Конечный результат выводится в JSON-структуре.
  • Наконец, можно использовать встроенную реализацию startOffsetsByTimestamp в Apache Spark0, которая устанавливает временную метку в миллисекундах для каждого раздела TopicPartition. О других нововведениях крупного релиза этого Big Data фреймворка читайте в этом материале. Этот способ имеет некоторые особенности применения, о которых мы поговорим далее.

Особенности конфигурации startOffsetsByTimestamp в Apache Spark 3.0

Параметр startOffsetsByTimestamp является опциональным и описывает начальную точку отметки времени при запуске запроса в виде JSON-строки для каждого раздела топика Kafka (TopicPartition). Возвращаемое смещение для каждого раздела – это самое раннее смещение, метка времени которого больше или равна заданной метке времени в соответствующем разделе. Если совпадающее смещение не существует, запрос немедленно завершится ошибкой, чтобы предотвратить непреднамеренное чтение из такого раздела. Это своего рода ограничение, которое планируется устранить в будущих релизах Apache Spark, который просто передает информацию о временной метке в KafkaConsumer.offsetsForTimes и никак не интерпретирует это значение.

Значение метки времени может варьироваться в зависимости от конфигурации Kafka log.message.timestamp.type. При использовании startOffsetsByTimestamp нужно помнить о следующих особенностях этого параметра [2]:

  • он поддерживается версией Kafka 0.10.1.0 или выше;
  • startOffsetsByTimestamp имеет приоритет над startOffsets, который означает начальную точку при запуске запроса или самую последнюю, определяющую начальное смещение для каждого раздела TopicPartition.
  • для потоковых запросов этот параметр применим только при запуске нового запроса, и возобновление всегда будет начинаться с того места, где запрос был остановлен. Новые обнаруженные во время запроса разделы будут запущены раньше.

Больше практических примеров разработки распределенных приложений для потоковой аналитики больших данных с Apache Spark и Kafka вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/@ZeevFeldbeine/how-to-start-spark-structured-streaming-by-a-specific-kafka-timestamp-e4b0a3e9c009
  2. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html