Как повысить отказоустойчивость продюсера Kafka: 5 практик по настройке ТОП-10 конфигураций

Автор Категория ,
Как повысить отказоустойчивость продюсера Kafka: 5 практик по настройке ТОП-10 конфигураций

В этой статье поговорим про практическое обучение Apache Kafka и рассмотрим, как сделать продюсеров еще более отказоустойчивыми, чтобы улучшить общую надежность всей Big Data системы. Читайте далее про наиболее важные конфигурации продюсеров Kafka и эффективные рекомендации по их настройке.

 

10 самых важных параметров продюсера Apache Kafka

Из множества конфигурационных параметров для отправителей сообщений в топики Kafka – продюсеров (producer), наиболее значимыми в прикладном смысле являются следующие [1, 2]:

  • Acks – подтверждения того, что лидер получил сообщение, прежде чем продюсер может считать запрос завершенным – нужно для контроля долговечности отправляемых записей;
  • replica.lag.time.max.ms – время, в течение которого лидер не удаляет follower’а из набора синхронизированных реплик (In-Sync Replicas, ISR), даже если он не отправлял никаких запросов на выборку или не использовал до конца смещения лог лидера. в Apache Kafka 7 этот параметр конфигурации относится не только ко времени, прошедшему с момента последнего запроса на выборку из реплики, но и ко времени с момента последней обработки реплики. Реплики, которые все еще получают сообщения от лидеров, но не догнали последние сообщения в течение replica.lag.time.max.ms, будут считаться рассинхронизированными.
  • insync.replicas – минимальное количество реплик, которые должны подтвердить запись, чтобы запись считалась успешной;
  • retries – разрешение повторных попыток отправить сообщения, которые ранее не удалось записать. Обычно вместо этой конфигурации пользователи задают параметр timeout.ms для управления поведением повторных попыток.
  • enable.idempotent – идемпотентность (свойство объекта или операции при повторном применении возвращать тот же результат, что и при первом), включается автоматически, если продюсер авторизован для определенного идентификатора транзакции (transactional.id).
  • max.in.flight.requests.per.connection – максимальное количество неподтвержденных запросов, которые клиент отправит в одном соединении до блокировки. Когда значение этого параметра больше 1 и есть неудачные отправки, то возможно изменение порядка сообщений из-за повторных попыток, если они разрешены.
  • buffer.memory – общий объем памяти в байтах, который продюсер может использовать для буферизации записей, ожидающих отправки на сервер.
  • max.block.ms – время, в течение которого методы send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() и abortTransaction() будут блокироваться в KafkaProducer.
  • linger.ms – время искусственной задержки перед отправкой сообщений в топики Kafka, чтобы объединить записи в пакет;
  • batch.size – размер пакета, в который объединяются сообщения для записи в Kafka.

Примечательно, что все эти конфигурации дают эффект не сами по себе, а тесно взаимосвязаны друг с другом. Поэтому чаще всего, настраивая один параметр, следует также изменить и другой. Как это сделать, чтобы повысить надежность продюсера Apache Kafka, мы рассмотрим далее.

Acks (acknowledges) и синхронизированные реплики

Acks (acknowledges) – это подтверждение, которое продюсер получает от брокера Kafka, чтобы гарантировать успешную передачу ему отправленного сообщения, прежде чем зафиксировать commit. Значение по умолчанию – 1 означает, что пока производитель получает подтверждение от ведущего брокера этого топика, он считает фиксацию успешной и переходит к следующему сообщению. Acks = 0 не дает никаких подтверждений о фиксации. А acks = all гарантирует, что продюсер получит подтверждения от всех синхронизированных реплик (ISR) этого топика, что обеспечивает максимальную долговечность сообщений, но требует много времени и увеличивает общую задержку.

Максимальное количество реплик равно количеству брокеров в кластере Apache Kafka. Среди реплик есть лидер, который обрабатывает все запросы на чтение и запись, а последователи пассивно копируют лидера. Синхронизированная реплика полностью догоняет лидера за время, указанное в replica.lag.time.max.ms. Если брокер выйдет из строя или возникнут проблемы с сетью и последователь не сможет связаться с лидером, и через replica.lag.time.max.ms секунд этот брокер будет удален из ISR. Конфигурация min.insync.replicas определяет, сколько реплик должен получить producer, прежде чем считать фиксацию успешной. Этот параметр добавляется поверх acks = all и делает хранение сообщения более надежным за счет некоторого увеличения временной задержки. Т. е. в этом случае разработчик и администратор Apache Kafka должен искать приемлемый баланс.

Повторные попытки отправки сообщений в случае неудачи

Можно разрешить продюсеру повторно отправлять сообщения, задав конфигурации retries значение больше 0 – максимальное количество повторных попыток, которое производитель сделает, если фиксация не удалась. Например, установив retries = 5, producer попытается повторить отправку максимум 5 раз. При этом в логе продюсера не отразится количество повторных попыток, т.к. там записывается только факт успешной или неуспешной фиксации в общем итоге. Но заметить повторную попытку можно из лога на стороне брокера по инкременту параметра retries.

Дублирование сообщений и как его предупредить

Бывают ситуации, когда сообщение было фактически передано всем синхронизированным репликам, но брокер не смог отправить ответное подтверждение, к примеру, из-за проблем с сетью. А если вышерассмотренная конфигурация retries>0, producer будет повторно отправлять сообщения, что может привести к их дублированию в топике Kafka. Предупредить это позволяет семантика строго однократной доставки сообщений (exactly once), о которой мы рассказывали здесь. Это можно обеспечить с помощью идемпотентности продюсера, включив ее в конфигурации enable.idempotent. Сообщения отправляются пакетами, каждый из которых имеет порядковый номер. На стороне брокера он отслеживает наибольший порядковый номер для каждого раздела. Если приходит пакет с меньшим или равным порядковым номером, брокер не запишет его в топик Kafka, что также обеспечивает порядок.

Отправка сообщений по порядку

Еще одна важная конфигурация для обеспечения порядка – max.in.flight.requests.per.connection – количество неподтвержденных запросов, которые могут быть помещены в буфер на стороне продюсера. Если количество повторных попыток больше 1 и первый запрос завершился неудачно, но второй запрос был успешным, то первый запрос будет отправлен повторно, а сообщения будут в неправильном порядке. Однако, как мы уже отметили ранее, при значении этой конфигурации больше 1 и есть риск изменения порядка сообщений из-за повторных попыток, если они разрешены. Когда при отключенной идемпотентности продюсера нужно сохранить порядок записи сообщений, вам следует установить для параметра max.in.flight.requests.per.connection значение 1. А если включив идемпотентность через CLI-скрипт kafka-acls.sh, определять конфигурацию max.in.flight.requests.per.connection уже не нужно: фремворк сам выберет подходящие значения для этого параметра. При установке несовместимых значений будет выброшено исключение ConfigException.

Буфер памяти и быстрая отправка сообщений

Когда producer вызывает send(), сообщения не отправляются в топик Kafka немедленно, а добавляются во внутренний буфер, размер которого по умолчанию равен 32 МБ. Если продюсер отправляет сообщения быстрее, чем они могут быть переданы брокеру, или случились перебои с сетью, накопленные сообщения не вмещаются в buffer.memory. Тогда вызов send() будет заблокирован на время, указанное в конфигурации max.block.ms (по умолчанию 1 минута). Эту проблему можно решить, увеличив оба значения. А также можно настроить еще 2 конфигурации: linger.ms и batch.size. linger.ms – время задержки до того, как пакеты будут готовы к отправке. Увеличение linger.ms снижает количество запросов и повышает пропускную способность, но приводит к тому, что в памяти хранится больше сообщений. Batch.size – максимальный размер одного пакета предупреждает слишком долгое ожидание сообщений перед отправкой. Конфигурации linger.ms и batch.size дополняют друг друга: пакеты будут отправлены при достижении любого из этих 2 лимитов. О том, как ненулевое значение linger.ms позволяет сэкономить место на диске, читайте в нашей новой статье.

Узнайте больше про администрирование кластеров Apache Kafka и разработку распределенных приложений потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/10-configs-to-make-your-kafka-producer-more-resilient-ec6903c63e3f
  2. https://kafka.apache.org/documentation/#producerapi