Тонкости потоковой передачи данных в BigQuery из Apache Kafka и Spark: 5 неочевидных особенностей

Автор Категория , ,
Тонкости потоковой передачи данных в BigQuery из Apache Kafka и Spark: 5 неочевидных особенностей

В рамках курсов для дата-инженеров и разработчиков распределенных приложений, сегодня рассмотрим пример построения системы потоковой передачи для аналитики больших данных на базе Apache Kafka, Spark и Google BigQuery. Читайте далее про Proof of Concept для конвейера продуктовой аналитики, который обрабатывает 50 миллиардов событий каждый день, и какие важные уроки ИТ-архитектор PayPal вынес из этого опыта.

Постановка задачи: зачем нужен PoC конвейера на Apache Kafka, Spark и Google BigQuery

Команда дата-инженеров PayPal управляет конвейером поведенческой и продуктовой аналитики больших данных, ежедневно обрабатывая более 50 миллиардов событий. После изменения структуры этого data pipeline’а было решено использовать Google BigQuery в качестве системы хранилища данных, поскольку оно обеспечивает быстрые SQL-запросы и интерактивный анализ Big Data [1]. Этот RESTful веб-сервис для интерактивного широкомасштабного анализа больших наборов данных, расположенных в облачном хранилище Google Storage. Благодаря модели инфраструктура как услуга, BigQuery можно использовать вместе с MapReduce, поддерживая пакетную загрузку, а также потоковую передачу.

BigQuery предоставляет внешний доступ к масштабируемой, интерактивной системе ad-hoc запросов Dremel для анализа данных, доступных только для чтения. Сперва данные следует загрузить в Google Storage, а затем импортировать с помощью BigQuery API HTTP. BigQuery требует аутентификации для всех запросов через механизмы подобные OAuth, позволяя предоставлять доступ к данным произвольным пользователям, группам и приложениям. BigQuery позволяет создавать и удалять таблицы на основе JSON-схемы, а также импортировать данные в формате CSV или JSON с Google Storage. Для запросов используется стандартный диалект SQL, а результат возвращается в формате JSON. BigQuery можно использовать в Google Apps Script как скрипт для Google Docs или на любом языке, поддерживающем REST API или клиентские библиотеки [2].

При пакетной загрузке исходные данные, например, из файла в формате CSV или Parquet, загружаются в таблицу BigQuery за одну пакетную операцию. Обычно так выполняются типовые ETL-задания. В случае потоковой передачи данные отправляются по одной записи за раз или объединенные в микропакеты на конечную точку потоковой передачи с помощью потоковых API, предоставляемых сервисами Google. Чтобы оценить возможности измененного конвейера обработки данных в условиях пиковой нагрузки, дата-инженеры решили сперва сделать доказательство концепции (Proof of Concept, PoC) перед тем, как принять решение о переходе на потоковую передачу. В качестве основного PoC-компонента была выбрана крупная таблица из 25 миллиардов событий. Данные находятся в локальной центре обработки, который соединяется с облаком Google.

Чтобы быстрее создать PoC, было решено использовать Spark Structured Streaming для получения событий из Kafka и потоковую запись в BigQuery. При этом использовался не готовый коннектор, поддерживающий чтение таблиц Google BigQuery в DataFrames Spark и запись DataFrames обратно в BigQuery с помощью API источника данных Spark SQL для связи с BigQuery [3]. А написан собственный коннектор, который записывает данные с помощью потоковых API в корзину Google Cloud Storage, а затем выполняет пакетную загрузку этих данных в BigQuery. Также для рассматриваемого PoC был написан Spark-приемник потоковой передачи BigQuery. С помощью настраиваемого Spark-потребителя было создано простое ETL-задание потоковой передачи, которое потребляет и записывает данные в post-преобразования BigQuery.

Для обработки 25 миллионов событий ежедневно PoC с пиковым трафиком 420 тысяч событий в секунду, что составляет примерно 1.25 Гб/сек, подключался к On-Premise облаку со скоростью соединения 20 Гб/сек.

конвейер Kafka Spark Streaming BigQuery, обучение Spark, курсы Spark, обучение Kafka, курсы Kafka, примеры аналитики больших данных с Kafka и Spark
Конвейер потоковой передачи на Apache Kafka, Spark и Google BigQuery

Весь конвейер с представленным PoC работает по следующему принципу [1]:

  • данные из множества разных устройств записываются в топики Apache Kafka;
  • приложение Spark Structured Streaming считывает данные из топиков Kafka;
  • коннектор Spark-BigQuery отправляет данные в таблицы облачного Google хранилища;
  • данные из таблиц BigQuery представляются на веб-дэшбордах для наглядной BI-аналитики.

За период тестовой эксплуатации созданной системы ее разработчики вынесли полезные выводы об особенностях потоковой передачи в BigQuery, которые мы рассмотрим далее.

Пакетирование

API потоковой передачи BigQuery позволяет объединить строки в один запрос и вызвать конечную точку. Здесь есть зависимость между размером пакета и задержкой. Например, в рассматриваемом PoC для пакета из 500 записей размером 1,25 МБ задержка составила около 450-650 мс, а для пакета размером в 2 раза больше (1000 записей), задержка выросла до 1100-1500 мс. Поэтому в качестве приемлемого варианта был выбран размер пакета в 500 записей.

Предел квоты на вставку потоковой передачи

Максимальная квота потоковой передачи составляет 1 ГБ в секунду для каждого проекта Google Cloud Platform по молчанию. При превышении этого предела возникает исключение BigQueryException с ошибкой quotaExceeded. Именно это случалось при тестировании PoC, когда входящий трафик составил около 60% реального объема.

Возникла потребность увеличить квоту на уровне проекта, создав билет в Google и внедрив логику повторной обработки запроса при каждой ошибке, чтобы избежать потери неудачной записи в итоговой таблице. Отслеживать использование квоты можно с помощью следующего BigQuery SQL-запроса, чтобы корректировать ее до того достижения предела:

SELECT

start_timestamp,

SUM(total_requests) AS total_requests,

SUM(total_rows) AS total_rows,

SUM(total_input_bytes) AS total_input_bytes,

SUM(IF(error_code IN (“QUOTA_EXCEEDED”, “RATE_LIMIT_EXCEEDED”), total_requests, 0)) AS quota_error,

SUM(IF(error_code IN (“INVALID_VALUE”, “NOT_FOUND”, “SCHEMA_INCOMPATIBLE”, “BILLING_NOT_ENABLED”, “ACCESS_DENIED”, “CUNAUTHENTICATED”), total_requests, 0)) AS user_error,

SUM(IF(error_code IN (“CONNECTION_ERROR”,“INTERNAL_ERROR”), total_requests, 0)) AS server_error, SUM(IF(error_code IS NULL, 0, total_requests)) AS total_error,

FROM `region-us`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT

GROUP BY start_timestamp

ORDER BY 1 DESC

Квота рассчитывается не на основе фактических данных потоковой передачи в BigQuery, а на базе полезных данных в формате JSON и метаданных по количеству байтов, полученных в конечной точке RESTful API. Так дополнительные теги из JSON также учитываются при текущем расчете квоты.

Нет повторам: включение дедубликации

При тестировании PoC было обнаружено, что во время потоковой передачи данные дублируются, и в итоговой таблице получаются повторяющиеся записи. BigQuery позволяет исключить дубли, создавая объект «RowToInsert»  с включенной дедупликацией:

RowToInsert row = InsertAllRequest.RowToInsert.of(uniqueInsertId,content);

Без включения дедупликации RowToInsert row = InsertAllRequest.RowToInsert.of(content), на каждые 500 000 загруженных записей возникал 1 дубль. При включении дедупликации, дубль случался на каждые 5 миллионов загруженных записей. Таким образом, включение дедупликации не гарантирует 100%-ное удаление дубликатов, но намного снижает вероятность их возникновения. BigQuery использует уникальный идентификатор uniqueInsertId в каждой записи для выявления и удаления дублей в течение последних 10 минут.

Кэширование при потоковой передаче таблицы

BigQuery обеспечивает базовое кэширование для пакетной загрузки таблиц. При запуске повторяющиегося запроса BigQuery пытается повторно использовать кешированные результаты. Но, если данные загружаются в таблицу через потоковую передачу, BigQuery не кэширует их.

Задержка потоковой передачи в BigQuery

При тестировании описанного PoC было отмечено более высокую задержку при запросе таблицы потоковой передачи по сравнению с таблицей пакетной загрузки, даже при меньшем объеме данных. Это случается по причине сканирования буфера потоковой передачи, где изначально хранятся загруженные записи, еще не отправленные в постоянное колоночное хранилище BigQuery Capacitor, которое оптимизировано для операций с интенсивным чтением. А буфер BigQuery Streaming оптимизирован для записи большого объема данных. Это и является причиной задержки, которая может составить 15–30 секунд в зависимости от объема запрашиваемых данных. Сперва буферу потоковой передачи требовалось 10–90 минут для отправки данных в постоянное колоночное хранилище. Сегодня во 2-ой версии серверной части BigQuery это время сокращено до 2–3 минут.

Завтра мы продолжим разбирать кейсы использования BigQuery с Apache Spark и рассмотрим пример цифровой трансформации автомобильной компании Renault с сервисами Google Cloud Platform. А детально узнать все технические подробности эксплуатации Apache Spark и Kafka для разработки распределенных приложений и аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://aride.medium.com/learnings-from-streaming-25-billion-events-to-google-bigquery-57ce81fa9898
  2. https://ru.wikipedia.org/wiki/BigQuery
  3. https://github.com/GoogleCloudDataproc/spark-bigquery-connector