Как быстрее обработать массив в Apache Spark 3.1: сравнение 9 разных методов

курсы по Spark, обучение Apache Spark, курсы Spark-программистов, обучение разработчиков Big Data, разработка Spark-приложений, PySpark для больших данных курсы обучение, Школа Больших Данных Учебный Центр Коммерсант

Apache Spark предоставляет для разработчика распределенных приложений множество возможностей, позволяя достигать одной целей разными способами. Чтобы проиллюстрировать это, сегодня рассмотрим бенчмаркинговое сравнение 9 методов обработки массивов в Spark 3.1, обращая внимание на их производительность и особенности использования. Также разберем важные для обучения разработчиков Spark темы про отличия преобразований от действий и оптимизацию отложенных вычислений.

Преобразования и действия: под капотом вычислений Apache Spark

Прежде чем измерять производительность методов обработки массивов в Apache Spark, следует вспомнить особенности выполнения преобразований над данными в этой фреймворке. В частности, оптимизатор Spark может упростить план запроса так, что фактическое преобразование, производительность которого планируется оценить, будет пропущено, т.к. оно не требуется для определения окончательного результата [1]. Например, в DataFrame можно вызвать 2 типа операций: преобразования (transformations) и действия (actions). Преобразования являются отложенными (ленивыми, lazy): они не запускают вычисления при их вызове, а просто создают план запроса для последующего выполнения в случае необходимости. При этом сами данные все еще находятся в хранилище и ожидают обработки, которая фактически произойдет после материализации запроса и вызове какого-либо действия. После вызова действия, которое обычно является функцией, запрашивающей вывод, Spark запускает вычисление из нескольких шагов [2]:

  • план запроса оптимизируется оптимизатором Spark;
  • создается физический план;
  • физический план запроса компилируется в RDD DAG (направленный ациклический граф, Directed Acyclic Graph);
  • RDD DAG делится на этапы (stages) и задачи (tasks), которые выполняются в Spark-кластере.

Core Spark - основы для разработчиков

Код курса
CORS
Ближайшая дата курса
28 ноября, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Поэтому, чтобы убедиться, что все преобразования действительно выполнены, лучше вызвать действие write() и сохранить результат вывода во внешнее хранилище. Но это увеличит время отладки из-за процесса записи. Последний релиз Apache Spark 3.0, о котором мы рассказывали здесь, упрощает подобное тестирование благодаря встроенным источникам данных или noop функциям в пакетном и потоковом режимах. В частности, при назначении нового источника данных в пакетном режиме материализация набора данных выполняется без дополнительных накладных расходов, связанных с действиями и преобразованием значений строк в другие типы. Это полезно при тестировании производительности и кэшировании. В случае потоковой передачи функция no-op позволяет расширить источник данных для поддержки приемника [3]. Таким образом, начиная с версии фреймворка 3.0, можно указать noop как формат записи итогового датафрейма df.format(«noop»), который материализует запрос и выполнит все преобразования, но никуда не запишет результат.

Подход к бенчмаркингу: что будем сравнивать и на каких данных

Бенчмаркинговое сравнение, подробно описанное в источнике [1], проводилось в следующей среде:

  • Apache Spark1.1 (выпуск марта 2021 года) на платформе Databricks со средой выполнения 8.0;
  • кластер на 3 рабочих узла m5d.2xlarge (всего 24 ядра);
  • входной набор данных, в каждой строке которого есть массив слов, хранится в объектном хранилище AWS S3 в файле формата Apache Parquet и имеет 1047 385 835 строк (чуть более одного миллиарда строк);
  • сравнивается производительность выполнения запроса на добавление к набору данных нового столбца (массива), значения которого соответствуют длине слов в исходном массиве.

Сравнивалась производительность следующих методов обработки массивов в Apache Spark [1]:

  • функции высшего порядка (HOF, Higher-Order Functions) – преобразования столбцов для сложных типов данных, таких как массивы;
  • пользовательские функции Python (UDF, User Defined Function), которые позволяют расширить API DataFrame с помощью простого интерфейса и реализовать любое настраиваемое преобразование. Однако Spark не может сразу преобразовать в инструкции JVM Python-код из UDF, который должен выполняться локально. Чтобы передать данные из JVM worker’у Python они преобразуются в байты и передаются по сети, что не очень эффективно и требует довольно большого объема памяти.
  • UDF с использованием API библиотеки Pandas, что подходит для обработки массивов, передавая данные из JVM в кластере Spark на рабочий узел Python в in-memory формате Apache Arrow, что более эффективно чем в предыдущем способе. Также Pandas UDF может использовать векторизованное выполнение на стороне Python, что полезно при численных вычислениях векторизованных операций, которые поддерживаются библиотеками NumPy или SciPy.
  • Scala UDF, чтобы избежать передачи данных в Python, и выполнять преобразования в JVM. Но в этом случае остаются некоторые накладные расходы на преобразование данных из внутреннего формата в объекты Java. Для собственных преобразований Apache Spark использует внутренний формат данных, генерируя эффективный Java-код во время выполнения. С UDF-кодом Spark сперва преобразует данные в объекты Java, а затем выполняет пользовательские функции, после чего снова преобразует данные во внутренний формат.
  • вызов Scala UDF из Python, что позволяет разработчику написать Spark-приложение на Python, а в Scala реализовать только UDF, чтобы сохранить выполнение внутри JVM и ускорить пользовательские преобразования. Файл Scala компилируется с использованием sbt в jar-файл, который загружается в кластер и может использоваться внутри приложения PySpark.
  • Разнесение массива с помощью функции sql.functions.explode(col), которая возвращает новую строку для каждого элемента в исходном массиве, используя имя столбца по умолчанию col для элементов в массиве [4]. Так можно получить доступ к каждому элементу массива отдельно, но в DataFrame станет строк, чем было сначала. Вычислив длину каждого элемента, можно сгруппировать эти результаты в массивы и снова уменьшить DataFrame до исходного размера. Этот метод имеет 5 особенностей, одно из которых напрямую связано с производительностью, два — с корректностью результатов, одно — с формой окончательного вывода, а одно — с обработкой нулей. Подробнее об этом мы поговорим завтра.

Анализ данных с Apache Spark

Код курса
SPARK
Ближайшая дата курса
28 ноября, 2022
Длительность обучения
32 ак.часов
Стоимость обучения
80 000 руб.
  • Разнесение массива с бакетированием источников. Напомним, бакетирование (Bucketing)– это метод оптимизации производительности задач в Apache Spark SQL и Hive, который разбивает данные на более управляемые части (сегменты или бакеты), чтобы ускорить последовательные чтения данных для последующих заданий. В один бакет попадают строчки таблицы с одинаковым значением хэш-функции, вычисленным по определенной колонке. Как это устроено, мы рассказывали здесь. Если разделить данные по ключу группировки, можно избежать перемешивания, т.е. сократить накладные расходы на передачу данных по сети.
  • RDD API Python — низкоуровневый API, который сегодня считается морально устаревшим. Этот метод не рекомендуется использовать на практике – желательно применять вместо него API DataFrame или SQL. Однако, RDD как базовая структура данных Apache Spark по-прежнему используется внутри этого фреймворка. В частности, план запроса всегда компилируется в RDD DAG во время выполнения, но очень эффективно: сперва оптимизируется план запроса, а затем генерируется высокоэффективный Java-код, который работает с внутренним представлением данных в формате Tungsten. RDD в пользовательском коде не оптимизируются, и данные будут представлены как объекты Java с гораздо большим потреблением памяти по сравнению с Tungsten.
  • RDD API Scala — аналогично предыдущему, но сразу в Scala, чтобы избежать трансляции из Python в JVM и обратно.

Результаты выполнения одного и того же запроса на одинаковом наборе данных с использованием вышеописанных методов представлены на диаграмме [1].

Spark, PySpark, вычисления в Apache Spark
Результаты бенчмаркингового теста 9 разных методов обработки массивов в Apache Spark

Таким образом, самым быстрым стал HOF-метод (66 секунд), использующий всю мощность встроенных возможностей последней версии фреймворка.

Освойте все эти и другие функции Apache Spark для разработки распределенных приложений и быстрой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://towardsdatascience.com/performance-in-apache-spark-benchmark-9-different-techniques-955d3cc93266
  2. https://towardsdatascience.com/a-decent-guide-to-dataframes-in-spark-3-0-for-beginners-dcc2903345a5
  3. https://spark.apache.org/releases/spark-release-3-0-0.html
  4. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html

Поиск по сайту