Как устроен API администратора Apache Kafka: методы AdminClient с примерами

курсы Kafka администратор кластера, обучение Kafka для разработчиков, обучение Apache Kafka, курсы Apache Kafka, Kafka AdminClient, Admin Client Kafka пример, Школа Больших Данных Учебный центр Коммерсант

В рамках курсов по Apache Kafka для разработчиков и администраторов кластера, сегодня заглянем под капот AdminClient и на практических примерах разберем, как динамически создавать новый топик и описывать его программным способом через API. Еще рассмотрим, почему метод deleteTopics() нужно применять очень осторожно, а также вспомним основы ООП, говоря про классы AdminClient и KafkaAdminClient с интерфейсом Admin.

Что такое AdminClient в Apache Kafka и как он работает

Начиная с версии 0.11, выпущенной в 2017 году, Apache Kafka включает AdminClient, который представляет собой программный API администратора. Он позволяет программным образом, а не через CLI-интерфейс просматривать, создавать и удалять топики, а также настраивать кластер, управлять ACL-списками и изменять конфигурации. Например, если приложение-продюсер будет записывать события в конкретный топик, он должен существовать. Для этого администратору нужно создать топик Kafka, определив его имя, число разделов (партиций) и коэффициент репликации. Для этого в AdminClient есть метод createTopics​(Collection<NewTopic> newTopics, CreateTopicsOptions options). Он создает серию новых топиков и относится к классу CreateTopicsResult [1]:

CreateTopicsResult newTopic = adminClient.createTopics

(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR)));

Если топик с заданными параметрами не существует, то выполняется его создание. Если же такой топик уже есть или вызывается отдельный метод проверки результатов CreateTopic, может быть сгенерировано исключение TopicExistsException. Справиться с этим и проверить правильность конфигурации поможет описание топика через метод describeTopics​(Collection<String> topicNames, DescribeTopicsOptions options), который возвращает объект DescribeTopicResult, который настраивает соответствие имен топиков с их описаниями.

DescribeTopicsResult demoTopic=adminClient.describeTopics(TOPIC_LIST);

TopicDescription topicDescription = demoTopic.values().get(TOPIC_NAME).get();

Если топик с указанными параметрами существует, объект TopicDescription содержит список всех его разделов и для каждого из них какой брокер является лидером, список имеющихся реплик и выполняемых сейчас. Иначе, если топик не существует, сервер не может сопоставить его с описанием и вернет исключение ExecutionException. Впрочем, все объекты результатов AdminClient выдают исключение ExecutionException, когда Kafka отвечает с ошибкой. Поэтому, чтобы определить истинную ошибку, возвращенную Kafka, следует определить причину исключения ExecutionException [1]. Подробнее об этом мы поговорим далее, заглянув в иерархию классов Admin, AdminClient и KafkaAdminClient.

Что под капотом KafkaAdminClient: примеры применения с погружением в ООП

На верхнем уровне иерархии классов находится интерфейс Admin. Напомним, в ООП-парадигме интерфейс можно рассматривать как класс без атрибутов, но с методами, которые описывают поведение объекта класса, реализующего этот интерфейс. Это позволяет обеспечить один из ключевых принципов ООП – полиморфизм, чтобы унифицировать методы работы с разными объектами, вне зависимости от их типа и внутренней структуры. 

В Apache Kafka интерфейс Admin представляет собой клиент администрирования, который поддерживает управление и проверку топиков, брокеров, конфигураций и списков контроля доступа (ACL).

Экземпляры, возвращаемые методами создания этого интерфейса, гарантированно являются потокобезопасными. Однако объекты класса KafkaFutures, возвращаемые из методов запроса, выполняются одним потоком. Поэтому важно, чтобы любой код, который выполняется в этом потоке по завершении, не блокировался слишком долго, например, передать обработку результатов другому потоку.

В интерфейсе Admin операции соответствуют следующему шаблону [2]:

  • экземпляры Admin должны быть созданы с помощью методов create(Properties) или create(Map);
  • каждая операция обычно имеет два overloaded-метода, один из которых использует набор параметров по умолчанию, а во втором последний параметр сам является явным объектом;
  • первый параметр — это коллекция (Collection) элементов, над которыми выполняется операция. Пакетирование нескольких запросов в один вызов более эффективно и предпочтительнее нескольких вызовов одного и того же метода. Для получения общего результата пакета предоставляется метод all(), а метод values​​() обеспечивает доступ к каждому элементу пакета запроса.
  • методы операций выполняются асинхронно, причем каждый метод операции xxx возвращает класс XxxResult с методами объекта KafkaFuture для доступа к результатам операции. Для синхронного поведения используется метод get().

Конфигурация bootstrap-серверов, которые определяют адреса брокеров с метаданными о кластере Kafka, переданная в create(Properties) или create(Map), используется для обнаружения брокеров в кластере, к которым будет подключаться клиент при необходимости. Чтобы избежать проблем с недоступностью брокеров, достаточно указать 2-3 адреса. На практике разные операции обращаются к разным узлам кластера. Например, createTopics(Collection) взаимодействует с контроллером – брокером, который был запущен раньше остальных и отвечает за упревление состоянием разделов и реплик, в частности, выбор ведущих реплик для разделов. А метод describeTopics(Collection) может взаимодействовать с любым брокером. Если получатель ответа на запрос не имеет значения, экземпляр попытается использовать брокера с наименьшим количеством невыполненных запросов. С точки зрения администратора кластера Kafka, интерфейс Admin как клиент удобен тем, что он позволит прозрачно повторять возникшие ошибки, чтобы определить их причину и исправить. Например, если запрос createTopics() будет отправлен на узел, который не является контроллером, метаданные будут обновлены, а сам запрос повторно отправится к контроллеру [2].

Интерфейс Admin реализуется классом AdminClient – абстрактным классом для встроенных клиентов [3] и не имеет объектов, которые содержат методы для непосредственного администрирования Kafka-кластера. За это отвечает потомок AdminClient, класс KafkaAdminClient, который является реализацией интерфейса Admin. Экземпляр KafkaAdminClient создается путем вызова одного из методов create() в AdminClient. Этот класс является потокобезопасным, однако, пользователи не должны напрямую ссылаться на него [4].

Чтобы использовать методы KafkaAdminClient, сперва нужно создать экземпляр класса AdminClient, например, следующим образом [1]:

Properties properties = new Properties();

properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, «localhost:9092»);

AdminClient adminClient = AdminClient.create(properties);

adminClient.close(Duration.ofSeconds(30));

Здесь статический метод create() принимает аргумент объекта Properties с конфигурацией, где задан URI-адрес bootstrap-сервера кластера Kafka. После этого можно выполнить операции администрирования, например, посмотреть список топиков с помощью метода listTopics​(ListTopicsOptions options):

ListTopicsResult topics = adminClient.listTopics();

topics.names().get().forEach(System.out::println);

При этом метод admin.listTopics() возвращает объект ListTopicsResult, который является оберткой над коллекцией Futures. topics.name(), возвращающей будущий набор имен топиков. Вызывая метод get() для этого future-объекта, выполняющийся поток будет ждать от сервера набора имен топиков или выдаст исключение по тайм-ауту. Полученный список итерируется, чтобы показать все названия топиков.

Удалить Kafka-топик с помощью API администратора поможет команда admin.deleteTopics(TOPIC_LIST).all().get(). Метод deleteTopics вызывается со списком имен топиков, которые нужно удалить, с использованием get(), чтобы дождаться его завершения. Важно, что в Apache Kafka удаление топиков является окончательным и безвозвратным: понятие «корзины» отсутствует, как и проверки наличия данных в топике. Поэтому вызывать метод удаления нужно очень осторожно.

В заключение отметим, что при закрытии AdminClient, когда вызывается метод close(), чтобы освободить все связанные ресурсы, некоторые операции все еще могут выполняться. Поэтому close() принимает параметр тайм-аута: после вызова этого метода клиент будет ждать ответов, пока не истечет время ожидания. По истечении тайм-аута клиент прервет все текущие операции с исключением тайм-аута и освободит все ресурсы. Вызов close() без тайм-аута означает отсутствие тайм-аута, что предполагает нелимитированное ожидание в течение такого количества времени, сколько нужно для завершения всех текущих операций. Это стать причиной «зависания» системы.

Несмотря на подобные казусы, AdminClient полезен, когда требуется выполнять некоторые административные команды из клиентского приложения без использования инструментов CLI и GUI для управления Kafka. Наиболее распространенным кейсом является создание новых топиков по запросу на основе пользовательского ввода или данных, например, в приложениях интернета вещей (Internet Of Things, IoT). IoT-приложения получает события от пользовательских устройств и записывает их в разные топики Kafka, например, в зависимости от типа устройства. При выпуске устройство нового типа, с которым ваше приложение еще не знакомо, оно может динамически создать новый топик Kafka.

Развитие AdminClient не стоит на месте: по мере развития самой Apache Kafka, в этот клиент добавляются новые методы. В частности, с версии 2.3 добавлена возможность определять, какие операции пользователям разрешено выполнять с топиками. А в текущем релизе 2.8 добавлен API для описания кластера, который отделяет AdminClient от API метаданных, позволяя администратору Kafka напрямую запрашивать у брокеров информацию (идентификаторы кластера и контроллера, сведения о каждом брокере и разрешенные операции) без ущерба для производителя и потребителя. В будущем Describe Cluster API позволит запросить больше метаданных о кластере: статус каждого брокера, их версия, эпохи и пр., что особенно важно в перспективе отказа от Zookeeper. Подробнее о других новинках Apache Kafka 2.8 без Zookeeper читайте здесь. Также Admin Client поддерживается в проекте KafkaJS, о котором мы рассказываем в новой статье.

Больше практических деталей про администрирование и применение Apache Kafka для разработки распределенных приложений потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://blog.clairvoyantsoft.com/managing-apache-kafka-programmatically-ee5423aa0730
  2. https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/admin/Admin.html
  3. https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/admin/AdminClient.html
  4. https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/admin/KafkaAdminClient.html
Поиск по сайту