5 советов по совместному использованию Apache Spark и PostgreSQL

Автор Категория ,
5 советов по совместному использованию Apache Spark и PostgreSQL

В этой статье по обучению дата-инженеров и разработчиков Big Data рассмотрим, как эффективно записать большие данные в СУБД PostgreSQL с применением Apache Spark. Читайте далее, чем отличается foreach() от foreachBatch(), как это связано с количеством подключений к БД, асимметрией разделов и семантикой доставки сообщений.

Как Spark-приложение записывает данные в PostgreSQL

Если в вашем конвейере данных Spark-приложение отправляет записи в объектно-реляционную базу PostgreSQL, стоит помнить о следующих особенностях этого Big Data фреймворка [1]:

  • пакетный или потоковый режим работы;
  • количество разделов Spark;
  • асимметрия или перекос (Skew) данных в разделах Spark;
  • применение метода foreach() или foreachBatch() в потоковой передаче;
  • SQL-оператор для записи данных в БД.

Каждый из этих аспектов мы подробнее рассмотрим далее.

Пакеты или потоки Big Data

Пакетный или потоковый режим работы конвейера данных определяет выбор вычислительного примитива и соответствующей библиотеки – Spark Streaming или Structured Streaming, об отличиях которых мы писали здесь. В случае Spark Streaming обработка данных идет в виде дискретизированного потока (DStream на основе RDD) из микро-пакетов данных, а Structured Streaming оперирует с высокоуровневыми и оптимизированными API Spark SQL DataFrame и DataSet практически в реальном времени. Поэтому именно Structured Streaming отлично пойдет, если нужно, например, прочитать 10 миллионов строк и записать их в СУБД за 10–15 секунд.

Как количество разделов Spark влияет на число подключений к БД

В Spark Streaming можно использовать встроенный метод записи в СУБД, который отсутствует в Structured Streaming. Вместо него есть приемники foreach() и foreachbatch(), у которых число соединений для подключения к БД зависит от количества разделов. По умолчание количество одновременных подключений в Postgres ограничено значением 100. Поэтому выбор разделов определяется тем, сколько Spark-заданий и приложений одновременно пытаются получить доступ к БД. Увеличение этого параметра снижает производительность базы данных и общую скорость всего Big Data конвейера=, поскольку каждое соединение PostgreSQL потребляет оперативную память для управления соединением или клиентом, использующим его. В случае большого количества подключений рекомендуется использовать pg_bouncer или драйвер PostgreSQL, чтобы объединить их и сократить потребление [2].

Перекосы Spark и запись данных в СУБД

Асимметрия или перекос (Skew) данных в разделах Spark происходит из-за их неравномерного распределения и зависит от источника. Исправить это можно, явно указав количество нужных разделов Spark с помощью метода dataframe.repartition(), который перетасует данные между ними и распределит их поровну. Это займет несколько секунд в зависимости от объема данных. Эта проблема отсутствует, если источником данных является Kafka, где есть возможность равномерно распределять данные между разделами топика. В этом случае нужно просто поддерживать одни и те же разделы при записи в БД.

Foreach() vs foreachBatch()

Оба этих метода позволяют применять произвольные операции и записывать логику к выходным данным потокового запроса Spark. Однако, у них разные варианты использования: foreach() позволяет настраивать логику записи для каждой строки, а foreachBatch() допускает произвольные операции и настраиваемую логику на выходе каждого микро-пакета потокового запроса.

С foreachBatch() можно делать следующее [3]:

  • повторно использовать существующие источники пакетных данных;
  • обеспечить запись в несколько расположений, отправив выходной DataFrame или Dataset несколько раз. Чтобы избежать повторных вычислений в случае сбоя, рекомендуется кэшировать выходные данные, а затем извлекать их из кэша.
  • Выполнять дополнительные операции DataFrame, которые не поддерживаются в Spark Streaming, с самостоятельной разработкой сквозной семантики их выполнения. По умолчанию foreachBatch обеспечивает только гарантию записи хотя бы один раз (at-least-once). Предотвратить дублирование данных позволит идентификатор микро-пакета (batchId), проверка которого имитирует строго-однократную доставку сообщений (exactly once).

Однако, поскольку foreachBatch() работает с микро-пакетами, он не подходит для непрерывной обработки. В этом случае следует использовать foreach(), который работает следующим образом [3]:

  • логика записи данных реализуется с помощью методов open(), process() и close();
  • при запуске потокового запроса Spark вызывает функцию или методы объекта так, что одна его копия отвечает за все данные, генерируемые одной задачей в запросе, и один экземпляр отвечает за обработку одного раздела распределенных данных.
  • объект должен быть сериализуемым, потому что каждая задача получит новую сериализованную-десериализованную копию предоставленного объекта;
  • любую инициализацию записи данных, например, открытие соединения или запуск транзакции, рекомендуется выполнять после вызова метода open(), чтобы задача была готова к генерации данных;
  • в каждом разделе (с partition_id) для каждого пакета или эпохи потоковой передачи данных (epoch_id) вызывается метод open(partitionId, epochId). Если он возвращает значение true, то для каждой строки в разделе и пакете/эпохе вызывается метод process(row);
  • метод close(error) вызывается с ошибкой при обработке строк и возвращается успешно независимо от возвращаемого значения, кроме случаев, сбоя процесса JVM или Python;

Spark не гарантирует одинаковый вывод для (partitionId, epochId), поэтому так нельзя предотвратить дублирование данных, например, если источник предоставляет различное количество разделов или их изменяет оптимизация Spark. Поэтому при строгих требованиях к отсутствию дублирования в результатах следует использовать foreachBatch().

SQL-оператор для записи данных в СУБД

Чаще всего для записи данных в БД используется оператор INSERT. Например, встроенный метод DStream работает именно так. Однако, в случае огромного числа записей применяется оператор COPY, который записывает данные быстрее. В Spark-приложении для отправки записей в Postgres пригодится пакета COPY Manager. Он подходит как для приемника foreach(), так и для foreachbatch() и выполняет следующие действия [1]:

  • считывание DataFrame;
  • выполнение требуемых преобразований;
  • для последнего датафрейма с помощью foreach() метод open() открывает соединение с БД и инициализирует необходимую переменную, метод process() выполняет любое преобразование на уровне строки и записывает его в построитель строк, обеспечивая единый поток ввода вместо построчной записи, а метод close() записывает построитель строк в БД и закрывает соединение.
  • При использовании приемника foreachbatch() для заключительного датафрейма каждый микропакет разбивается на разделы, откуда строки добавляются строки в построитель строк, который записывается в БД.

Больше подробностей по разработке потоковых и пакетных приложений аналитики больших данных с Apache Spark вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://nowshad-shaik.medium.com/tips-for-writing-to-postgres-database-using-spark-ac96be931689
  2. https://help.compose.com/docs/postgresql-connection-limits
  3. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html