Проблема межкластерных транзакций в Apache Kafka и способы ее решения

Автор Категория ,
Проблема межкластерных транзакций в Apache Kafka и способы ее решения

Продолжая говорить про обучение разработчиков и администраторов Apache Kafka, сегодня разберем сложности семантики строго однократной доставки сообщений (exactly once) в случае нескольких экземплярах, находящихся в разных кластерах. Читайте далее, что не так с межкластерными транзакциями, какие KIP’ы связаны с этой проблемой и при чем здесь MirrorMaker.

Что не так с межкластерными транзакциями Apache Kafka и какие KIP пытаются ее решить

Для обеспечения строго однократной семантики доставки сообщений (exactly once) в распределенных системах используются специальные протоколы и алгоритмы: двухфазная фиксация, Paxos и Raft. Поддержка семантики exactly once – одно из главных достоинств Apache Kafka, что выгодно отличает эту платформу потоковой обработки событий от другого популярного брокера сообщений, Rabbit MQ. Подробнее об этом мы рассказывали в статье «Apache Kafka vs RabbitMQ».

 Однако, задача транзакционной обработки усложняется, когда в корпоративном масштабе запускается несколько экземпляров Apache Kafka, чтобы обеспечить 100%-ную надежность Big Data системы даже в случае аварийного отказа целого ряда брокеров. Также межкластерные транзакции встречаются при объединении нескольких местных кластеров в единый экземпляр, например, при распределенной филиальной структуре предприятия с агрегацией в одном центре. Наконец, возможно существование разных экземпляров Kafka для различных целей, например, отдельно для приема данных, настроенный так, чтобы обеспечить высокую пропускную способность, и для выполнения ресурсоемких вычислений [1].

Чтобы решить эту проблему, сообщество разработчиков Apache Kafka создало следующие предложения по улучшению (KIP, Kafka Improvement Proposals) [1]:

  • KIP-447 – масштабируемость производителей сообщений, которые могут записывать данные в несколько разделов атомарно, так что все записи считаются успешными или нет.
  • KIP-360 – повышение надежности идемпотентного/транзакционного продюсера, семантика которого зависит от состояния сохранения брокером для каждого активного идентификатора производителя, например, эпохи и порядкового номера.
  • KIP-588 – возможность продюсеров плавно восстанавливаться после тайм-аутов транзакций.
  • KIP-656 – строго однократная доставка с инструментом межкластерной репликации MirrorMaker

Что именно представляют собой эти предлагаемые улучшения Apache Kafka и как это связано с межкластерными транзакциями, мы рассмотрим далее.

KIP-447: идентификаторы транзакций и группы потребителей

В Kafka Streams это обеспечивает единовременную обработку топиков: идентификатор транзакции позволяет сохранить состояние вычислений при сбоях и перезапусках процессов, гарантируя наличие только одного активного экземпляра продюсера в рамках эпохи. Это реализуется в API initTransactions(). KIP-447 пытается решить проблему семантического несоответствия между потребителями в группе и транзакционными продюсерами с учетом перебалансировки потребителей и неизменности идентификатора транзакции, назначенного согласованному набору входных разделов. Такая архитектура плохо масштабируется по мере увеличения количества входных разделов, т.к. у каждого производителя есть свои буферы памяти, потоки, сетевые подключения и пр. отдельные конфигурации, которые ограничивают его производительность. Также несколько одновременных транзакций и дополнительное управление метаданными увеличивают нагрузку на брокеры.

Суть проблемы в том, что координаторам транзакций неизвестна семантика группы потребителей и возможность перемещения разделов можно перемещать между процессами.  KIP-447 предлагает обойти это ограничение, уменьшив тайм-аут транзакции до того же значения по умолчанию, что и тайм-аут сеанса в Kafka Streams, чтобы сократить риск потери производительности из-за задержки выборки со смещением, когда некоторые экземпляры случайно выходят из строя. В настоящее время решение «Один производитель на поток для Kafka Streams» находится в статусе «Адаптировано», а сам KIP-447 в JIRA, где собраны задачи по улучшению и исправлению ошибок Kafka, маркирован отметкой Resolved (Решено).

KIP-360: как повысить надежность идемпотентного/транзакционного продюсера

Идемпотентная/транзакционная семантика зависит от состояния сохранения брокером для каждого активного идентификатора производителя, например, эпохи и порядкового номера. Когда брокер теряет это состояние из-за удаления сегмента или вызова DeleteRecords, то дополнительные запросы на создание приведут к ошибке UNKNOWN_PRODUCER_ID. Производитель пытается обработать эту ошибку, сравнивая последнее подтвержденное смещение со смещением начала лога. Если последнее подтвержденное смещение меньше смещения начала лога, то производитель считает ошибку ложной, сбрасывает порядковый номер на 0 и повторяет попытку, используя существующую эпоху. Однако, этот подход чреват следующими проблемами:

  • сброс порядкового номера безопасен только, если запрос на создание не записан в журнал. А если запрос приходится повторить, то нет возможности узнать об успешности первой попытки и приходится рассматривать это как фатальную ошибку производителя. Это нарушает уникальность создаваемых записей и принципиально небезопасно.
  • нет никаких строгих гарантий того, что смещение начала лога будет монотонно увеличиваться. В частности, можно выбрать нового лидера, не увидев последнего смещения начала журнала относительно предыдущего лидера. При этом состояние продюсера, которое когда-то было потеряно, может снова появиться после переключения лидера, что может привести к ошибке нарушения последовательности, если производитель уже сбросил свой порядковый номер. Это также фатальная ошибка производителя. А отсутствие проверки при первой записи продюсера допускает немонотонные обновления и, следовательно, зависшие транзакции.

Предлагаемые изменения KIP-360 включает безопасное увеличение эпохи, длительное сохранение состояния производителя и упрощенную обработку ошибок клиента. В настоящее время это предложение для повышения надежности идемпотентного/транзакционного производителя находится в статусе «Адаптировано», а сам KIP-360 в JIRA маркирован отметкой Resolved (Решено) [3].

KIP-588: плавное восстановление продюсеров после тайм-аутов транзакций

Координатор транзакций использует эпоху продюсера, чтобы гарантировать единую область записи для каждого идентификатора транзакции. Когда новый производитель с тем же идентификатором транзакции запускается, координатор меняет эпоху. Поэтому предыдущий продюсер, использующий старую эпоху, должен быть изолирован фатальным исключением ProducerFenced. Но эпоха производителя также может быть сброшена, когда время транзакции истекает на стороне координатора из-за короткого периода бездействия клиента при проблемах с сетью. В этом случае, когда продюсер снова подключится к сети и попытается продолжить, он получит исключение ProducerFenced даже при отсутствии конкурентов. Приложение должно завершить работу текущего производителя и запустить нового, дополнительно вычислив громоздкую логику конструкции try-catch.

Это можно улучшить, позволив координатору запоминать факт прерывания транзакции, чтобы актуализировать эпоху текущего продюсера и дать ему возможность продолжать. В частности, когда время транзакции истекает, следует установить lastProducerEpoch на текущую эпоху. Любые транзакционные запросы из старой эпохи приведут к новому коду ошибки TRANSACTION_TIMED_OUT, который передается приложению по всем API-интерфейсам координатора транзакций производителя: AddPartitionsToTransaction, AddOffsetsToTransactio и EndTransaction. Продюсер восстанавливается через внутреннюю отправку InitProducerId с текущей эпохой, а координатор транзакций возвращает удачную эпоху. На май 2021 года KIP-588 находится в статусе «Принято» (Accepted), а его решение в JIRA маркировано отметкой Open (Открыто) [4].

KIP-656: Exactly-Once семантика для MirrorMaker2 и особенности ее реализации

Mirror Maker — это утилита Apache Kafka для зеркального копирования данных с помощью группы потребителей, которые читают данные из выбранного для копирования набора топиков. Mirror Maker запускает для каждого потребителя поток выполнения, который считывает данные из нужных топиков и разделов исходного кластера. Затем создается продюсер для отправки считанных данных на целевой кластер. В настоящее время MirrorMaker2 реализован на платформе Kafka Connect, в модуле Source Connector/Task, который не предоставляют семантику exactly once и не поддерживают идентификатор транзакции.

KIP-656 предлагает включить семантику строго однократной доставки для MirrorMaker 2, расширив SinkTask с помощью новой реализации MirrorSinkTask. Она может управлять смещениями потребителей транзакционным способом подобно HDFS Sink Connector, чтобы сообщения могли доставляться по кластерам [5].

Таким образом, семантика exactly once для одного кластера применяется сразу к нескольким, а единственный источник достоверности смещений потребителей управляется, передается продюсерам и хранится в целевом кластере. Но потребитель по-прежнему существует в исходном кластере, извлекая данные из него. Поэтому необходимо обеспечить корректное изменение смещения потребителя, когда задание передачи данных, т.е. задание MirrorMaker, перезапускается или перебалансируется, а смещения потребителей сохраняются в целевом кластере.

Это можно реализовать, искусственно создав отдельную группу потребителей, которая не будет использовать никаких фактических записей, а работает только со смещениями в топике, который в целевом кластере выполняет роль источника истины. Это позволяет потребителю в MirrorMaker не отслеживать внутренние смещения и обеспечивает согласованную запись транзакций в нескольких кластерах.

В результате этой идеи при успешном завершении транзакции искусственно созданный топик со смещениями в целевом кластере обновляется согласно текущему протоколу exactly once платформы Apache Kafka. В случае прерывания транзакции все записи данных удаляются, а топик со смещениями в целевом кластере не обновляется. Если MirrorMaker перезапускается, он возобновляет работу с последними зафиксированными смещениями, хранящимися в целевом кластере [1]. Сейчас KIP-656 находится в статусе «Черновик» (Draft), а его решение в JIRA маркировано отметкой PATCH AVAILABLE (Обновление доступно) [5].

Узнайте больше об особенностях администрирования кластеров Apache Kafka и разработки распределенных приложений потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/exactly-once-semantics-across-multiple-kafka-instances-is-possible-20bf900c29cf
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
  3. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
  4. https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts
  5. https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semantics