Как ускорить потоковые приложения: 5 способов оптимизации Apache Spark Streaming

Apache Spark Structured Streaming примеры курсы обучение, курсы Apache Spark SQL для разработчиков и дата-инженеров, анализ данных с Apache Spark, аналитика больших данных курсы, криптография в Apache Spark, обучение большим данным для разработчиков и инженеров данных примеры Spark, Школа Больших Данных Учебный Центр Коммерсант

Разработка высоконагруженных систем потоковой аналитики больших данных включает не только написание кода, но и его оптимизацию. Поэтому разработчикам приложений Apache Spark Structured Streaming и дата-инженерам полезно знать, как можно повысить эффективность своих Big Data систем. В этой статье мы рассмотрим конфигурации и приемы, которые могут ускорить пакетные и потоковые вычисления.

Спекуляция задач

Apache Spark имеет функцию спекулятивного выполнения для обработки медленных задач любого этапа из-за проблем с сетью, диском и других факторов окружающей среды. Если какая-то из задач этапа выполняется медленно, драйвер Spark может запустить для нее спекулятивную задачу прогнозирования на другом хосте. Выбирая между обычной и спекулятивной задачей Spark возьмет результат из первой успешно завершенной и уничтожит более медленную.

Спекуляция задач в Apache Spark настраивается с помощью параметра конфигурации speculation, который по умолчанию имеет значение false. Также эта конфигурация имеет другие параметры, которые пригодятся для оптимизации потокового приложения:

  • quantile – доля задач, которые должны быть выполнены, прежде чем спекуляция будет включена для определенного этапа. По умолчанию этот параметр равен 0,75. К примеру, из 600 задач этапа, 10 выполняются слишком медленно из-за внешних факторов. В этом случае параметр speculation.quantile составит примерно 0,98.
  • multiplier – коэффициент замедления, показывает, во сколько раз задача выполняется медленнее медианного значения по этапу. По умолчанию этот параметр конфигурации равен 1,5. Spark вычисляет медианное время выполнения выполненных задач (calculatedMedianRunTime) и использует его в качестве порога, чтобы спекулировать запущенными задачами. Например, медианное время выполнения задач в момент начала спекуляции составляет 1 секунду, а speculation.multiplier равен 2. Тогда Spark будет спекулировать запущенной задачей через 2 секунды (calculatedMedianRunTime*speculationMultiplier).

Чрезмерно низкое значение этих конфигураций может привести к слишком раннему запуску спекуляции, что приведет к ненужному использованию ресурсов. С другой стороны, очень высокое значение этих параметров приведет к тому, что спекуляция вступит в силу слишком поздно и ухудшит производительность приложения, увеличив период микропакета. Запуск спекулятивной задачи не уничтожит исходную, но как только одна из задач (исходная или спекулятивная) будет выполнена, оставшаяся будет уничтожена. Также важно помнить, что спекуляция задач в Apache Spark не поможет в том случае, если замедление вызвано неравномерным распределением данных, их сбросом на диск из-за нехватки оперативной памяти и других внутренних факторов приложения, о которых мы упоминали здесь.

Черные списки исполнителей Apache Spark

Эта возможность позволяет Spark предотвращать планирование задач для исполнителей, которые были занесены в черный список из-за слишком большого количества сбоев, допустимое количество которых определяется в конфигурации blacklist.enabled. По умолчанию значение этого параметра равно false. Алгоритм внесения исполнителей в черный список задается различными конфигурациями. Например, иногда задачи могут запускаться на проблемном исполнителе или узле, который имеет трудности с сетью, диском, является узлом с большой нагрузкой и пр. Эти проблемы исполнителя могут привести к сбою задач, запущенных на нем. Чтобы исключить повторение таких ситуаций, следует настроить параметры черного списка:

  • killBlacklistedExecutors – конфигурация разрешает Spark автоматически уничтожать и воссоздавать исполнителей, когда они занесены в черный список. По умолчанию этот параметр равен false. Когда весь узел добавляется в черный список, все исполнители на нем будут уничтожены. Этот параметр позволит Apache Spark уничтожить исполнителя в случае занесения в черный список, чтобы выделить для приложения нового.
  • application.fetchFailure.enabled – эта конфигурация позволит Apache Spark немедленно внести исполнителя в черный список в случае сбоя. Исключение Fetch Failure Exception, которое случается в Reduce-задаче, указывает на сбой исполнителя при чтении в случайном порядке одного или нескольких блоков данных. Такие сбои могут возникать по нескольким причинам, но чаще всего это случается из-за проблем на локальном диске отдельного узла. Поэтому можно его сразу занести в черный список.

Регулярные выражения

Регулярные выражения для поиска и замены текста в строке, одном или нескольких файлах, активно используются разработчиками и тестировщиками. Они позволяют значительно сократить объем кода. При разработке приложений Spark Structured Streaming для регулярных выражений можно использовать встроенный метод rlike(), реализация которого основана на Java. Но его недостатком является то, что время выполнения не линейно относительно длины ввода. В случае длинных входных данных можно столкнуться с длительным временем оценки регулярного выражения. Есть еще одна реализация механизма регулярных выражений от Google – библиотека re2, написанная на C++. Она работает очень быстро и не зависит от размера входных данных. Однако, re2 не поддерживает все функции регулярных выражений, такие как обратные ссылки или поиск. Кроме того, производительность re2 не всегда лучше, чем Java-реализация регулярных выражений. Можно заменить встроенный в Spark метод rlike() собственной реализацией, которая сначала пытается скомпилировать регулярное выражение с помощью re2, а в случае неудачи вернуться к реализации регулярного выражения Java.

Ожидание локальных задач

Это особенно полезно для stateful-приложений: конфигурация locality.wait, по умолчанию равная 3 секунды, определяет, как долго ждать запуска локальной задачи данных, прежде чем запустить ее на более отдаленном узле. Поскольку Apache Spark является распределенной системой, локальность данных сильно влияет на время выполнения операций над этими данными. Есть несколько типов локальности (процесс, узел, стойка, любой), которые отличаются скоростью выполнения. В частности, локальность на уровне процесса или узла позволяет избежать передачи данных по сети, исключая увеличение длительности микропакета через некоторое время с момента запуска потокового приложения. Если во время выполнения приложения Spark Structured Streaming некоторые исполнители были уничтожены, это приводит к тому, что многие разделы перестают быть локальными для определенного исполнителя. Из-за этого каждой задаче приходится ждать данных, что увеличивает задержку запуска и приводит к перекосу распределения данных.

Например, 100 задач со средним временем выполнения 2 секунды выполняются на 2-х исполнителях с 1 ядром ЦП каждый. Предположим, 90 задач находятся в локальном узле у исполнителя А, а остальные 10 — в локальном узле у исполнителя Б. Если запустить 2 задачи параллельно, то 100 задач будут выполняться около 100 секунд: (100 задач* 2 секунды)/2 ядра. Но в этом случае каждая задача, для которой установлена локальность на исполнителе А, по умолчанию будет ждать 3 секунды для локальности на уровне узла. Поэтому, даже если есть пустой слот на исполнителе B, большинство задач будут ожидать локальности на исполнителе A, что значительно увеличит продолжительность микропакета.

Эту проблему можно исправить, установив значение конфигурации locality.wait равным нулю, чтобы состояние равномерно распределялось между исполнителями, без ожидания ненужной локальности. Подробнее про локальность данных мы писали здесь.

Распределение исполнителей Kubernetes

Если потоковые приложения развернуты поверх Kubernetes, имеет смысл поработать с конфигурацией kubernetes.allocation.batch.size, которая управляет количеством подов в каждом раунде выделения подов исполнителя. По умолчанию она равна 5. Интервал раунда выделения модулей исполнителя настраивается с помощью конфигурации kubernetes.allocation.batch.delay, которая по умолчанию равна 1 секунде. Если несколько приложений Spark Structured Streaming, которые часто масштабируются вверх и вниз, или они активно перезапускаются, эти значения по умолчанию слишком малы. Например, если Spark-приложение имеет 100 исполнителей, драйвер будет запрашивать 5 исполнителей в каждом раунде и будет ждать, пока статус пода исполнителя не изменится с ожидания (PENDING) на выполнение (RUNNING). Если в кластере Kubernetes включено автомасштабирование, эта операция может занять много времени. Поэтому следует заранее настроить эту конфигурацию в соответствии с ограничениями кластера Kubernetes.

Напомним, в Kubernetes горизонтальное автомасштабирование выполняется с помощью HorizontalPodAutoscaler (HPA), который автоматически развертывает больше подов при увеличении рабочей нагрузки. Это отличается от вертикального масштабирования – выделения большего количества ресурсов (памяти или ЦП) уже запущенным и работающим подам. При снижении нагрузки, если количество подов больше настроенного минимума, HPA дает указание ресурсу уменьшить масштаб. Сам HPA реализован как ресурс Kubernetes API и контроллер. Ресурс определяет поведение контроллера, который периодически корректирует желаемый масштаб в зависимости от средних значений системных метрик (использования ЦП, потребление памяти и пр.). Подробно про автомасштабирование в Kubernetes мы писали здесь.

Освойте администрирование и использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@roeyshemtov1/5-tips-for-optimizing-spark-structured-streaming-applications-5ceb568a46e1
  2. https://github.com/google/re2
  3. https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
Поиск по сайту