Ускорение PySpark-приложений с PyArrow: лайфхаки Apache Spark для разработчиков

Автор Категория ,
Ускорение PySpark-приложений с PyArrow: лайфхаки Apache Spark для разработчиков

В рамках обучения разработчиков Spark-приложений и дата-инженеров, сегодня рассмотрим, как повысить эффективность выполнения Python-кода с помощью кросс-языковой платформы Apache Arrow. Что такое PyArrow и как это улучшает производительность PySpark-программ.

Почему Spark Java быстрее PySpark и как это исправить с Apache Arrow

Будучи популярным вычислительным движком в области Big Data, Apache Spark способен обрабатывать множество данных очень быстро. Однако, на 100% это утверждение характерно для Java- и Scala-кода, т.е. распределенное Spark-приложение обрабатывает данные очень быстро, пока они находятся в JVM. Но фреймворк поддерживает и другие языки разработки, в частности, весьма распространенный Python, который стал стандартом де-факто в Data Science. Относительно простой синтаксис и большое количество специализированных библиотек для обработки данных, включая функции статистики, ML-алгоритмы и средства визуализации, делают Python очень востребованным, в т.ч. для разработки Spark-приложений. Сам Big Data фреймворк при этом предоставляет соответствующий API, позволяя разработчику писать код на языке Python (PySpark). Таким образом, данные перемещаются между средой Python и JVM, что создает определенные накладные расходы и снижает скорость выполнения программы. Это происходит по причине того, что при выполнении пользовательской PySpark-функции, фреймворк перебирает каждую строку данных и выполняет преобразование из Python в Java для каждого значения с проверкой типов, где большая часть времени уходит на сериализацию данных.

Поэтому возникает потребность в инструменте, который позволит уменьшить этот негативный эффект, сохранив все преимущество простоты написания кода на Python с быстротой обработки данных в JVM. Таким средством является Apache Arrow – кросс-язычная платформа для разработки данных в памяти, о которой мы уже упоминали здесь. Она определяет стандартизированный независимый от языка столбцовый формат памяти для плоских и иерархических данных, организованный для эффективных аналитических операций на современных CPU и GPU. Формат памяти Arrow также поддерживает чтение с нулевым копированием для молниеносного доступа к данным без накладных расходов на сериализацию, предоставляя соответствующие вычислительные библиотеки и потоковую передачу сообщений, а также обеспечивая межпроцессное взаимодействие. Arrow поддерживает довольно много языков: C, C++, C#, Go, Java, JavaScript, MATLAB, Python, R, Ruby и Rust.

Можно рассматривать Apache Arrow как посредника между межъязыковыми компонентами для чтения датафреймов Spark их запись в хранилища данных типа HBase без накладных расходов на сериализацию и десериализацию структуры данных. Поэтому Apache Arrow полезен для предоставления бесшовной и эффективной платформы для обмена данными между различными платформами. В случае разработки кода на PySpark, Arrow позволит сгладить преобразование данных между JVM и Python-средой. Для этого есть специальная реализация реализация Arrow на Python, которая называется PyArrow. Как ее использовать, рассмотрим далее.

Что такое PyArrow и как его использовать

Библиотека PyArrow предоставляет Python API для функций, предоставляемых библиотеками Arrow, а также инструменты для интеграции Arrow и взаимодействия с Pandas, NumPy и другими Python-библиотеками. Поскольку фреймворк выполняет большую передачу данных между Python и JVM, PyArrow пригодится PySpark-разработчикам, которые работают с данными NumPy и Pandas. PyArrow позволяет повысить производительность распределенных приложений при переходе от датафрейма Python-библиотек к датафрейму Spark и наоборот.

Будучи платформой обработки данных в памяти, Arrow исключает этапы сериализации и десериализации, позволяя отправлять данные Pandas, NumPy или других Python-библиотек в JVM пакеты, где их можно использовать напрямую, не выполняя множество преобразований, но обеспечивая точную информацию о типе. Это можно представить как следующий набор действий:

  1. Pandas DataFrame будет распределяться по частям в соответствии с параллелизмом фреймворка Spark по умолчанию;
  2. данные каждого распределенного фрагмента преобразуются в RecordBatch Arrow;
  3. схема Spark создается из данных Arrow, которые имеют все определения типов. В настоящее время все типы данных Spark SQL поддерживаются преобразованием на основе Arrow, кроме MapType, ArrayType of TimestampType и вложенного StructType;
  4. RecordBatches или Arrow Data будут переданы в JVM для создания Java RDD;
  5. добавляется схема Spark с помощью JavaRDD, чтобы создать итоговый датафрейм;

Аналогичным образом в обратной последовательности PyArrow может преобразовать датафрейм Spark в Pandas.

Чтобы использовать PyArrow, его следует сперва установить его через менеджер пакетов pip или conda, а затем включить в настройках:

pip install pyarrow

spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

При установке с помощью pip-менеджера PyArrow можно добавить как дополнительную зависимость модуля SQL с помощью команды pip install pyspark[sql]. Иначе следует убедиться, что PyArrow установлен и доступен на всех узлах кластера. Использование установленного PyArrow не является автоматическим по умолчанию, поэтому нужно включить его. Чтобы использовать Arrow при запуске приложения PySpark, пользователям необходимо сначала установить для конфигурации spark.sql.execution.arrow.enabled значение true, что изначально отключено. Если требуется включить это для всех сеансов, нужно добавить одну строку в конфигурацию по адресу SPARK_HOME/conf/spark-defaults.conf:

spark.sql.execution.arrow.enabled=true

Важно помнить, что размер пакета данных Arrow по умолчанию ограничен 10 000 строк. Если этого недостаточно, можно настроить это количество в конфигурации spark.sql.execution.arrow.maxRecordsPerBatch, задав там нужное число. Однако, этот параметр следует настраивать только в том случае, если ограничения памяти точно известны разработчику, поскольку эта память будет использоваться для создания ряда пакетов записей для обработки на основе каждого раздела данных.

В заключение отметим, что периодически следует проверять значение конфигурации spark.sql.execution.arrow.enabled, которое может измениться. В частности, если при преобразовании данных с помощью PyArrow в распределенном приложении возникла ошибка, фреймворк автоматически переключится обратно на оптимизацию, отличную от Arrow, установив конфигурации spark.sql.execution.arrow.enabled значение по умолчанию, т.е. false. Это позволит избежать сбоя приложения в процессе выполнения из-за типов данных, не поддерживаемых PyArrow.

Больше практических подробностей про Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/apache-spark-performance-boosting-e072a3ec1179
  2. https://arrow.apache.org/
  3. https://blog.clairvoyantsoft.com/optimizing-conversion-between-spark-and-pandas-dataframes-using-apache-pyarrow-9d439cbf2010