Управление хранением данных в Apache Kafka: 5 главных конфигураций

Apache Kafka архитектура и принципы работы примеры курсы обучение, администрирование кластера Kafka примеры курсы обучение, обучение Kafka, курсы Apache Kafka, Kafka администратор кластера курсы, Apache Kafka для дата-инженеров, Apache Kafka для администраторов и инженеров данных, Школа Больших Данных Учебный центр Коммерсант

Политики хранения, сжатия и очистки данных в топиках Apache Kafka: какие конфигурации нужно настроить, чтобы работать с файлами распределенных логов наиболее эффективно. Ликбез для администратора кластера Kafka и дата-инженера.

Хранение данных в Apache Kafka

Мы уже писали, что топик в Apache Kafka представляет собой не физическое, а логическое хранение данных. Топики делятся на разделы, которые, в свою очередь, состоят из сегментов. Сообщение, отправленное в кластер Kafka, добавляется в конец одного из журналов и остается там в течение настраиваемого периода времени или до тех пор, пока не будет достигнут предельный размер лога. Таким образом, можно сказать, что хранение сообщений в топиках Kafka может быть организовано одним из следующих способов:

  • на основе времени, когда сегменты лога помечаются для удаления или сжатия по истечении настроенного времени хранения для сегмента журнала, в зависимости от настроенной политики очистки. По умолчанию срок хранения сегментов составляет 7 дней. При этом настраиваются такие конфигурации брокера, как retention.ms (количество миллисекунд хранения файла журнала перед его удалением), log.retention.minutes (количество минут хранения файла журнала перед его удалением, вторичное по отношению к свойству log.retention.ms) или log.retention.hours (количество часов хранения файла журнала перед его удалением, наименее приоритетная настройка).
  • на основе максимально допустимого размера журнала для раздела топика. Когда размер лог достигает этого предела, начинается удаление сегментов с конца. При этом настраивается конфигурация retention.bytes.

Сжатие лога гарантирует, что Kafka всегда будет сохранять по крайней мере последнее известное значение для каждого ключа сообщения в журнале для одного раздела топика, чтобы гарантировать восстановление состояния после сбоя или повторную загрузку кэшей после перезапуска приложения во время эксплуатационного обслуживания. Простой подход к хранению данных предполагает, что старые данные журнала удаляются по истечении фиксированного периода времени или достижении лог-файлом предельного размера. Это отлично подходит для временных данных о событиях, таких как регистрация, где каждая запись стоит отдельно. Но в случае более сложных сценариев, например, захват измененных данных (CDC), где выполняется журналирование изменений с ключами, следует использовать другой способ.

К примеру, при подписке на изменения системы-источника, когда каждое изменение должно быть отражено в нескольких приемниках. Чтобы перезагрузить кэш или восстановить неисправный поисковый узел, нужен понадобиться полный набор данных, а не только последний лог. Аналогично при поиске событий, когда приложение использует журнал изменений в качестве основного хранилища данных для запросов. Также это пригодится при журналировании данных в системы высокой доступности для обеспечения отказоустойчивости. Процесс, выполняющий локальные вычисления, можно сделать отказоустойчивым, записав изменения, которые он вносит в свое локальное состояние, чтобы другой процесс мог использовать их и продолжить работу в случае сбоя. Это применимо в обработке счетчиков, агрегаций и другой подобной группировке данных в системе потоковых запросов.

В каждом из таких случаев сперва необходимо обрабатывать поток изменений в реальном времени. Но при сбое данные необходимо повторно загрузить или повторно обработать, выполнив полную загрузку. Сжатие логов позволяет отправить эти данные в один и тот же вспомогательный топик. Какие конфигурации при этом настраиваются, рассмотрим далее.

Сжатие и удаление данных

За работу с данными, которые вышли за пределы допустимого хранения в топиках, о чем мы говорили выше, отвечает конфигурация log.cleanup.policy — политика очистки, согласно которой старые сообщения будут удалены (delete) или сжаты (compact). Сжатием журналов занимается очиститель, в котором работает пул фоновых потоков, повторно копирующих файлы сегментов журнала, удаляя записи с ключом, указанном в заголовке. Каждый поток при этом работает следующим образом:

  • выбирает наиболее длинный журнал;
  • создает краткую сводку последнего смещения для каждого ключа в заголовке журнала. Сводка заголовка журнала — это компактная хеш-таблица, которая использует ровно 24 байта на запись, во много раз меньше изначального размера.
  • повторно копирует журнал от начала до конца, удаляя ключи, которые позже появляются в журнале. Новые сегменты немедленно заменяются, поэтому требуется дополнительное дисковое пространство только для одного дополнительного сегмента журнала, а не для всей полной копии.

При этом учитывается значение конфигурации log.cleaner.enable, по умолчанию равной true, что включает процесс очистки журнала для запуска на сервере. Эта конфигурация должна быть включена для топиков с политикой очистки cleanup.policy=compact, включая топик внутренних смещений. Иначе эти топики не будут сжиматься и продолжат постоянно увеличиваться в размерах. Также администратор кластера Kafka может настроить конфигурации log.cleaner.dedupe.buffer.size (общая память для дедупликации журнала во всех потоках очистки) и log.cleaner.delete.retention.ms (время, в течение которого должны сохраняться маркеры удаления для сжатых журналов топика). Параметр log.cleaner.delete.retention.ms также задает ограничение времени, в течение которого потребитель должен завершить чтение, если он начинает со смещения 0, чтобы убедиться, что он получил действительный снимок последнего этапа.

За продолжительность хранения удаленных данных отвечают следующие конфигурации Kafka:

  • log.cleaner.max.compaction.lag.ms — максимальное время, в течение которого сообщение не может быть сжато в журнале;
  • log.cleaner.min.compaction.lag.ms — минимальное время, в течение которого сообщение будет оставаться несжатым в журнале;
  • log.cleaner.min.cleanable.ratio — минимальное соотношение старого журнала к общему, при котором он подлежит очистке. Если также заданы конфигурации log.cleaner.max.compaction.lag.ms или log.cleaner.min.compaction.lag.ms, то средство сжатия журналов считает журнал пригодным для сжатия, как только достигнуто пороговое значение log.cleaner.min.cleanable.ratio, и в журнале есть несжатые записи в течение как минимум времени log.cleaner.min.compaction.lag.ms, или если в журнале присутствуют несжатые записи не более периода log.cleaner.max.compaction.lag.ms.

Таким образом, в Apache Kafka сообщения не удаляются сразу после их использования, т.е. считывания приложением-потребителем, а задается конфигурацией топика, политикой очистки. По умолчанию используется политика удаления, когда старые сегменты удаляются, если превышено их время хранения или предельный размер. Политика сжатия активирует сжатие журналов, выборочно удаляя записи в каждом разделе, где есть более свежее обновление с тем же первичным ключом. Это гарантирует, что в журнале будет хотя бы последнее состояние для каждого ключа. Можно комбинировать оба эти способа в политике очистки, одновременно указав значения удаления и сжатия. Тогда журнал будет сжат, но процесс очистки также будет выполняться согласно настройкам времени хранения или ограничения размера данных. Подробнее про сжатие данных в топиках Kafka мы рассказывали здесь.

Освойте администрирование и эксплуатацию Apache Kafka и Flink для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@goyalyatin20/managing-data-retention-in-kafka-158e9fe761d3
  2. https://kafka.apache.org/documentation

Поиск по сайту