3 проблемы с топиками Kafka для администратора кластера и способы их решения

Автор Категория ,
3 проблемы с топиками Kafka для администратора кластера и способы их решения

В этой статье рассмотрим типичные проблемы топиков Apache Kafka, с которыми сталкивается каждый администратор Big Data кластера. Читайте далее, почему топики чрезмерно разрастаются, как работает очистка логов, когда старые сообщения могут остаться в почищенных сегментах и какие параметры конфигураций помогут справиться со всем этим.

Брокеры и разделы: как устроены топики Apache Kafka

Мы уже рассказывали о трудностях построения конвейера стирания больших данных на базе Apache Kafka с точки зрения инженерии Big Data. Сейчас сосредоточимся на задачах администрирования Kafka-кластера, но сперва кратко напомним принципы работы этой распределенной платформы потоковой передачи событий. Кластер Kafka состоит из одного или нескольких брокеров, и содержит топики, которые играют роль очереди сообщений, куда клиентские приложения записывают свои данные и считывают их оттуда. Топики разделены на разделы, которые распределены между брокерами. Любой брокер хранит несколько разделов каждого топика. Благодаря реплицированию разделов на несколько брокеров, Kafka обеспечивает избыточность для надежности: если какой-то брокер откажет, его разделы распределятся между остальными. Разделы хранятся на диске как «сегменты» и по умолчанию имеют размер 1 ГБ. Таким образом, чем больше размер раздела, тем больше сегментов у него будет на диске. На практике эта стройная архитектурная концепция имеет свои подводные камни, о которых мы поговорим далее [1].

Пики удаления или неравномерность очистки

Рассмотрим ситуацию с неравномерной очисткой топиков. К примеру, топик с коэффициентом репликации 3 настроен на хранение данных в течение недели, а его размер варьируется от 16 до 32 ГБ. Это тема с 32 разделами и. По умолчанию сегменты развертываются, когда они достигают размера 1 ГБ или раз в неделю, в зависимости от того, что наступит раньше. Политики удаления старых сообщений не действуют на активные сегменты. Если сегменты не заполнены, они не будут развернуты до тех пор, пока не будет выполнено количество часов, заданное в конфигурации log.roll.hours. Поэтому пик удаления старых сообщений представляет собой сумму размеров сегментов для всех разделов топика, которые не достигли 1 ГБ за 7 дней. А поскольку все эти процессы запускаются в одно и то же время еженедельно, возникает цикличность пик-спад.

Сама по себе такая цикличность не несет вреда, но для более эффективного использования дисковое пространства, оптимально иметь топики с постоянным размером. Пиков можно избежать, уменьшив значение параметра segment.bytes, который определяет размер сегмента, до объема данных, обрабатываемых топиком [1].

Справедливости ради стоит отметить, что конфигурация log.roll.hours позволяет задавать максимальное время до развертывания нового сегмента журнала (в часах). Она вторична по отношению к свойству log.roll.ms – максимальному времени до развертывания нового сегмента журнала в миллисекундах [2].

Размер имеет значение

Вторая проблема возникает, если размер топика Kafka превышает изначально ожидаемый. Например, некоторым приложениям необходимо предварительно загрузить и обработать все данные из топика, из-за большого размера которого время начальной загрузки приложений увеличивается, а их производительность – снижается. Чтобы очистить старые записи в топики и ограничить его размер, используются политики удаления Kafka. Однако, на практике, даже после настройки максимального размера топика и времени ожидания перед удалением файлов журнала, его размер может постоянно увеличиваться. Это случается из-за низкой степени детализации политики удаления: они могут удалять сегменты, но не старые данные внутри них. Таким образом, удаляются только те сегменты, в которых совокупная информация старше срока хранения, т.е. последний добавленный лог старше заданного времени. Это приводит к значительным колебаниям размера топика, включая его увеличение. Бороться с такой проблемой можно с помощью конфигурации segment.bytes [1]. Она управляет размером файла сегмента для журнала. Хранение и очистка всегда выполняются по одному файлу, поэтому больший размер сегмента означает меньшее количество файлов, но меньший контроль над хранением. Не стоит путать это со свойством log.segment.bytes – максимальным размером одного файла журнала [2].

Старые записи в топиках Kafka не удаляются

Если не до конца настроить политику удаления или сжатия (конфигурация cleanup.policy), то проблема разрастания размера топика может возникнуть снова. Например, даже при использовании политики компактного удаления с периодом хранения в одну неделю, в топике Kafka встречаются сообщения месячной давности. Это случается из-за того, что Kafka сначала запускает процесс уплотнения топика, а затем его очистки. Сжатие работает, анализируя весь сегмент, оставляя последнюю запись для каждого ключа сообщения и удаляя старые записи. Вместо того, чтобы создавать несколько небольших сегментов, политика сжатия объединит их.

Таким образом, на этапе сжатия неповторяющиеся ключи записей, которые находятся в начале топика и расположены в первых сегментах раздела, будут объединены с сегментами, содержащими более новые записи. Новые сегменты будут иметь ключи, срок хранения которых еще не истек. После завершения сжатия Kafka активирует политику удаления. А, поскольку политика удаления оперирует сегментами, а не отдельными сообщениями, она не удалит сегменты с данными, срок действия которых не истек. Поскольку самый старый сегмент содержит как старые, так и новые ключи, он не будет удален. Именно поэтому пользователь такого топика увидит очень старую информацию! Предупредить такую ситуацию можно, уменьшив размер segment.bytes [1].

В продолжение этого отметим, что очистка логов в Apache Kafka включена по умолчанию. Чтобы включить очистку логов определенного топика, следует задать конфигурацию log.cleanup.policy=compact. Этот параметр конфигурации брокера, определенный в файле брокера server.properties влияет на все разделы в кластере, для которых не действует переопределение конфигурации. Вообще конфигурация cleanup.policy может включать значения delete (удалить) или compact (сжать) для обозначения политики хранения в старых сегментах лога. Значение delete будет отбрасывать старые сегменты, когда будет достигнут предел их срока хранения или размера, а compact – включает сжатие лога в топике [2].

Разобраться со всеми конфигурационными настройками и освоить администрирование кластера Apache Kafka, чтобы обеспечить надежность потоковой аналитики больших данных, вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/thousandeyes-engineering/kafka-topics-pitfalls-and-insights-38bafc791a83
  2. https://kafka.apache.org/documentation