Ускоряем и масштабируем Apache Spark Structured Streaming: 2 проблемы строго однократной доставки и их решения

Автор Категория , ,
Ускоряем и масштабируем Apache Spark Structured Streaming: 2 проблемы строго однократной доставки и их решения

Вчера мы говорили про реализацию exactly once семантики доставки сообщений в Apache Spark Structured Streaming. Сегодня рассмотрим, что не так с размером компактных файлов для хранения контрольных точек потоковой передачи, какие параметры конфигурации Spark SQL отвечают за такое логирование и как ускорить микро-пакетную обработку больших данных и чтение результатов выполнения заданий в этом Big Data фреймворке.

 

Проблема увеличения размера компактных файлов в Apache Spark Structured Streaming

Напомним, чтобы гарантировать отсутствие дублей и потерь данных, Apache Spark Structured Streaming реализует строго однократную (exactly once) семантику доставки сообщений с помощью механизма контрольных точек (checkpoint) и commit-файлов для каждого микропакета.

Основная цель контрольных точек – обеспечить отказоустойчивость потоковых заданий. Благодаря метаданным, хранящимся в файлах контрольных точек, можно перезапустить обработку в случае сбоя бизнес-логики или технической ошибки. Контрольная точка – это физический каталог, отвечающий за хранение 4 типов данных:

  • source – файлы в этом каталоге содержат информацию о различных источниках, используемых в потоковом запросе. Например, для Apache Kafka исходный файл с контрольной точкой будет содержать карту между разделами топика и смещениями при первом выполнении запроса. Это значение неизменяемо и не меняется при выполнении запроса.
  • offsets – содержит файл с информацией о данных, которые будут обрабатываться при выполнении данного микропакета. Он генерируется перед физическим выполнением микропакета и представлен классом apache.spark.sql.execution.streaming.OffsetSeqLog. Последний элемент из файла смещения – это строка JSON с сопоставлением разделов и смещений для каждого источника данных, задействованного в запросе. Например, в случае источника Apache Kafka он будет хранить имя темы, номер раздела и начальное смещение для данного микропакета: {“raw_data”: {“0”: 246189}}.
  • commits (коммиты или логи фиксации) – файл-маркер с информацией о водяном знаке (watermark), который будет использоваться в следующем микропакете. Он представлен классом apache.spark.sql.execution.streaming.CommitLog, а метаданные – org.apache.spark.sql.execution.streaming.CommitMetadata. Файл коммитов содержит только одну запись с водяным знаком, примененным к следующему выполнению запроса: {“nextBatchWatermarkMs”: 1564634495145}.
  • state (состояние) – местоположение контрольной точки также отвечает за хранение состояния, созданного логикой обработки с отслеживанием состояния. Эти файлы хранятся в сжатом виде.

Данные с контрольными точками, то есть смещения, фиксации и источники, хранятся в виде обычного текста и начинаются с номера версии, например, «v1». Это нужно, чтобы избежать несогласованности, когда контрольная точка, созданная с помощью новой версии Apache Spark, может обрабатываться более старой версией платформы. Переопределение конфигурации также обеспечивает согласованность между выполнениями. К примеру, нужно выполнить одно и то же Spark-задание потоковой передачи, но с другим поставщиком состояний. Если не загружать состояние, можно просто запустить другой потоковый запрос с совершенно новым местоположением контрольной точки [1].

Apache Spark Structured Streaming checkpoints
Что хранят контрольные точки Apache Spark Structured Streaming

С целью оптимизации дискового пространства для долговременного хранения все вышерассмотренные файлы контрольных точек сжимаются и упаковываются с расширением .compact. В компактных файлах хранятся сведения обо всех файлах, обработанных и сгенерированных заданием Structured Streaming с даты начала. Поскольку обычно задания Spark Structured Streaming выполняются круглосуточно и без выходных, размер этих файлов со временем увеличивается, достигая 10 ГБ и более [2].

Справедливости ради, что стоит отметить, что Apache Spark всегда будет сохранять то количество файлов с контрольными точками, которое указано в конфигурации spark.sql.streaming.minBatchesToRetain. По умолчанию значение этого свойства равно 100. Этот параметр также определяет период повторной обработки данных. Например, если сохраняются только последние 10 записей, которые создаются каждую минуту, то не получится повторно обработать данные старше 10 минут, просто передавая данные о контрольных точках в запрос. Очистка контрольной точки – это операция физического удаления, поэтому информация будет потеряна на неопределенный срок [1].

Создание контрольных точек и сжатие commit-файлов выполняются в драйвере для каждых 10 пакетов (начиная с 0). Это единый процесс, который занимает до 10 минут в зависимости от размера компактного файла. Он в целом снижает производительность задания Spark Structured Streaming, поскольку во время работы драйвера над сжатием файла, все рабочие узлы простаивают, и задание впустую расходует ресурсы [2].

В большинстве production-систем все наборы данных сохраняются в HDFS – распределенной файловой системе Apache Hadoop. Проблема замедления скорости обработки компактных файлов может возникнуть примерно после нескольких месяцев с начала их накопления. Например, один из практических случаев показывает, что после непрерывного выполнения задания Spark Structured Streaming в течение 5–6 месяцев с 3000 КБ файлов каждые 3 минуты, время выполнения задания возрастает до 8–10 минут. При этом размер компактного файла увеличился до 6 ГБ, куда после каждых 10 пакетов добавлялось изменение последних 9 пакетов, а затем создавалась его новая копия. Это приводило к потере большого количества ресурсов и снижения пропускной способности задания.

В рассматриваемом примере компактные файлы JSON-формата в исходных и выходных коммитах содержали список обработанных файлов и сгенерированных файлов за 180 дней. Работа с таким количеством данных уменьшила пропускную способность потокового Spark-задания и приводила к сбоям на драйвере из-за нехватки памяти (OOM, Out Of Memory). О причинах возникновения этой ошибки в Spark-приложениях и способах ее решения мы писали здесь.

Для устранения этой ошибки в анализируемом кейсе был реализован следующий подход [3]:

  • плановое завершение заданий потоковой передачи Spark раз в неделю;
  • запуск Spark-задания, чтобы прочитать последний компактный JSON-файл для создания контрольных точек и коммитов;
  • фильтрация всех записей старых файлов на основе срока их хранения;
  • сохранение старого компактного файла для резервного копирования;
  • перемещение нового компактного файла меньшего размера в каталог контрольных точек Spark и каталог commit’ов с сохранением той же файловой структуры и имени файла;
  • при обновлении компактного файла метаданных стоит убедиться, что задание Spark Streaming не запущено;
  • очистка компактных файлов исходных и целевых метаданных.

Эта последовательность действий помогла уменьшить размер файла с 6 ГБ до 3 ГБ, что сократило на 40% время выполнения задания по сжатию пакетов и позволило распараллелить нагрузку на несколько задач. Таким образом, весь процесс уменьшения входных и выходных метаданных занял всего 10 минут, а общая производительность Spark-заданий в час увеличилась на 10%. Задание обрабатывало около 18–20 пакетов в час, после выполнения вышеотмеченных действий оно экономило 6–8 минут каждый час, что эквивалентно еще 2-м пакетам 2 или обработке дополнительных 6000 файлов [3]. Однако, такие нововведения не решили всех проблем с пропускной способностью Spark Structured Streaming, одну из которых мы рассмотрим далее.

 

Чтение метаданных и проблема замедления следующих заданий

Задание потоковой передачи Spark создает каталог _spark_metadata внутри своего выходного каталога. Он содержит информацию о коммитах для всех пакетов вывода потокового задания, и каждый 9-й файл представляет собой компактный файл с информацией обо всех файлах, созданных заданием Spark Streaming, с даты начала. Даже после вышеописанного сокращения размера компактных файлов с 6 до 3 ГБ, все последующие задания выполнялись с длительной паузой при чтении входных данных, снижая пропускная способность следующих этапов pipeline’а.

exactly once, Spark SQL, Apache Spark Structured Streaming
Механизм контрольных точек в Apache Spark Structured Streaming для строго однократной (exactly once) доставки данных

Согласно логам драйвера Spark, проблема была в том, что драйвер брал долгую паузу на чтение списка входных файлов из каталога _spark_metadata. Следующий драйвер Spark-задания содержит последний компактный файл со всеми новыми пакетами и обрабатывает этот список, чтобы извлечь и перечислить все необходимые входные файлы на основе значений разделов.

Например, драйвер Spark пытался обработать 3 ГБ данных как одну задачу в драйвере задания, что и было основной причиной замедления. Решить эту проблему можно с распределением чтения Spark-метаданных можно следующим образом [3]:

  • написать новую библиотеку для чтения _spark_metadata, используя ту же логику кода, чтобы перечислить последний компактный файл со всеми новыми файлами пакетной фиксации;
  • создать датафрейм из файлов JSON;
  • отфильтровать датафрейм на основе идентификаторов входных разделов для следующего задания;
  • создать список входных файлов и отправить его в Spark-драйвер для последующего ввода заданий.

Такое распределение чтения файлов метаданных позволило оптимизировать время чтения входных данных для последующих заданий на 5–8 минут.

Еще больше интересных приемов по оптимизации разработки распределенных приложений и потоковой аналитики больших данных с Apache Spark вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read
  2. https://medium.com/@Iqbalkhattra85/exactly-once-mechanism-in-spark-structured-streaming-7a27d8423560
  3. https://medium.com/@Iqbalkhattra85/optimize-spark-structured-streaming-for-scale-d5dc5dee0622