Непредсказуемость Apache Spark SQL и как от нее избавиться: про UDF и Catalyst

Автор Категория ,
Непредсказуемость Apache Spark SQL и как от нее избавиться: про UDF и Catalyst

Сегодня в рамках обучения разработчиков Apache Spark и дата-аналитиков, поговорим про детерминированность UDF-функций и особенности их обработки оптимизатором SQL-запросов Catalyst. На практических примерах рассмотрим, как оптимизатор Spark SQL обрабатывает недетерминированные выражения и зачем кэшировать промежуточные результаты, чтобы гарантированно получить корректный выход.

 

Еще раз про детерминированность функций и планы выполнения запросов в Apache Spark SQL

О некоторых особенностях определенных пользователем функций (User Defined Functions, UDF) в Spark SQL мы рассказывали здесь. Чтобы понять, почему обработке оптимизатором SQL-запросов Catalyst обрабатывает их без чуть дольше, чем готовые функции фреймворка по причине отсутствия фильтров в физическом плане, рассмотрим саму природу детерминированности вычислений в распределенных системах.

В функциональном программировании свертка (сокращение, накопление, агрегирование или сжатием) относится к семейству функций высшего порядка, которые анализируют рекурсивную структуру данных и рекомбинируют результаты рекурсивной обработки ее составных частей, создавая возвращаемое значение. Обычно свертка представлена fold-​​функцией, верхним узлом структуры данных и, возможно, некоторыми значениями по умолчанию, которые будут использоваться при определенных условиях. Затем свертка переходит к объединению элементов иерархии структуры данных с систематическим использованием функции [1].

Именно этот принцип структурной рекурсивной обработки данных реализуется в планах выполнения SQL-запросов – отчетах с подробным описанием шагов, определенных оптимизатором СУБД, в виде дерева узлов, каждый из которых передает свой результат вышестоящему. На узле происходят операции с данными: сканирование, соединение, агрегирование или сортировка, которые могут быть выполнены различными методами.

В Spark SQL пользовательские UDF-функции – это программируемые процедуры, которые работают с одной строкой. По умолчанию Spark рассматривает любую UDF как детерминированную функцию, которая, в отличие от недетерминированной, всегда возвращает для одних и тех входных аргументов одинаковое значение. Решение рассматривать UDF как детерминированное по умолчанию связано с оптимизацией Spark плана выполнения SQL-запроса. Например, UDF в API DataFrame для добавления дополнительного столбца с помощью метода withColumn() с последующим применением преобразования к полученному DataFrame. UDF потенциально может выполняться несколько раз для каждой записи, что влияет на общую производительность приложения. Обойти это можно через кэширование DataFrame с помощью метода cache(). Это создаст промежуточный датафрейм, чтобы избежать дублирования вызовов [2].

Начиная с версии Apache Spark 2.3 можно объявить UDF недетерминированной через метод

asNondeterministic(). Например, на Scala это будет выглядеть так [3]:

val random = udf(() => Math.random())

spark.udf.register(“random”, random.asNondeterministic())

Что находится под капотом этого метода, рассмотрим далее.

Недетерминированность UDF, оптимизатор Catalyst и неповторимость результатов

Недетерминированность – это контракт для выражений Catalyst, которые не являются детерминированными и не свертываются. Недетерминированные выражения требуют явной инициализации с текущим индексом раздела перед вычислением значения. В пакете org.apache.spark.sql.catalyst.expressions это прописано следующим образом [4]:

trait Nondeterministic extends Expression {

  // only required methods that have no implementation

  protected def initializeInternal(partitionIndex: Int): Unit

  protected def evalInternal(input: InternalRow): Any

}

Catalyst, внутренний оптимизатор Spark SQL – это не зависящая от исполнения структура для представления и управления графом потока данных, то есть деревьями реляционных операторов и выражений. Выражением называется исполняемый узел в дереве Catalyst, который может оценивать значение результата при заданных входных значениях, то есть может создавать объект JVM для каждой внутренней строки (InternalRow).

Выражение может генерировать исходный код Java, который затем используется при вычислениях. В детерминированных выражениях, которые дают один и тот же результат для одного и того же входа, все его дочерние выражения, т.е. узлы в дереве плана запросов, также являются детерминированными. Недетерминированные выражения не допускаются в некоторых логических операторах и исключаются при некоторых оптимизациях.

Частью контракта для вычисления интерпретируемого выражения, а не сгенерированного кода, т.е. вычисления выражения Catalyst для объекта JVM для внутренней двоичной строки, является метод

eval(input: InternalRow): Any. Он просто обертка evalInternal, который проверяет, что инициализация выражения уже была выполнена и сообщает об исключении IllegalArgumentException, когда внутренний флаг инициализации выключен в противном случае [4].

Примером недетерминированного выражения является rand, при использовании которого в UDF можно столкнуться с ошибкой. В частности, следующий код на PySpark

def f(col):

    return col

from pyspark.sql import functions as F

from pyspark.sql.types import DoubleType

df = spark.range(10)

udf = F.udf(f, returnType=DoubleType()).asNondeterministic()

df.withColumn(‘new’, udf(F.rand())).show() 

 

выдаст ошибку:

Py4JJavaError: An error occurred while calling o469.showString.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 34, localhost, executor driver): java.lang.NullPointerException

 

Это происходит как раз из-за недетерминированности UDF-функциий. Недетерминированная функция rand() на стороне JVM зависит от временной переменной rng, которая не проходит сериализацию и десериализацию в сочетании с реализацией eval, т.к. это небезопасно с нулевым значением. Подобная ошибка может возникнуть с использованием функции randn().

В данном примере Spark оценивает udf(F.rand()) как одно Python-выражение UDF и сериализует вызов rand() в command_pickle, по пути теряя инициализированный переходный процесс. Это видно в плане выполнения:

df.withColumn(‘new’, udf(F.rand())).explain()

 == Physical Plan ==

*(2) Project [id#0L, pythonUDF0#95 AS new#92]

+- BatchEvalPython [f(rand(-6878806567622466209))], [id#0L, pythonUDF0#95]

   +- *(1) Range (0, 10, step=1, splits=8)

 

Чтобы сделать класс Rand для генерации случайных чисел NULL-безопасным можно создать свой собственную UDF-функцию rand() на базе случайного генератора Python, например [5]:

from pyspark.sql import functions as F

from pyspark.sql.types import DoubleType

from random import random

def f(col):

    return col

df = spark.range(10)

udf = F.udf(f, returnType=DoubleType()).asNondeterministic()

rand = F.udf(random, returnType=DoubleType()).asNondeterministic()

df.withColumn(‘new’, udf(rand())).show()

На практике с таким сценарием использования UDF-функций генерации случайных значений можно столкнуться в задачах, связанных с криптографией. В частности, создания так называемой соли – модификатора входа хэш-функции – строки данных, которая передаётся вместе с входным массивом для вычисления хэша, чтобы повысить надежность системы, затруднив злоумышленникам подбор паролей. Например, код соединяет 2 таблицы через join для преодоления проблем с искаженными данными. И при использовании UDF для генерации случайных чисел результат JOIN-запроса может иметь разное количество строк в зависимости от конкретных генерируемых случайных чисел. Знание этих особенностей Spark SQL позволит разработчика обходить ограничения фреймворка, к примеру, с помощью промежуточного кэширования датафрейма, что мы отметили выше [7]. О других полезных лайфхаках, упрощающих ежедневную работу аналитика данных, Data Scientist’а и разработчика распределенных приложений с Apache Spark, читайте в нашей новой статье.

Узнайте больше тонкостей эксплуатации Apache Spark для разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://en.wikipedia.org/wiki/Fold_(higher-order_function)
  2. https://nellaivijay.medium.com/how-to-size-spark-application-ff2a9d487926
  3. https://spark.apache.org/docs/latest/configuration.html
  4. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Expression-Nondeterministic.html
  5. https://question-it.com/questions/3173320/pochemu-proishodit-sboj-pyspark-udf-kotoryj-rabotaet-so-stolbtsom-sgenerirovannym-rand
  6. https://stackoverflow.com/questions/63154028/unexpected-behavior-of-udf-for-random-integers-with-join-operation