Управляемые повторы отправки сообщений из Apache Kafka c фреймворком Sping

Автор Категория ,
Управляемые повторы отправки сообщений из Apache Kafka c фреймворком Sping

Специально для обучения разработчиков распределенных приложений и дата-инженеров, рассмотрим практический пример использования возможностей фреймворка Spring для управления повторными попытками отправки сообщений потребителям из топика Apache Kafka.

Повторные попытки отправки сообщений и Spring для Apache Kafka

Довольно часто Kafka-приложения требуют высокой надежности обработки сообщений. Например, в финтех- или медтех-проектах, а также системах интернета вещей и платформах информационной безопасности. Например, данные о сетевом трафике клиентов, угрозах, приложениях и устройствах пользователей в режиме реального времени интегрируются в топики Apache Kafka, откуда потом извлекаются, обогащаются и отправляются на настроенные конечные точки систем-приемников. Доступность сторонних систем-приемников зависит от внешней инфраструктуры, а сеть передачи данных может быть ненадежной. При недоступности приемника или невозможности внешней системы обработать запрос потока данных, сообщения просто теряются.

В Apache Kafka потерянные сообщения попадают в соответствующую очередь или топик недоставленных сообщений (Dead letter queue/topic, DLT). Поэтому для повторной отправки сообщений получателю следует работать именно с этим топиком. Рассмотрим, как это сделать, используя популярный open-source фреймворк разработки Java-приложений Spring. Он позволяет реализовать наиболее популярные элементы типовых комплексных приложений: от управления транзакциями до работы с сообщениями. Проект Spring для Apache Kafka применяет основные концепции фреймворка Spring к разработке решений для обмена сообщениями, предоставляя соответствующие высокоуровневые абстракции и шаблоны.

Spring Kafka предоставляет функцию повторной попытки с помощью шаблона RetryTemplate с RetryingMessageListenerAdapter или, в последних версиях, путем настройки обработчика ошибок. Например, чтобы настроить топик повторных попыток и DLT для аннотированного метода @KafkaListener, разработчику нужно просто добавить к нему аннотацию @RetryableTopic, и Spring для Apache Kafka загрузит все необходимые топики и потребителей с конфигурациями по умолчанию:

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
        // ... message processing
}

Для обработки сообщений DLT можно указать метод в том же классе, добавив к нему аннотацию @DltHandler. Если метод DltHandler не указан, создается потребитель по умолчанию, который регистрирует только потребление.

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}

Важно помнить, что без указания имени шаблона Kafka (kafkaTemplate), будет найден bean-компонент с именем retryTopicDefaultKafkaTemplate. Если bean-компонент не найден, генерируется исключение. В Spring бины (bean) − это классы, созданием экземпляров которых и установкой в них зависимостей управляет контейнер фреймворка. Бины предназначены для реализации бизнес-логики приложения. Bean воплощает паттерн проектирования «одиночка» (singleton), т.е. в некотором блоке приложения существует только один экземпляр данного класса. Поэтому, если бин содержит изменяемые данные в полях, т.е. имеет состояние, то обращение к таким данным необходимо синхронизировать. Возвращаясь к повторным попыткам отправки сообщений в Kafka с помощью Spring, отметим, что можно настроить поддержку неблокирующих повторных попыток, создав bean-компоненты RetryTopicConfiguration в аннотированном классе @Configuration.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

Это создаст топики повторных попыток и DLT, а также соответствующих потребителей для всех топиков в методах, аннотированных с помощью @KafkaListener, с использованием конфигураций по умолчанию. Экземпляр KafkaTemplate необходим для пересылки сообщений. Для более детального контроля над обработкой неблокирующих повторных попыток для каждого топика, можно предоставить более одного bean-компонента RetryTopicConfiguration.

Потребители топика повторных попыток и DLT будут назначены группе потребителей с идентификатором группы, который представляет собой комбинацию того, что указано в параметре groupId аннотации @KafkaListener с суффиксом топика. Если ничего не задано, все они будут принадлежать к одной и той же группе, а перебалансировка в топике повторной попытки вызовет ненужную перебалансировку в основном топике.

Если потребитель настроен с помощью ErrorHandlingDeserializer, для обработки исключений десериализации важно настроить KafkaTemplate и его продюсера с сериализатором, который может обрабатывать обычные объекты, а также необработанные значения byte[], возникающие в результате исключений десериализации. Общий тип значения шаблона должен быть Object. Один из методов заключается в использовании DelegatingByTypeSerializer:

@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

Для одного и того же топика можно использовать несколько аннотаций @KafkaListener с ручным назначением разделов или без него вместе с неблокирующими повторными попытками. Но конфигурация топика при этом не меняется. Поэтому лучше использовать один bean-компонент RetryTopicConfiguration для настройки таких топиков. Если для одного и того же топика используется несколько аннотаций @RetryableTopic, все они должны иметь одинаковые значения, иначе одна из них будет применена ко всем слушателям этого топика, а значения других аннотаций будут проигнорированы.

Хотя такой подход к повторным попыткам сохраняет гарантии порядка доставки сообщений, повторная попытка и ожидание задержки повтора выполняется в приложении-потребителе Kafka. Каждый сбой блокирует обработку назначенных разделов топика, вызывая отставание потребителя, что неприемлемо для топиков с высокой пропускной способностью. Кроме того, это не решает проблему, когда сообщение Kafka не может быть обработано в течение длительного периода времени. Обойти это ограничение поможет возможность, появившаяся в Spring Kafka 2.7, о чем мы поговорим далее.

Неблокирующие повторные попытки

В Spring Kafka 2.7 появилась новая функция неблокирующих повторных попыток, которая использует дополнительные топики Kafka с повторными попытками, куда отправляются необработанные сообщения для последующей повторной доставки. Количество топиков и повторных попыток является произвольным, и для каждого можно настроить различную задержку. Однако, поскольку повторные попытки не блокируются, гарантии порядка сообщений Kafka больше невозможны.

Когда топик настроен на повторную попытку, а слушатель, обрабатывающий сообщение, выдает исключение, настроенный механизм восстановления после ошибок выполняет следующие шаги:

  • вычисляет время, когда сообщение должно быть обработано снова;
  • создает повторное сообщение как копию исходного с дополнительными заголовками (время выполнения сообщения, сведения об исключении и пр.);
  • вычисляет следующий топик повтора и публикует в нем это сообщение;
  • если количество попыток исчерпано, сообщение удаляется или отправляется в топик DLT, чтобы обработать его позже при необходимости.

Spring Kafka автоматически создает контейнеры слушателя сообщений для топиков повторных попыток и DLT. Эти контейнеры по умолчанию используют тот же слушатель сообщений и конфигурацию, что и основной топик. Слушатели обёрнуты в специальные адаптеры, реализующие отложенную обработку сообщений следующим образом:

  • проверяется время выполнения входящего сообщения в KafkaBackoffAwareMessageListenerAdapter — если оно уже прошло, сообщение передается слушателю;
  • если положенное время еще не наступило, адаптер заставляет диспетчер отсрочки PartitionPausingBackoffManager выдавать исключение KafkaBackoffException с информацией о том, насколько должна быть отложена обработка;
  • смещение раздела сбрасывается, и потребитель Kafka приостанавливается на требуемый период времени;
  • как только потребитель активизируется, сообщение доставляется снова.

Таким образом, при сбое отправки сообщения в конкретный приемник отправитель создает сообщение для восстановления и публикует его в разделе очереди восстановления. Сообщение содержит полную полезную нагрузку для отправки и идентификации системы-приемника. DLT-слушатель Kafka получает сообщение о сбое, и немедленно пытается выполнить повторную доставку. Если это не удается, сообщение передается в следующую очередь восстановления с возрастающей задержкой до тех пор, пока оно не будет успешно отправлено или не исчерпается количество попыток.

Самый простой способ настроить топик с повторами — использовать аннотацию @RetryableTopic в методе слушателя сообщений вместе с аннотацией @KafkaListener. В аннотации можно настроить большинство свойств повторных попыток: количество, автоматическое создание топика и стратегию отсрочки для расчета задержек доставки.

Альтернативой аннотации является создание bean-компонентов RetryTopicConfiguration, которые обеспечивают более точную настройку:

@Bean
public RetryTopicConfiguration recoveryQueueConfiguration(
                      @Qualifier(RecoveryQueueConfig.TEMPLATE) KafkaTemplate<String, RecoveryMessage> template,
                      @Qualifier(RecoveryQueueConfig.CONTAINER_FACTORY)
                       
ConcurrentKafkaListenerContainerFactory<String, RecoveryMessage> containerFactory,
                      MessageRecoveryConfig messageRecoveryConfig) {        return RetryTopicConfigurationBuilder.newInstance()
                                 .includeTopic(RecoveryQueueConfig.TOPIC)
                                 .suffixTopicsWithIndexValues()
                                 .doNotAutoCreateRetryTopics()
                                 .maxAttempts(messageRecoveryConfig.getAttempts())
                                 .customBackoff(new IntervalBackOffPolicy(messageRecoveryConfig.getIntervals()))
                                 .notRetryOn(MethodArgumentResolutionException.class)
                                 .notRetryOn(MethodArgumentNotValidException.class)
                                 .notRetryOn(DeserializationException.class)
                                 .dltHandlerMethod(RecoveryQueueConfig.MESSAGE_LISTENER, "deadLetter")
                                 .doNotRetryOnDltFailure()
                                 .listenerFactory(containerFactory)
                                 .create(template);
}

При этом разработчику по-прежнему приходилось создавать обязательные связанные с Kafka bean-компоненты, такие как продюсеры и потребители, а также шаблон Kafka для отправки сообщений в DLT и слушатель, который заботится об их фактической повторной доставке.

Практическое использование этого подхода может привести к следующим неожиданным проблемам:

  • сообщения в топиках повторных попыток становятся намного больше исходных из-за добавления к ним заголовков, связанных с исключениями (трассировка стека исключения, полное доменное имя и сообщение). Поскольку это не настраивается, обойти это можно на уровне клиента Kafka, удалив заголовки исключений в перехватчике пользовательского продюсера прямо перед отправкой сообщения.
  • повторяются сообщения, которые не могут быть десериализованы или проверены, поэтому их приходится явно исключать из конфигурации топика повторных попыток. Впрочем, это частично решено в Spring Kafka 2.8.
  • если сообщение попало в DLT из-за ошибок десериализации, оно несет исключение десериализации со всеми данными, которые не могут быть десериализованы. Когда такое DLT-сообщение доставляется обработчику недоставленных сообщений и его не удается десериализовать, создается новое DLT-сообщение еще большего размера. Это создает цикл, когда DLT заполняется постоянно растущими сообщениями, пока не будет превышен максимальный размер записи Kafka. Избежать этого поможет отключение отправки ошибочных недоставленных сообщений в DLT, что по умолчанию настроено в Spring Kafka 2.8.
  • Без явной установки префикса идентификатора клиента в аннотации слушателя метрики потребителя Kafka могут работать некорректно. Исправить ситуацию поможет явная установка параметров конфигурации:
@KafkaListener(
id = RecoveryQueueConfig.ID,
idIsGroup = false,
topics = RecoveryQueueConfig.TOPIC,
clientIdPrefix = "${kafka.consumers.recovery-queue.clientId}",
containerFactory = RecoveryQueueConfig.CONTAINER_FACTORY,
groupId = RecoveryQueueConfig.GROUP_ID)
public void listen(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
@Nullable @Header(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS) byte[] attempts,
@Valid @Payload RecoveryMessage recoveryMessage) {
...
}

Больше практических кейсов по администрированию и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/jamf-engineering/retryable-topics-with-spring-kafka-946360f2d644
  2. https://docs.spring.io/spring-kafka/reference/html/