Параллелизм второго порядка в конвейерах данных с Apache Spark

Автор Категория ,
Параллелизм второго порядка в конвейерах данных с Apache Spark

В рамках обучения дата-инженеров и разработчиков Spark-приложений сегодня рассмотрим, как повысить эффективность обработки данных, используя всю мощь этого распределенного движка. Проблемы производительности и эффективности конвейера обработки данных с учетом разницы между действиями и преобразованиями в Apache Spark.

Снова про разницу между действиями и преобразованиями в Apache Spark

Основное преимущество Apache Spark как движка обработки огромных объемов данных в том, что он позволяет эффективно распределять и распараллеливать работу. Однако, из-за этой возможности по умолчанию есть риск пропустить места, где применение дополнительного параллелизма поверх фреймворка может повысить эффективность распределенного приложения. Напомним, движок поддерживает 2 вида операций над датафреймами, о чем мы писали здесь и здесь:

  • преобразования (transformations)– отложенные или ленивые вычисления, которые выполняются не сразу, а только создают план запуска и выполнения, фактически реализуясь после материализации запроса и вызове какого-либо действия. Хотя план запроса создается, сами данные все еще находятся в хранилище и ожидают обработки.
  • действия (actions)– функции, которые запрашивают вывод. План запроса создается план запроса и оптимизируется оптимизатором фреймворка, компилируется в направленный ациклический граф (DAG) RDD, состоящий из этапов (stages) и задач (tasks), которые выполняются в кластере. Оптимизированный план запроса генерирует высокоэффективный Java-код, который работает с внутренним представлением данных в формате Tungsten. Действия можно рассматривать как блокирующие операции, которые фактически выполняют распределенные вычисления, например, count() или repartition(), а также любые операции сохранения и сериализации данных.

Core Spark - основы для разработчиков

Код курса
CORS
Ближайшая дата курса
30 мая, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Spark-задание могут содержать одно или несколько действий, которые могут зависеть друг от друга. Но чаще всего задания содержат несколько независимых действий. Однако, с традиционной структурой приложения фреймворк будет обрабатывать только одно из этих действий за раз, даже если между ними есть прямые зависимости. А это, в свою очередь, напрямую влияет на производительность и эффективность всего конвейера обработки данных. Для измерения производительности обычно используется время выполнения конвейера и объем обработанных данных. Оценить эффективность немного сложнее: ее можно рассматривать как среднее количество времени, в течение которого Spark-исполнители выполняют задание, деленное на его общее время выполнения. Как повысить этот показатель, рассмотрим далее.

Глобальная оптимизация vs локальный оптимум

Итак, чтобы конвейер обработки данных на базе Spark-приложений стал более эффективным, необходимо сократить время простоя исполнителей              . Популярным решением этой проблемы является динамическое выделение (dynamic allocation) ресурсов, когда фреймворк динамически добавляет и удаляет исполнителей задания. За это отвечает параметр конфигурации spark.dynamicAllocation.enabled, установленный в значение True, при котором количество исполнителей, зарегистрированных в приложении, увеличивается или уменьшается в зависимости от рабочей нагрузки. Также необходимо установить в значение True параметры spark.shuffle.service.enabled или spark.dynamicAllocation.shuffleTracking.enabled, связанные с с конфигурациями spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutorsspark.dynamicAllocation.initialExecutors и spark.dynamicAllocation.executorAllocationRatio.

При динамическом выделении невозможно точно предсказать, будет ли исполнитель, который будет скоро удален, запускать задачу в ближайшем будущем, или будет ли новый добавленный исполнитель бездействовать. Поэтому нужен набор правил для определения, когда удалять и добавлять исполнителей. Если свойство spark.dynamicAllocation.enabled установлено в значение False, приложение будет удерживать все выделенные ресурсы, даже если они фактически не нужны для обработки. Чтобы избежать блокировки такой ресурсов, рекомендуется устанавливать параметр spark.dynamicAllocation.enabled в значение True.

Однако, динамическое выделение может стать причиной замедления конвейера обработки данных из-за небольшого естественного перекоса в их распределении по разным узлам кластера. Даже небольшие различиями в количестве записей между разделами на уровне 10–20%, вызывают разницу в стоимости вычислений между разными разделами. Исправлять такие перекосы не имеет особого смысла: они в целом оказывают минимальное влияние на время выполнения задания, а сложные схемы перераспределения данных по разделам обычно обходятся дороже в реализации и выполнении.

Анализ данных с Apache Spark

Код курса
SPARK
Ближайшая дата курса
30 мая, 2022
Длительность обучения
32 ак.часов
Стоимость обучения
80 000 руб.

Однако, такой естественный перекос вредит динамическому распределению: по мере приближения задания к завершению, оставшиеся 10-20% задач потребуют дополнительного времени для вычисления. Включенное свойство spark.dynamicAllocation.enabled приведет к тому, что фреймворк завершит 80% простаивающих исполнителей. Но в этом случае распределенное приложение начнет следующее задание с небольшим процентом от первоначального распределения ресурсов, т.е. потребуется дополнительное время на повторное масштабирование, поскольку движок заранее не знает всего объема работы, которую будет выполнять приложение. Как мы уже отметили выше, при вычислении плана выполнения действия нельзя просчитать все заранее. Таким образом, стремление повысить эффективность негативно влияет на скорость обработки данных. Чтобы решить эту проблему, одновременно улучшая время выполнения и эффективность, фреймворк должен с самого начала построить более полный и точный план выполнения запросов.

Решение сводится к введению параллелизма второго порядка. Сам вычислительный движок можно рассматривать как основной механизм параллелизма первого порядка. Чтобы выполнять несколько действий параллельно, нужна конструкция вторичного параллелизма, т.е. второго порядка. Рассмотрим это на примере следующего кода на Scala:

val data = load().map(...).cache()val subset1 = data.filter(...)
val subset2 = data.filter(...)val futures = Futures.sequence(
    Seq(
        Future {
            Table1.save(subset1)
        },
        Future {
            Table2.save(subset2)
        }
    )
)Await.result(futures)

В результате фреймворк будет выполнять команды сохранения параллельно: когда исполнители завершат первое сохранение, они немедленно начнут работу над вторым сохранением и не будут бездействовать, пока оба не будут завершены, и весь объем работы для вашего приложения не будет завершен. Такой прием может повысить эффективность на 10–20% и сократить время выполнения задания, в зависимости от количества действий, которые можно распараллелить. Также это сводит к минимуму время простоя исполнителей и сокращает случаи, когда исполнители освобождаются от задания до его завершения при динамическом выделении ресурсов. А благодаря безопасной работы с потоками в Apache Spark, этот прием не накладывает дополнительных ограничений на его практическое применение.

Узнайте больше про использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/a-decent-guide-to-dataframes-in-spark-3-0-for-beginners-dcc2903345a5
  2. https://zach-ennenga.medium.com/second-order-parallelism-in-spark-based-data-pipelines-54b87024b45a
  3. https://towardsdatascience.com/performance-in-apache-spark-benchmark-9-different-techniques-955d3cc93266