Еще 3 причуды API DataFrame в Apache Spark, о которых вы не знали

Автор Категория ,
Еще 3 причуды API DataFrame в Apache Spark, о которых вы не знали

Чтобы сделать наши курсы по Apache Spark еще более полезными, мы рассказываем о неочевидных тонкостях этого фреймворка, знание которых позволит разработчику распределенных приложений использовать возможности этой технологии более эффективно. Сегодня на практических примерах PySpark в API DataFrame рассмотрим разницу между функциями сортировки массивов и особенности объединения контенкации, а также разберемся с детерминированностью вычислений.

Сортировка массивов: array_sort vs sort_array

Обе эти две функции из пакета pyspark.sql.functions предназначены для сортировки массивов. Несмотря на похожее название, они отличаются особенностями применения и обработкой пустых значений [1]:

  • array_sort(col), где col – имя столбца или выражения. Эта функция сортирует входной массив в порядке возрастания, но его элементы должны быть упорядочиваемыми. Нулевые элементы будут помещены в конец возвращаемого массива [2].
  • sort_array(col, asc=True), где col – имя столбца или выражения, а asc – порядок сортировки (по возрастанию или по убыванию). Эта функция сортирует входной массив, помещая нулевые элементы в начало возвращаемого массива при сортировке по возрастанию или в его конец при сортировке по убыванию [2].

l = [(1, [2, None, 3, 1])]df = spark.createDataFrame(l, [‘id’, ‘my_arr’])(

    df
    .withColumn(‘my_arr_v2’, array_sort(‘my_arr’))

    .withColumn(‘my_arr_v3’, sort_array(‘my_arr’))

    .withColumn(‘my_arr_v4’, sort_array(‘my_arr’, asc=False))

    .withColumn(‘my_arr_v5’, reverse(array_sort(‘my_arr’)))

).show()

PySpark, сортировка массивов Spark, API DataFrame Spark SQl
Сортировка массива c array_sort и sort_array

Интересно, что функцию array_sort() можно использовать непосредственно как SQL-выражение в качестве аргумента функции expr(), где второй аргумент является функцией компаратора, начиная с версии Spark 3.0, которой мы писали здесь. Так разработчик сам может определить, как сравнивать элементы для создания порядка. Это существенно расширяет типовые возможности фреймворка, позволяя, например, отсортировать массив структур и определить, по какому полю структуры делать сортировку. При этом функция сравнения в SQL-выражении принимает два аргумента слева и справа, которые являются элементами массива, и определяет, как их сравнивать.

Например, создадим массив структур StructField и просортируем его по полю f2.

schema = StructType([

    StructField(‘arr’, ArrayType(StructType([

        StructField(‘f1’, LongType()),

        StructField(‘f2’, StringType())

    ])))
])
l = [(1, [(4, ‘b’), (1, ‘c’), (2, ‘a’)])]

df = spark.createDataFrame(l, schema=schema)(

    df
    .withColumn(‘arr_v1’, array_sort(‘arr’))

    .withColumn(‘arr_v2’, expr(

        “array_sort(arr, (left, right) -> case when left.f2 < right.f2 then -1

when left.f2 > right.f2 then 1 else 0 end)”))

).show(truncate=False)

PySpark, array_sort vs sort_array
Сортировка массива структур PySpark

Контенкация и нули

Функция контенкации concat(), обычно применяемая для объединения строк, также может использоваться для объединения массивов. Но она если какой-либо аргумент равен нулю, результат также становится нулевым. Поэтому работать с ней следует осторожно, чтобы не потерять данные при объединении двух массивов, если один из них имеет значение NULL. Обойти это ограничение поможет явная обработка с помощью coalesce() [1].

Напомним, функция coalesce(numPartitions) возвращает новый DataFrame с количеством целевых разделов, равным параметру numPartitions. Это приводит к узкой зависимости, позволяя сократить количество разделов и избежать перемешивания. Применять эту функцию тоже следует с осторожностью, т.к. резкое объединение разделов приведет к тому, что вычисления будут выполняться на меньшем количестве узлов, чем нужно для оптимального использования ресурсов. Избежать этого поможет перераспределение с repartition(), которое добавит этап перемешивания, но позволит повысить уровень параллелизма независимо от текущего разделения [2]. Подробнее о функциях coalesce() и repartition() мы писали здесь.

Рассмотрим пример контенкации строк в массиве структур.

from pyspark.sql.types import *

from pyspark.sql.functions import concat, coalesce, arrayschema = StructType([

    StructField(‘id’, LongType()),

    StructField(‘arr_1’, ArrayType(StringType())),

    StructField(‘arr_2’, ArrayType(StringType()))

])

l = [(1, [‘a’, ‘b’, ‘c’], None)]

df = spark.createDataFrame(l, schema=schema)(

    df

    .withColumn(‘combined_v1’, concat(‘arr_1’, ‘arr_2’))

    .withColumn(‘combined_v2’, concat(coalesce(‘arr_1’), array(), coalesce(‘arr_2’, array())))

).show()

курсы по Spark, обучение Apache Spark, курсы Spark-программистов, обучение разработчиков Big Data, разработка Spark-приложений, PySpark для больших данных курсы обучение, Школа Больших Данных Учебный Центр Коммерсант
Как работает функция concat() в Apache Spark

От перемены мест результат меняется: детерминированность функций в Apache Spark

В программировании функция является детерминированной, если для одного и того же набора входных значений она возвращает одинаковый результат. Недетерминированные функции могут возвращать разные значения при одинаковых входных аргументах [3].

Например, функция агрегирования collect_list(col), возвращающая список объектов с дубликатами и может использоваться для создания массива элементов после группировки по некоторому ключу, не является детерминированной, т.к. порядок элементов в результирующем массиве зависит от порядка строк, который может не быть детерминированным после перемешивания.

df2 = spark.createDataFrame([(2,), (5,), (5,)], (‘age’,))

>>> df2.agg(collect_list(‘age’)).collect()

[Row(collect_list(age)=[2, 5, 5])]

Примечательно, что недетерминированные функции обрабатываются оптимизатором Apache Spark с особой осторожностью, в частности, без использования фильтра в физическом плане запроса, который позволяет сократить объем обрабатываемых данных, чтобы ускорить вычисления. Почему это следует учитывать при работе с UDF-функциями, читайте в нашей новой статье. Кроме collect_list() недетерминированными функциями в Apache Spark также являются collect_set(), first(), last(), monotonically_increasing_id(), shuffle() и rand() [2]

Завтра мы продолжим разбираться с неочевидными возможностями и ограничениями API DataFrame в Apache Spark SQL и рассмотрим особенности оконных функций и кэширования датафреймов. 

Узнайте больше особенностей Apache Spark, чтобы эффективно использовать его для разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://towardsdatascience.com/did-you-know-this-in-spark-sql-a7398bfcc41e
  2. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html
  3. https://ru.wikipedia.org/wiki/Чистота_функции