Как перейти от Python к PySpark: ТОП-10 рекомендаций по настройке Spark-заданий

Автор Категория , ,
Как перейти от Python к PySpark: ТОП-10 рекомендаций по настройке Spark-заданий

Говоря про обучение Apache Spark для разработчиков, сегодня мы рассмотрим, как быстро конвертировать Python-скрипты в задания PySpark и какие конфигурационные параметры при этом нужно настроить, чтобы эффективно использовать все возможности распределенных вычислений над большими данными (Big Data). Читайте далее, чем отличаются датафреймы в Pandas и Apache Spark, для чего нужны Arrow и Koalas, а также как оптимально превратить локальный датасет в RDD.

5 шагов от Python-скриптов к заданиям PySpark

Прежде всего, напомним, что язык программирования Python ориентирован на локальную работу с данными в пределах одного компьютера, тогда как Apache Spark – это фреймворк распределенных вычислений, где данные распределены по нескольким узлам кластера. Поэтому, несмотря на наличие API-интерфейса Python в Spark, называемого PySpark, чтобы использовать всю мощь распределенной среды, Python-код не просто конвертируется в задания PySpark, а требует последующей настройки. При этом необходимо поработать как с датасетом, приведя в соответствие структуры данных, так и с программной логикой, выполнив следующие действия [1]:

  1. преобразование локального датафрейма Pandas в Spark Dataframe. Это можно сделать с помощью Apache Arrow, независимого от языка столбцового формата данных в оперативной памяти или Koalas, который представляет собой API Pandas в Apache Spark. Koalas дополняет API DataFrame PySpark, обеспечивая совместимость с Pandas.

При работе с Apache Arrow, следует установить конфигурационный параметр Spark spark.sql.execution.arrow.pyspark.enabled в значение True. Ниже показан пример, как это сделать на PySpark:

import numpy as np

import pandas as pd

# Enable Arrow-based spark configuration

spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

# Generate a pandas DataFrame

data = [1,2,3,4,5]

pdf = pd.DataFrame(data) 

# Create a Spark DataFrame from a pandas DataFrame using Arrow

df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow

final_pdf = df.select(“*”).toPandas()

  1. Создание пользовательской функции PySpark (UDF, User Defined Function), аналогичной функции Python.

                       UDF в PySpark можно определить двумя способами:

  • через лямбда-выражение udfname = udf(LAMBDA_EXPRESSION, RETURN_TYPE )
  • через настраиваемую функцию udfname = udf(CUSTOM_FUNCTION, RETURN_TYPE)

В любом случае, UDF PySpark принимает столбцы и применяет логику построчно для создания нового столбца. Например, следующий код показывает, как определить UDF-функцию возведения целого числа в квадрат:

from pyspark.sql.functions import col, udf

from pyspark.sql.types import IntegerType

def squared(s):

        return s * s

square_udf = udf(lambda x: squared(x), IntegerType())

df = spark.createDataFrame([(4),(8)], [“numbers”])

df.withColumn(“num_square”, square_udf(col(“numbers”))

  1. Загрузка датасета в распределенные структуры данных Spark: RDD или DataFrame, что можно сделать следующим образом:

#Load dataset as RDD

path=”file path with file name”

text_rdd = sc.textFile(path)

#Load dataset as DataFrame

df=spark.read.format(“csv”).option(“header”,”true”).option(“inferSchema”,”true”).load(path)

  1. Использование преобразования map() вместо циклов для каждого элемента RDD с применением функции, возвращающей новый RDD. Это повысит производительность кода, т.к. к примеру, из-за цикла for вычисления будут происходить последовательно, а не параллельно, увеличивая время выполнения программы.
  2. Учет взаимозависимости датафреймов: например, если новое значение столбца DataFrame зависит от других таких же структур данных, целесообразно соединить их через JOIN и вызвать UDF-функцию, чтобы получить новое значение столбца.

Полученное таким образом приложение PySpark нуждается в дополнительной настройке параметров. Это стоит сделать, чтобы более эффективно работать в кластере Apache Spark, оптимально используя все ресурсы распределенной среды. Частично эту проблему решает развертывание Spark-заданий решает их развертывание в кластере Kubernetes, о чем мы рассказываем здесь. А как самостоятельно оптимизировать переход на распределенный режим с локального Python-приложения, мы рассмотрим далее.

Что настроить в приложении Apache Spark: 5 главных факторов

Итак, поскольку приложение PySpark выполняется в распределенной среде кластера Apache Spark, при настройке Spark-заданий перед их запуском стоит учитывать следующие аспекты [1]:

  1. уровень параллелизма – если он недостаточный, часть ресурсов кластера Spark будет простаивать, что неэффективно с точки зрения экономики. С другой стороны, при слишком большом распараллеливании заданий, увеличиваются накладные расходы, связанные с каждым разделом (partition) для распределенной обработки RDD. Настроить уровень параллелизма можно такими способами:
  • указать количество разделов при вызове операций перетасовки данных (shuffle);
  • распространять данные с помощью repartition() – для дататфреймов или coalesce(), чтобы уменьшить количество разделов.
  1. количество исполнителей (executor) и ядер. Примечательно, что простое увеличение этих факторов не всегда ведет к повышению производительности. Каждая задача будет обрабатываться одним ядром процессора в кластере. Например, если указано 3 ядра, один исполнитель будет обрабатывать 3 задачи параллельно. Обычно, рекомендуется задавать не более 5 ядер. Можно автоматически изменять этот параметр, используя динамическое размещение dynamicAllocation.enabled, которое масштабирует количество исполнителей в приложении в зависимости от рабочей нагрузки.
  2. Широковещательные переменные (broadcast variable), которые похожи на распределенный кэш в Hadoop, позволяя локально хранить переменную только для чтения в кэше на отдельном узле кластера, а не отправлять ее копию вместе с задачами.

Широковещательный набор данных означает, что данные будут доступны для всех исполнителей, что снижает перетасовку данных и уменьшает накладные расходы на передачу данных по сети. Использовать broadcast-переменные целесообразно в следующих случаях:

  • для эффективного предоставления каждому узлу копии большого входного набора данных;
  • при работе со справочными данными, которые доступны только для чтения и не меняются на протяжении всего жизненного цикла Spark-приложения;
  • одни и те же данные используются на нескольких этапах выполнения приложения;
  • данных слишком мало, чтобы заполнить память рабочего узла;
  • большой набор данных соединяется с небольшим датасетом, broadcasting которого поможет повысить общую производительность приложения.

По умолчанию параметр spark.sql.autoBroadcastJoinThreshold равен 10 МБ, что означает максимальный размер в байтах для таблицы, которая будет транслироваться всем рабочим узлам при выполнении соединения. Изменить это значение можно до того размера, который требуется установить для трансляции большого датасета другим узлам кластера Spark [2].

  1. Кэширование данных, которые используются более одного раза в задании Spark, чтобы избежать повторного вычисления RDD или DataFrame.
  2. Сериализация данных, которая в Spark по умолчанию выполняется на базе Java. Это обеспечивает гибкость и совместимость с большинством классов, но производится достаточно медленно. Для ускорения сериализации и десериализации рекомендуется делать это с помощью библиотеки Kryo. Она поддерживает кэширование и перетасовку RDD, но изначально не поддерживает сериализацию на диск. А часто используемые на практике методы saveAsObjectFile() в RDD и objectFile() в SparkContext поддерживают только Java-сериализацию. Тем не менее, если нужно повысить производительность и снизить потребление памяти, стоит попробовать Kryo. В частности, сериализация влияет на JOIN-операции и группировки, которые обычно включают перетасовку данных. Однако, чем меньше объем данных для перетасовки, тем быстрее будет выполняться операция [3]. Задать количество разделов, используемых при перетасовке данных для объединений или агрегатов можно с помощью параметра sql.shuffle.partitions.

Переключиться на использование Kryo можно, инициализировав задание с помощью объекта SparkConf, например, conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”). Также можно задать сериализатор в команде spark-submit: – conf spark.serializer = org.apache.spark.serializer.KryoSerializer.

С сериализацией и десериализацией также связан фактор формата данных. В частности, Avro и Parquet предпочтительнее для файлов с большими данными, чем текстовые форматы, CSV и JSON.

Завтра мы продолжим разбираться с методами повышения производительности Spark-приложений и рассмотрим особенности кэширования SQL-запросов в этом Big Data фреймворке. А освоить все особенности администрирования и разработки Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a
  2. https://spark.apache.org/docs/latest/sql-performance-tuning.html
  3. https://blog.knoldus.com/kryo-serialization-in-spark/