3 проблемы движка интеграции ClickHouse с Kafka и способы их решения

Big Data, Большие данные, обработка данных, архитектура, SQL, Greenplum, Arenadata, Kafka, ClickHouse, Docker

Вчера мы рассматривали интеграцию ClickHouse с Apache Kafka с помощью встроенного движка. Сегодня поговорим про проблемы, которые могут возникнуть при его практическом использовании и разберем способы их решения для корректной связи этих Big Data систем.

Почему случаются тайм-ауты: многопоточность и безопасность

Напомним, интеграцию ClickHouse и Kafka обеспечивает встроенный движок (engine), который позволяет публиковать потоки данных и подписываться на них, организовать отказоустойчивое хранилище и обрабатывать потоки по мере их появления через использование таблиц с указанием специальных параметров [1].

На практике при подключении ClickHouse к Kafka с помощью этого движка может возникнуть проблема, связанная с криптографическим протоколом SSL, который обеспечивает защищенное соединение. Из-за того, что данный протокол не входит в настройки подключения, которые заданы по умолчанию, библиотека librdkafka теряет связь с брокером Кафка (Local: timed out). Аналогичный случай возникает с пакетом kafkacat, который также использует эту библиотеку. Проблема решается изменением параметров подключения: в конфигурационный файл /etc/clickhouse-server/config.xml следует добавить строку с указанием защищенного протокола SSL: <kafka><security_protocol>ssl</security_protocol></kafka> [2].

Примечательно, что ранее исходящие SSL-соединения вообще не работали в Docker-образе ClickHouse из-за отсутствия сертификатов. Эта проблема была исправлена в 19.16.19.85, 20.1.12, 20.3.9 и более поздних релизах. При этом движок Kafka в Кликхаус поддерживает аутентификацию на уровне сервера, что указывается в конфигурационном файле config.xml [3].

Вообще стоит отметить, что при наличии в ClickHouse большого числа таблиц типа «движок Kafka» система генерирует тайм-ауты из-за того, что библиотека librdkafka создает один поток для каждого брокера и служебных данных. Много потоков в ClickHouse приводит к большому количеству переключений контекста, в результате чего возникает ошибка Local: timed out. Сократить число таблиц Kafka, использующих разные топики, можно с помощью материализованных представлений, о которых мы рассказывали здесь. При этом несколько материализованных представлений могут фильтровать данные по виртуальному столбцу «_topic». Также стоит учитывать, что таблица Kafka использует поток из пула потоков «background_schedule«. Если их слишком много, имеет смысл увеличить параметр background_schedule_pool_size, который задает количество потоков для выполнения фоновых операций в движках таблиц [3].

Параллельная обработка топиков Кафка

В случае Big Data системы на базе распределенного кластера Кликхаус следует создать таблицу движка Kafka на каждом сервере. При интеграции с Кафка нужно, что все таблицы движков Kafka использовали одно и то же имя группы потребителей (обязательный параметр kafka_group_name), чтобы параллельно использовать один и тот же топик. Если количество потребителей (серверов ClickHouse с таблицами движков Kafka) больше числа разделов (партиций) топика, некоторые потребители не смогут ничего считать. Эта проблема решается корректным заданием партиций топика Кафка с учетом числа серверов ClickHouse. Также можно сбросить данные из таблицы движка Kafka в распределенную таблицу с определенным ключом шардинга. Распределенные таблицы ClickHouse будут повторять вставки одного и того же блока, и могут быть дедуплицированы – записаны лишь 1 раз при многократной записи [3].

Проблема искаженных сообщений из Kafka: форматы и виртуальные столбцы ClickHouse

Иногда корректность переноса данных из топиков Кафка в Кликхаус зависит от формата сообщений. Напомним, ClickHouse может принимать (INSERT) и отдавать (SELECT) данные в различных форматах: Parquet, JSON, CSV, ORC, XML и еще 20 видов [4].

При заливке данных в ClickHouse из Kafka формат сообщений задается при создании таблицы «движок Кафка». Его следует точно указать в обязательном параметре kafka_format. При том в опциональных параметрах можно определить максимальное количество некорректных сообщений в блоке (kafka_skip_broken_messages). По умолчанию kafka_skip_broken_messages = 0, в случае положительного целочисленного значения движок отбросит столько сообщений Кафки, которые не получилось обработать. Одно сообщение Kafka в точности соответствует одной записи (строке) ClickHouse. Также в опциональных параметрах можно задать символ-разделитель записей (строк), которым завершается сообщение (kafka_row_delimiter) и определение схемы для тех форматов, где она необходима (kafka_schema). Например, формат бинарных сообщений Capn Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message  [1].

Важно, что параметр kafka_skip_broken_messages не работает для всех форматов одинаково. Например, если одно сообщение содержит несколько строк, одна из которых искажена, движок пропустит именно искаженную строку, а не все строки в самом сообщении, как можно подумать из-за названия параметра. В JSON, TSV и подобных текстовых форматах можно анализировать все поля как строки, преобразуя их к соответствующим типам на уровне материализованного представления. Это позволяет ClickHouse помещать искаженные сообщения в отдельный поток со специальными виртуальными столбцами, такими как _error или _raw_message. Затем для таких данных можно использовать материализованное представление, чтобы отфильтровать или сохранить их отдельно [3].

Здесь стоит уточнить, что в Кликхаус виртуальный столбец является неотъемлемой частью движка таблиц, определенный в его исходном коде. Виртуальные столбцы не указываются в запросе создания таблицы (CREATE TABLE) и не отображаются в результатах запросов на отображение данных (SHOW CREATE TABLE и DESCRIBE TABLE). Виртуальные столбцы доступны только для чтения, в них нельзя записать данные. Имя виртуального столбца не должно совпадать с именем «реального» столбца в таблице Кликхаус, иначе виртуальный столбец станет недоступным. Чтобы избежать этого, имена виртуальных столбцов обычно пишутся с нижним подчеркиванием впереди [5].

В следующей статье мы рассмотрим альтернативные варианты для организации загрузки потоковых данных из Apache Kafka в ClickHouse и разберем, почему крупный сервис такси «Ситимобил» подключил к этим Big Data системам еще одну – реляционную аналитическую In-Memory MPP-СУБД Exasol. А как самостоятельно обеспечить эффективную интеграцию Apache Kafka с другими внешними источниками для потоковой обработки больших данных вы узнаете на практических курсах по Кафка в нашем лицензированном учебном центре повышения квалификации и обучения руководителей и ИТ-специалистов (разработчиков, архитекторов, инженеров и аналитиков Big Data) в Москве:

 

Источники

  1. https://clickhouse.tech/docs/ru/engines/table-engines/integrations/kafka/#kafka
  2. http://leftjoin.ru/all/clickhouse-as-a-consumer-to-amazon-msk/
  3. https://www.altinity.com/blog/clickhouse-kafka-engine-faq
  4. https://clickhouse.tech/docs/ru/interfaces/formats/
  5. https://clickhouse.tech/docs/ru/engines/table-engines/#table_engines-virtual_columns