AVRO и JSON В Apache Kafka: краткий ликбез по реестру схем

Автор Категория ,
AVRO и JSON В Apache Kafka: краткий ликбез по реестру схем

Apache AVRO не случайно считается очень востребованным форматом и популярной системой сериализации данных, который активно в Kafka. Сегодня рассмотрим, как сериализуются данные в AVRO, каким образом это связано со структурами JSON и при чем здесь реестр схем Confluent.

Еще раз про AVRO и сериализацию данных

Apache Kafka часто используется в качестве инструмента потоковой передачи данных от одного микросервиса. Kafka принимает от приложения-продюсера в качестве входных данных байты и публикует их в топик для потребителя, что занимает много времени в случае множества записей. При этом на стороне потребителя не всегда выполняется проверка формата и структуры данных, полученных через сериализатор Kafka.

Важную роль в производительности этой системы играет сериализация данных, отправляемых в топики. Поскольку двоичные данные хранятся на жестком диске более эффективно, чем необработанный текст, не удивительно, что бинарный формат AVRO предпочтительнее, чем JSON-файлы. А по сравнению с Google Protobuf, в AVRO можно указать, какие поля являются обязательными, т.е. должны быть заполнены. Кроме того, этот формат сериализации не зависит от языка программирования и поддерживает JSON-структуры и эволюцию схемы данных, когда добавляются новые или удаляются существующие поля. В частности, именно поэтому платформа онлайн-обучения Udemy выбрала Apache AVRO вместо Google Protobuf в своей системе мониторинга событий пользовательского поведения, о чем мы недавно писали в этой статье.

Отправка данных в виде байтов приводит к проблемам их проверки и задержке, справиться с которыми можно с помощью реестра схем Confluent, о котором мы рассказывали здесь. Schema Registry обеспечивает нужный уровень обслуживания для метаданных, предоставляет Restful-API для хранения и получения схем AVRO, а также поддерживает эволюцию схемы, храня историю всех версий. Еще реестр схем предоставляет несколько параметров совместимости, позволяя изменять схемы в соответствии с этими параметрами. Наконец, Schema Registry предоставляет сериализаторы, которые подключаются к клиентам Kafka, обрабатывают хранение и извлечение схемы для сообщений в формате AVRO.

Таким образом, можно сказать, что реестр схем предоставляет потребителям и продюсерам Kafka способ управления AVRO-схемами для передачи данных. А формат AVRO обеспечивает миграцию схемы данных для систем, построенных по микросервисной архитектуре, полностью определяя структуру, тип и значение данных.

Kafka AVRO примеры описание обучение курсы
Как Apache Kafka работает с AVRO

Рассмотрим пример *.avsc-файла AVRO-схемы, которая описывает структуру данных на понятном человеку языке JSON (JavaScript Object Notation) в виде текста с парами «ключ/значение». Предположим, необходимо хранить данные о студентах: идентификатор, имя, адрес и телефон. Тогда AVRO-схема данных для таких записей будет храниться в файле с расширением *.avsc и выглядит следующим образом:

{
“type”: “record”,

“name”: “studentInfo”,

“namespace”: “com.domain.avro”,

“fields”: [

{

“name”: “studentId”,

“type”: “long”

},

{

“name”: “studentName”,

“type”: “string”

},

{

“name”: “studentAddress”,

“type”: [“null”, “string”]

},

{

“name”: “studentTel”,

“type”: [“null”, “string”]

}

]

}

Для генерации схем AVRO можно воспользоваться плагином Maven, парсером или методами API.

Apache Spark и Kafka: читаем данные в AVRO и JSON

Рассмотрим пример, когда за потоковую обработку данных, хранящихся в топиках Apache Kafka, отвечает Spark-приложение. Spark считывает данные из этой распределенной платформы потоковой передачи событий в датафрейм:

Dataset<Row> kafka_df = spark.read().format(“kafka”).option(“kafka.bootstrap.servers”, “host1:port1,host2:port2”).option(“subscribe”, “topic1,topic2”).option(“startingOffsets”, “{\”topic1\”:{\”0\”:23,\”1\”:-2},\”topic2\”:{\”0\”:-2}}”).option(“endingOffsets”, “{\”topic1\”:{\”0\”:50,\”1\”:-1},\”topic2\”:{\”0\”:-1}}”).load();

Этот датафрейм состоит из семи столбцов, которые определяют атрибуты каждого сообщения, полученного из Kafka:

  • Ключ – key;
  • Значение – vаlue;
  • Топик – tорiс;
  • Раздел – раrtitiоn;
  • Смещение – оffset;
  • Метка времени – timestаmр;
  • Тип метки времени – timestаmрTyрe.

Столбцы Key и Vаlue используются для извлечения смысла сообщения, наиболее ценная колонка содержит данные, которые можно расширить в датафрейм. Причем сами данные могут быть представлены в Kafka в нескольких форматах.

Например, в виде JSON, если схема данных имеется:

StructType json_schema=”schema of the dataframe

Dataset<Row> input = kafka_df.withColumn(“data”,

functions.from_json(kafka_df.col(“value”), schema)).select(“data.*”);

Этот метод подходит, когда схема данных определена и не меняется, что редко бывает в реальном мире. При отсутствии схемы, если столбец vаlue содержит строки JSON, их можно преобразовать для дальнейшей обработки:

Dataset<Row>  input = sparkSession.read().json(kafka_df.selectExpr(“CAST(value AS STRING)

as value”).map(Row::mkString, Encoders.STRING()))

При этом двоичный столбец vаlue преобразуется в строку в формате JSON, а функция MAP помогает преобразовать набор данных <Rоw> в набор данных <String>. Чтение этого набора данных, содержащего строки JSON с использованием метода АРI reаd().jsоn() поможет создать исходный набор данных для дальнейшей обработки. Также это можно выполнить с помощью Javа RDD:

JavaRDD<String> store = kafka_df.selectExpr(“CAST(value AS STRING) as value”).toJavaRDD().map(x->x.mkString());
Dataset<Row> input = spark.read().json(store);

Эти методы пригодятся в сценариях изменения схемы, когда ее описание отсутствует.

Для изменения данных в AVRO-формате необходимо, чтобы схема была представлена в форме строки JSON. К примеру, данные опубликованы в Kafka, столбец Vаlue отмечен как обязательный.

output.selectExpr(“to_json(struct(*)) AS

value”).write().format(“kafka”).option(“kafka.bootstrap.servers”,

“host:port”).option(“topic”, “topic_name”).save();

Если данные хранятся сразу в AVRO:

output.select(package$.MODULE$.to_avro(struct(“*”)).as(“value”)).write().format(“kafka”).option(“kafka.bootstrap.servers”, “localhost:9093”).option(“topic”,”test_avro”).save();

Идея в обоих случаях в том, чтобы создать поле value как столбец с типом «структура» с вложенными столбцами, и записать его в папку, откуда его можно извлечь и использовать для восстановления датафрейма.

Apache AVRO
Эволюция схемы данных Apache AVRO

Описанные методы могут быть использованы для чтения и записи данных в Apache Kafka.

Узнайте больше про администрирование и эксплуатацию Apache Kafka для эффективной аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://vidhitakher.medium.com/how-to-use-avro-schema-for-serialization-with-kafka-dc319cfa484d
  2. https://docs.confluent.io/platform/current/schema-registry/index.html
  3. https://sidgarg-exp.medium.com/how-to-read-the-kafka-stream-data-of-json-and-avro-type-and-write-it-to-another-kafka-stream-49797524d15
  4. http://avro.apache.org/docs/1.7.6/index.html