Взболтать, но не смешивать: оптимизация вычислений в Apache Spark SQL

Автор Категория ,
Взболтать, но не смешивать: оптимизация вычислений в Apache Spark SQL

Продолжая разговор про оптимизацию Apache Spark и повышение эффективности Big Data приложений, сегодня рассмотрим способы ускорения Shuffle-операций в Spark SQL, разберем, чем хороши широковещательные JOIN-операции и как количество разделов влияет на производительность запросов в распределенных приложениях аналитики больших данных.

4 способа оптимизации Shuffle-операций

При аналитике больших данных с помощью Apache Spark этап Shuffle является промежуточным между шагами Map и Reduce в MapReduce – типовой вычислительной модели обработки Big Data. Можно сказать, что перемешивание – это побочный эффект таких аналитических преобразований, как join(), independent(), groupBy(), orderBy(). Само выполнение этих операций происходит на этапе Map, после считывания данных из источника или предыдущего шага. Объединение результатов выполняется в фазе Reduce, после чего данные записываются на диск. Производительность падает, т.к. большой объем данных передается по сети между сторонами чтения и записи данных. При том, что каждая shuffle-операция выполняется по своему алгоритму, например, groupBy() объединяет множество записей на основе ключа и возвращает одну запись, которая является его счетчиком, а JOIN() соединяет два набора данных по общему ключу и создает по одной записи для каждой совпадающей комбинации, они все имеют некоторые общие свойства [1]:

  • читают данные из какого-то источника;
  • объединяют записи по всем разделам по некоторому ключу;
  • агрегированные записи записываются на диск (Shuffle-файлы);
  • каждый исполнитель (executor) Спарк считывает свои агрегированные записи от других исполнителей;
  • требуют затратных операций дискового и сетевого ввода-вывода.

Снизить отрицательный эффект Shuffle-операций аналитики больших данных в Apache Spark SQL можно следующими способами [1]:

  • сократить количество операций сетевого ввода-вывода за счет использования меньшего числа более worker’ов. Это позволит снизить объем данных, перемещаемых между вычислительными узлами и, соответственно, уменьшит сетевой ввод-вывод;
  • уменьшить объем перемешиваемых данных в целом, например, избавиться от ненужных столбцов, отфильтровать лишние записи, оптимизировать извлечение данных. В частности, колоночный формат Parquet позволяет считывать только те столбцы, которые действительно требуются.
  • использовать широковещательное хэш-соединение для таблиц (BroadcastHashJoin), когда меньшая из двух соединяемых таблиц перераспределяется исполнителям. Это позволяет полностью избежать операции перемешивания и контролируется свойством sql.autoBroadcastJoinThreshold, по умолчанию равному 10 МБ. Если меньшая из двух таблиц не превышает пороговое значение, ее можно транслировать всем рабочим узлам (worker’ам) для join-соединения. Значение параметра spark.sql.autoBroadcastJoinThreshold можно задать самостоятельно в байтах или отключить трансляцию вообще, установив его равным -1 [2]. Подробнее о том, как работает BroadcastHashJoin и другие стратегии Join-операций в Spark SQL, мы писали здесь.
  • предварительно перемешать сегментированные данные, чтобы исключить обмен и сортировку. При этом данные объединяются в сегменты (buckets) и при необходимости сортируются, а результат сохраняется в таблице и доступен для последующих чтений. Об этом методе оптимизации приложений Spark SQL с помощью бакетирования таблиц читайте в нашей новой статье.

Отмеченные способы показывают, что широковещательное соединение позволяет избежать перемешивания большего набора данных, ускоряя скорость вычислений. Однако, этот способ имеет ряд специфических особенностей, которые мы рассмотрим далее.

Чем хорош BroadcastHashJoin и что с ним не так: немного об AQE в Spark 3

Итак, в широковещательном Hash Join один из двух входных наборов данных транслируется всем исполнителям, для каждого из которых строится хеш-таблица. Затем разделы не транслируемого входного набора данных присоединяются к другому набору данных в виде локальной хеш-таблицы. Это не требует перемешивания, но предполагает наличие достаточного объема памяти для размещения транслируемого набора данных у каждого из исполнителей. Поэтому на практике Спарк избегает этого механизма, если оба набора входных данных превышают настраиваемый порог в свойстве spark.sql.autoBroadcastJoinThreshold.

Примечательно, что стоимость широковещательного соединения зависит от относительного размера между двумя таблицами. Поэтому большое значение параметра spark.sql.autoBroadcastJoinThreshold увеличивает накладные расходы на выполнение хэш-таблицы и отправку данных по сети, потребляет память для хранения данных и уменьшает параллелизм [3]. Однако, на практике SQL-операторы Spark часто конвейерные и выполняются в параллельных процессах, а широковещательный обмен прерывает этот конвейер. Каждый этап SQL-запроса материализует свой промежуточный результат, и следующий шаг может начаться только, если все параллельные процессы материализации завершены. Подчеркнем, что Spark SQL планирует широковещательное хеш-соединение, если предполагаемый размер отношения соединения ниже порогового значения широковещательного размера. Но, например, в случае нетипичного фильтра или серии сложных join-операторов оценка размера данных может быть искажена. Поэтому одна из главных новинок Спарк 3.0, AQE-структура адаптивного выполнения запросов (Adaptive Query Execution) перепланировывает join-стратегию во время выполнения, уточняя размер отношения соединения. В частности, если оказалось, что правая сторона соединения оказалась меньше оценки и недостаточна для широковещательной передачи, после повторной AQE-оптимизации статически запланированное соединение сортировкой (Sort Merge) преобразуется в BroadcastHashJoin [4]. Таким образом, изменение конфигурации Spark SQL может повысить производительность распределенных приложений аналитики больших данных. Однако, настройка spark.sql.autoBroadcastJoinThreshold – не единственный вариант оптимизации. Также имеет смысл поиграть с параметром spark.sql.shuffle.partitions, который мы рассмотрим далее.

Настройка разделов для перемешивания

Как мы уже отметили, в Спарк перемешивание выполняется при выполнении аналитических операций, таких как объединение и группировка. Изменяя количество разделов, в которых происходит перемешивание, можно повысить эффективность shuffle-операций с помощью конфигурации spark.sql.shuffle.partitions. По умолчанию значение этого параметра равно 200. Однако, на практике 200 разделов не имеют смысла для обработки файлов размером в несколько ГБ. Их следует изменить в соответствии с объемом данных, который нужно обработать через Spark SQL. Так простое сокращение разделов перемешивания снизит число shuffle-операций и время, необходимое для соединения наборов данных [5]. В заключение отметим, что в случае Apache Spark Structured Streaming эту конфигурацию нельзя изменить между перезапусками SQL-запроса из одной и той же контрольной точки [2].

Узнайте больше про тонкости инженерии больших данных и разработки распределенных Spark-приложений на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/road-to-data-engineering/spark-performance-optimization-series-3-shuffle-104738a83a9e
  2. https://spark.apache.org/docs/latest/configuration.html
  3. https://databricks.com/session_na20/on-improving-broadcast-joins-in-apache-spark-sql
  4. https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
  5. https://riptutorial.com/apache-spark/example/26264/controlling-spark-sql-shuffle-partitions