Модульное тестирование Kafka-приложений

Автор Категория ,
Модульное тестирование Kafka-приложений

Чтобы сделать наши курсы по Apache Kafka еще полезнее, сегодня разберем, как тестировать распределенные приложения на базе этой платформы потоковой обработки событий. Краткий ликбез для разработчика Kafka Streams и дата-инженера: классы, методы и приемы модульных тестов с примерами.

Ликбез по модульному тестированию: что такое mock-объекты

Про виды тестирования мы уже писали здесь. Напомним, именно модульное тестирование (Unit Testing) лежит в основе других видов проверки качества работы ПО, когда тестируются отдельные модули или компоненты, чтобы проверить, что каждый из них работает должным образом. Обычно модульное тестирование выполняется разработчиками, к примеру, в случае V-образной модели разработки это является основной концепцией самого процесса. Модульные тесты изолируют часть кода и проверяют его работоспособность. Модульное тестирование предшествует интеграционному и чаще всего проводится по принципу т.н. «белого ящика», поскольку известны не только входы и выходы проверяемого компонента, но и его внутреннее устройство. Отсутствие модульного тестирования при разработке кода значительно увеличивает уровень дефектов при дальнейшем продвижении по уровням тестовой пирамиды, т.е. интеграционном, системном, и приемочном тестировании. Поэтому, хотя на первый взгляд, unit-тесты и увеличивают срок разработки кода, они в итоге экономят время и деньги за счет раннего выявления дефектов.

тестирование ПО, основы тестирования ПО
Пирамида тестирования: виды тестов

В тестировании активно используются так называемые mock-объекты для симуляции поведения реальных объектов. Они заменяют реальные объект в условиях теста и позволяют проверять вызовы своих членов как часть системы в рамках модульного тестирования. Обычно mock-объекты содержат заранее запрограммированные ожидания вызовов, которые они ожидают получить, и применяются в основном для проверки поведения, т.е. интерактивного взаимодействия приложения с пользователем или внешней системой.

Mock-объект не просто возвращает предустановленные данные, но еще и записывает все вызовы, которые проходят через него, чтобы в unit-тесте проверить, что конкретные методы отдельных классов работают должным образом. Например, в модульном тестировании проверяется состояние объекта после прохождения unit-теста (state-based testing) и взаимодействие между объектами, поведение тестируемого метода, последовательность вызовов методов и их параметры и т.д. (interaction/behavioral testing).

Тестирование потребителя Apache Kafka

Для модульного тестирования простого потребителя Apache Kafka можно использовать MockConsumer, предоставляемый библиотекой org.apache.kafka:kafka-clients:X.X.X. Для этого при разработке приложения или рефакторинге необходимо сделать классы, использующие или обертывающие экземпляр KafkaConsumer, тестируемыми. Это означает, что Consumer<K, V> будет передан через вызов фабричного метода, непосредственно в конструкторе или внедрен как bean-компонент.

Чтобы создать экземпляр MockConsumer, нужно просто сопоставить тип ключа и значение записей, сообщив, с чего начать чтение:

val mockConsumer = MockConsumer<String, String>(OffsetResetStrategy.LATEST)

Как KafkaConsumer, так и MockConsumer реализуют интерфейс Consumer<K,V>, поэтому при передаче его в тестируемый класс, он будет взаимодействовать с ним как настоящий потребитель. Но здесь есть дополнительные методы, которые можно использовать для подготовки условий тестов:

  • addRecord() к mock-объекту, чтобы по существу подготовить записи для чтения перед началом теста;
  • schedulePollTask(), где можно передать любой Runnable для выполнения в последующей отправке;
  • setPollException() для проверки поведения потребителя при исключении.

Поскольку это предназначено только для модульного тестирования, здесь можно проверить, способен ли реальный потребитель:

  • десериализовать записи, которые он должен читать;
  • обрабатывать записи, таким образом, как это ожидается, включая фильтрацию;
  • обрабатывать ошибки при десериализации или обработке, а также при подключении к Kafka, включая подписку на топик, считывание сообщений и фиксацию смещаний.

При этом если потребитель манипулирует смещениями нестандартным способом, можно использовать один и тот же экземпляр между всеми модульными тестами, но с помощью методов updateBeginningOffsets() и updateEndOffsets().

Тестирование продюсера

Библиотека kafka-clients также включает класс MockProducer, который реализует тот же интерфейс Producer<K, V>, что и KafkaProducer. Таким образом, подобно тестированию потребителя, классы приложения должны быть разработаны так, чтобы пройти проверку как mock-объект.

Чтобы создать экземпляр MockProducer, следует сопоставить тип ключа и значение записей и «сказать» ему, следует ли автоматически успешно завершить запросы отправки (autocomplete=true) или вы хотите выполнить их явно, вызвав completeNext() или errorNext(). Например,

val mockProduer = MockProducer(true, StringSerializer(), StringSerializer())

Чтобы протестировать нормальный поток работы продюсера, нужно выстроить его работу, используя автозаполнение, а затем проверить, что было отправлено, используя метод history() в MockProducer. Это вернет список всех записей (ProducerRecord), отправленных с момента последнего вызова метода clear() на mock-объекте.

Чтобы проверит обработку исключений, можно установить для автозаполнения значение false и errorNext(), что вызовет любое исключение RuntimeException, которое случится при незавершенном вызове отправки.

Поскольку речь идет о модульном тестировании, можно проверить, способен ли продюсер:

  • сериализовать записи, которые ему нужны;
  • обрабатывать ошибки сериализации;
  • обрабатывать ошибки, связанные с подключением к Kafka, т.е. при вызове метода send();
  • правильно фильтровать записи, т.е. что количество фактически отправленных сообщений соответствует ожидаемому числу;
  • отправлять записи в ожидаемом формате, корректно обогащая или изменяя их формат.

 

Тестирование приложений Kafka Streams

Для Kafka Streams тестовые и рабочие классы разделены на отдельные библиотеки, поэтому нужно добавить зависимость org.apache.kafka:kafka-streams:X.X.X для использования потоков, а затем org.apache.kafka:kafka-streams-test- utils:X.X.X, чтобы использовать удобные тестовые классы.

Теперь вместо создания mock-объекта и передачи его тестируемому классу создается экземпляр TopologyTestDriver, куда передается топологию и свойства потокового приложения. Поэтому, чтобы сделать Kafka Streams-приложение пригодным для модульного тестирования, нужно создать свою топологию и передать ее тестовому драйверу:

val driver = TopologyTestDriver(myTopology, myProperties)

Когда есть экземпляр драйвера, следует явно создать все топики для вашей топологии, например:

val myInputTopic = driver.createInputTopic(
       inputTopicName,
       Serdes.String().serializer(), // key type
       Serdes.String().serializer()  // value type
)val myOutputTopic = driver.createOutputTopic(
       outputTopicName,
       Serdes.String().deserializer(), //  key type
       Serdes.String().deserializer()  //  value type
).... // create as many output topics as your topology hasval myDlqTopic = driver.createOutputTopic(
       dlqTopicName,
       Serdes.String().deserializer(), //  key type
       Serdes.String().deserializer()  //  value type
)

 

После настройки всех тестовых входных и выходных топиков (TestInputTopics и TestOutputTopics) можно приступать к тестированию:

myInputTopic.pipeInput(key, validValue)
...
assertTrue(myDlqTopic.isEmpty)
assertFalse(myOutputTopic.isEmpty)val actualRecord = myOutputTopic.readRecord()
assertEquals(expectedRecord, actualRecord, "Oh no, records don't match)

Можно также работать с несколькими входными значениями одновременно:

  • pipeValueList(List<V>) — если тестовый класс работает только со значениями;
  • pipeKeyValueList(List<KeyValue<K,V>> — если тестовый класс работает с ключами и значениями;
  • pipeRecordList(List<TestRecord<K,V>> — если тестовый класс использует только заголовки или временные метки;

Аналогично для вывода можно применять следующие методы:

  • readValuesToList() — если нужно проверить только значения вывода;
  • readKeyValuesToList() или readKeyValuesToMap() — если нужно проверить только ключ и значение вывода;
  • readRecordsToList() — если внужно проверить заголовки и временные метки вывода.

Эти модульные тесты проверяют, что приложение Kafka Streams может:

  • сериализовать и десериализовать необходимые записи;
  • обрабатывать исключения должным образом;
  • обрабатывать записи как положено, т.е. формат вывода совпадает с ожидаемым, а сам поток данных воспринимается как черный ящик с известными входами и выходами, но скрытым содержимым;
  • выполнять любую фильтрацию, как и ожидалось, т.е. фактическое количество выходных записей соответствует ожидаемому, независимо от того, сколько их было введено.

Потоковый процессор

При использовании потокового обработчика (процессора) пригодится MockProcessorContext для инициализации реализации этого интерфейса. С его помощью вы можете проверить, пересылаются ли записи в нужную тему и фиксируются ли смещения.

val mockContext = MockProcessorContext<String, String>()
val processor = MyProcessor() // implementing Processor
processor.init(mockContext)...processor.process(record)
val forwardedRecords = mockContext.forwareded()
assertEquals(1, forwardedRecords.size)
assertEquals(expectedRecord, forwardedRecords.map{it.record()}.first())
assertEqual(expectedTopic, forwardedRecords[0].childName().get())// if you have scheduled commit (or other action) manipulate time bymockContext.scheduledPunctuators()[0].punctuator.punctuate(time)// check if scheduled task is doneassertTrue(mockContext.committed())

Читайте в нашей следующей статье про интеграционное тестирование Kafka-приложений.

Узнайте больше про тестирование и разработку распределенных приложений с Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Источники

  1. https://medium.com/codex/testing-kafka-applications-libraries-for-unit-and-integration-tests-732c0f13b915
  2. https://logrocon.ru/news/unit_testing
  3. https://gist.github.com/vertigra/696e9d92dc72070584e556e2169e850d