Apache Kafka 3.2: что нового?

Kafka обновление 3.2.0, администрирование кластера Kafka, Kafka кластер администратор примеры курсы обучение, Apache Kafka для разработчика и дата-инженера примеры курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

17 мая 2022 года вышел очередной релиз главной платформы потоковой передачи событий. Смотрим самые важные обновления свежей Apache Kafka 3.2.0 с точки зрения разработчика распределенных приложений, дата-инженера и администратора кластера.

ТОП-5 новинок свежей версии Apache Kafka для администратора кластера

Apache Kafka 3.2.0 включает 2 новые фичи, 36 улучшений и 65 исправлений ошибок. Одним из исправленных багов стала замена библиотек log4j и slf4j-log4j12 на reload4j и slf4j-reload4j. О проблемах информационной безопасности из-за критической уязвимости в библиотеке Apache Log4j мы писали здесь. Обновление влияет только на модули, которые указывают серверную часть логирования, например, connect-runtime и kafka-tools. Ряд модулей, в т.ч. kafka-client, позволяют приложению указать нужный бэкенд логирования. Проекты, которые зависят от затронутых модулей Kafka, должны использовать slf4j-log4j12 не ниже версии 1.7.35 или slf4j-reload4j, чтобы избежать проблем совместимости с инфраструктурой ведения журналов. Возврат к log4j, но уже версии 2.x планируется в следующем мажорном релизе Apache Kafka

Для администраторов кластера, которые хотят окончательно избавиться от Zookeeper, пригодится новый авторизатор KRaft. Хотя этот режим KRaft пока еще не рекомендуется для реальных проектов, он получит статус «готовый к работе» уже в Apache Kafka 3.3. А в версии 3.2.0 представлен встроенный авторизатор StandardAuthorizer, который не зависит от Zookeeper, позволяя безопасно запустить кластер Kafka без этого стороннего сервиса синхронизации метаданных. StandardAuthorizer хранит свои ACL-списки в разделе __cluster_metadata и используется по умолчанию в кластерах KRaft. StandardAuthorizer выполняет все те же действия, что и AclAuthorizer для кластеров, зависящих от Zookeeper.

Теперь контроллер может сообщать вновь избранному лидеру раздела топика, был ли он избран с использованием неясной стратегии выбора лидера, чтобы тот восстановил свое состояние. Это пригодится для очистки состояния транзакции, которое может остаться несогласованным после перевыбора.

При большом количестве крупных клиентов, избрание предпочтительного лидера может привести к задержке повторных попыток или замедлению продюсеров из-за множества подключений, почти одновременно открытых клиентами. При этом SYN-бэклог для принимающих сокетов TCP заполняется слишком быстро. Чтобы избежать этого, в Apache Kafka 3.2.0 представлена ​​новая конфигурация socket.listen.backlog.size, которая позволяет установить размер SYN-бэклога для принимающих TCP-сокетов на брокерах. Увеличение значения этой конфигурации может смягчить проблемы из-за большого количества открытых подключений.

Когда потребитель покидает группу или присоединяется к ней, в свежем выпуске платформы регистрируется причина, чтобы упростить устранение неполадок при перебалансировке, вызванной LeaveGroupRequest и JoinGroupRequest.

Начиная с версии 2.4.0, когда было введено статическое членство, потребители могут присоединиться к группе потребителей после непродолжительного отсутствия без запуска перебалансировки. Если бы лидер потребительской группы ненадолго отсутствовал, а затем вернулся, то он остался бы лидером. Но из-за невозможности сообщить вновь присоединившемуся потребителю, что он по-прежнему является лидером, без запуска перебалансировки группа могла пропустить изменения метаданных, например, увеличение раздела. Теперь возвращающийся лидер информируется о своем лидерстве без вычисления нового назначения.

Наконец, в Apache Kafka 3.2.0 разрешена настройка num.network.threads на каждого слушателя, чтобы индивидуально устанавливать размер пула сетевых потоков и динамически реагировать на всплески трафика или снижать использование памяти при разной сетевой нагрузке. Ранее существовавшая конфигурация num.network.threads обновлена для поддержки установки на определенных слушателях через listener.name.<name of the listener>.num.network.threads.

Самые важные обновления для разработчика распределенных приложений и дата-инженера

Начиная с Apache Kafka 3.2.0, Kafka Streams может распределять резервные реплики по отдельным стойкам, используя теги в конфигурации приложения. Например, клиенты Kafka Streams могут быть помечены кластером или облачным регионом, в котором они работают. Пользователи могут указать теги, которые следует использовать для распространения резервных реплик с учетом стоек, задав конфигурацию rack.aware.assignment.tags. Во время назначения задачи Kafka Streams постарается распределить резервные реплики по ее разным измерениям, чтобы повысить отказоустойчивость в случае выхода из строя всей стойки. Это пригодится, например, для обеспечения распределения реплик по разным зонам доступности в провайдере облачного хостинга. О том, как это устроено и какую пользу приносит, читайте в нашей новой статье.

Добавлено поле кода ошибки верхнего уровня в ответ API DescribeLogDirs. Ранее DescribeLogDirs возвращал пустой ответ, если у пользователей не было необходимой авторизации для запроса, а клиенты должны были интерпретировать это как ошибку CLUSTER_AUTHORIZATION_FAILED. Теперь API DescribeLogDirs совместим с другими API-интерфейсами и позволяет возвращать другие ошибки, кроме CLUSTER_AUTHORIZATION_FAILED.

Важный инструмент отладки, kafka-console-producer, теперь может записывать заголовки и нулевые значения, добавляя их к записи сжатого топика. В каких случаях следует отказаться от этого, мы разбираем здесь.

Еще одной новой фичей является улучшенный интерфейс интерактивных запросов в Kafka Streams (Interactive Query v2, IQv2), чтобы упростить и ускорить запросы к хранилищу состояний, а также снизить затраты на обслуживание при изменении существующих хранилищ состояний и добавлении новых. Определенные типы запросов можно добавить в IQv2 путем реализации интерфейса Query. Также определен класс KeyQuery, позволяющий пользователям оценивать поиск по ключу/значению с помощью IQv2.

Добавлен класс RangeQuery в IQv2, который реализует интерфейс Query, позволяющий запрашивать хранилища состояний в диапазоне, заданном верхней или нижней границей ключа, а также сканировать все записи хранилища состояний, когда границы не указаны.

Добавлены 2 класса, реализующие интерфейс Query для сканирования окон:

  • WindowKeyQuery – сканирует окна с заданным ключом в заданном временном диапазоне;
  • WindowRangeQuery – сканирует окна в заданном временном диапазоне независимо от ключей.

Пока IQv2 находится в бета-версии, а его интерфейсы помечены как @Evolving, т.е они могут нарушить совместимость в второстепенных выпусках без периода устаревания, если пользователи обнаружат существенные недостатки в текущем API. В будущем выпуске аннотация @Evolving будет удалена, а сам IQv2 станет стабильным.

В Kafka 3.2.0 добавлен метод recordMetadata() в класс StateStoreContext, чтобы предоставить доступ к топику, разделу и смещению обрабатываемой в данный момент записи. Предоставление текущего контекста таким образом позволяет хранилищам состояний отслеживать свое текущее смещение в каждом входном разделе, чтобы реализовать механизмы согласованности.

Идемпотентность для продюсера включена по умолчанию, если не установлены конфликтующие конфигурации. В версиях 3.0.0 и 3.1.0 ошибка не позволяла применять это значение по умолчанию. Идемпотентность оставалась отключенной, если только пользователь явно не установил для конфигурации enable.idempotence значение true. Эта проблема исправлена: в версиях 3.0.1, 3.1.1 и 3.2.0 значения по умолчанию применяются корректно.

Теперь Connect по умолчанию отключает идемпотентное поведение для всех своих продюсеров, чтобы единообразно поддерживать любые версии брокера Kafka. Пользователи могут изменить это поведение, чтобы включить идемпотентность для некоторых или всех продюсеров с помощью конфигурации исполнителя Connect и/или коннектора. Connect может включить идемпотентных продюсеров по умолчанию в будущем мажорном релизе.

Улучшен Connect API для получения списка всех подключаемых модулей коннектора и получения определений их конфигурации: расширена конечная точка GET /connector-plugins – добавлен новый параметр запроса connectsOnly. Этот параметр, установленный в значение false, выводит список всех доступных подключаемых модулей, а не только коннекторов. Так пользователи могут проверять, какие подключаемые модули доступны, без необходимости знать, как настроена среда выполнения Kafka Connect. По умолчанию для ConnectorsOnly установлено значение true, чтобы обеспечить обратную совместимость через использование GET /connector-plugins?connectorsOnly=false. Также добавлена еще одна конечная точка, которая возвращает конфигурации данного плагина через GET /connector-plugins/<plugin>/config. Новая конечная точка работает со всеми подключаемыми модулями, возвращаемыми командой GET /connector-plugins.

Добавлена ​​поддержка различной точности времени Unix в TimestampConverter SMT с помощью нового дополнительного поля конфигурации unix.precision. Это позволяет пользователю определить желаемую точность для SMT в секундах, миллисекундах, микросекундах и наносекундах, т.к. во внешних системах Unix время представлено с разной точностью.

В Kafka 3.2.0 исходные задачи могут обрабатывать исключения продюсера, чтобы повысить устойчивость коннекторов. Поскольку исходные коннекторы принимают данные из внешних систем, полученные сообщения могут быть слишком большими или необрабатываемыми для настроенной рабочей роли Connect, брокера Kafka и других компонентов экосистемы. Раньше такая ошибка всегда прерывала коннектор. Теперь WorkerSourceTask проверяет сконфигурированное значение error.tolerance при сбое отправки сообщения. Если для error.tolerance задано значение all, WorkerSourceTask проигнорирует исключение, позволяя коннектору подтвердить свою исходную систему и продолжить обработку. Если для error.tolerance не задано значение all, исходный коннектор завершится ошибкой.

Существующие source-коннекторы, которые задают для error.tolerance значение all и ожидают выхода из строя при сбое продюсера, необходимо обновить. Это изменение не затронет source-коннекторы, для которых не задано all-значение error.tolerance, и они будут уничтожены в случае сбоя продюсера.

Наконец, примеры коннекторов FileStreamSourceConnector и FileStreamSinkConnector были удалены из пути к классам по умолчанию. Чтобы использовать их в автономном или распределенном режиме Kafka Connect, их необходимо явно добавить, например, CLASSPATH=./lib/connect-file-3.2.0.jar ./bin/connect-distributed.sh

Как перейти на релиз 3.2.0

При обновлении до релиза 3.2.0 с любой версии от 0.8.x до 3.1.x следует помнить, что после изменения inter.broker.protocol.version вернуться обратно невозможно. Для перехода нужно проделать следующий набор действий:

  1. Обновить properties для всех брокеров и добавить свойства CURRENT_KAFKA_VERSION (текущая версия, с которой идет обновление) и CURRENT_MESSAGE_FORMAT_VERSION (используемый в текущей версии формат сообщения). Если версия формата сообщения была переопределена, следует сохранить ее текущее значение. В качестве альтернативы при обновлении с версии до 0.11.0.x, CURRENT_MESSAGE_FORMAT_VERSION должен соответствовать значению CURRENT_KAFKA_VERSION.
  • broker.protocol.version=CURRENT_KAFKA_VERSION (например, 3.1, 3.0 и пр.)
  • message.format.version=CURRENT_MESSAGE_FORMAT_VERSION

При обновлении с версии 0.11.0.x или выше без переопределения формат сообщения нужно переопределить только версию межброкерского протокола:

  • broker.protocol.version=CURRENT_KAFKA_VERSION (например, 3.1, 3.0 и пр.)
  1. Обновлять брокеры нужно по одному, выключив каждый, обновив код и перезапустив его, чтобы убедиться в совпадении фактического поведения и производительности кластера ожиданиям. На этом этапе все еще можно откатиться назад, понизив версию, если возникли какие-либо проблемы.
  2. После проверки поведения и производительности кластера нужно повысить версию протокола, отредактировав broker.protocol.version и установив для нее значение 3.2.
  3. Чтобы новая версия протокола вступила в силу, следует перезапустить брокеры по одному. Как только брокеры начнут использовать последнюю версию протокола, откат назад, до предыдущей версии, станет невозможным.
  4. При переопределении версии формата сообщения, необходимо выполнить еще один последовательный перезапуск, чтобы обновить его до последней версии. Когда все или большинство потребителей будут обновлены до версии 0.11.0 или более поздней, нужно изменить log.message.format.version на 3.2 на каждом брокере и перезапустить их один за другим. Старые клиенты Scala, которые больше не поддерживаются, не поддерживают формат сообщений, представленный в версии 0.11. Поэтому, чтобы избежать затрат на преобразование и применять семантику строго-однократной доставки сообщений (exactly once), следует использовать более новые клиенты Java.

Освойте практические навыки администрирования и эксплуатации Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.confluent.io/blog/apache-kafka-3-2-0-new-features-and-updates/
  2. https://downloads.apache.org/kafka/3.2.0/RELEASE_NOTES.html
  3. https://kafka.apache.org/32/documentation.html#upgrade

Поиск по сайту