От AWS EMR к Apache Spark 3 на Kubernetes в маркетплейсе Joom

Автор Категория , ,
От AWS EMR к Apache Spark 3 на Kubernetes в маркетплейсе Joom

Развивая наши курсы по Apache Spark и AirFlow для дата-инженеров и администраторов кластеров, сегодня рассмотрим кейс крупного маркетплейса Joom по переходу от 2-ой версии фреймворка на облачной платформе EMR к развертыванию сотен распределенных заданий на 3-ей версии в Amazon Elastic Kubernetes Service. Про сокращение расходов, повышение производительности и апдейт вычислительных движков.

Постановка задачи: история миграция

В 2021 году дата-инженеры Joom перенесли все Spark-задания со второй версии фреймворка с YARN в Amazon EMR на третью версию, также развернутую в облаке AWS на сервисе Elastic Kubernetes. Изначально компания работала на кластере AWS EMR, где были настроены Hadoop HDFS, YARN, Hive, Spark, HBase и Zeppelin. По мере развития бизнеса и стандартизации Spark в качестве основной Big Data технологии у этого решения появились некоторые недостатки:

  • высокие затраты. С AWS EMR нужно платить за инстансы EC2 и за саму платформу EMR. Можно использовать спотовые инстансы, чтобы снизить затраты на EC2, но тогда дополнительная плата за EMR может увеличить общий счет на 50 %. Кроме того, в кластере EMR всегда должны быть постоянные главные и основные узлы, которые увеличивают стоимость, но не сильно повышают производительность. Apache Spark с открытым исходным кодом позволит платить только за необходимые экземпляры EC2.
  • Сложности интеграции Spark с инструментами анализа данных, такими как Apache Zeppelin, Jupyter и пр. При запуске их вне EMR, нельзя напрямую создавать сеансы Spark, что снижает удобство использования. Если запускать их внутри EMR, настройка становится очень запутанной, и автомасштабирование практически невозможно. Альтернатива в виде развертывания Zeppelin и Jupyter в Kubernetes делает возможным автомасштабирование по мере необходимости и прямое создание сеансов Spark. Например, в Jupyter можно запустить запрос Spark, преобразовать его результат в датафрейм Pandas и приступить к обучению ML-моделей в одной ячейке, что удобно для специалистов по Data Science.
  • отсутствие Delta Lake, доступного в Spark 3. В частности, инженерам Joom балы необходима функция nested schema pruning, которая значительно ускоряет чтение нескольких столбцов из Parquet-файлов с вложенной схемой. Ожидать официальный релиз EMR с 3-ей версией фреймворка было слишком долго.

Таким образом, перед дата-инженерами и ИТ-архитекторами маркетплейса встала задача перевода существующих заданий Spark из AWS EMR на Elastic Kubernetes Service (EKS), управляемый контейнерный сервис для запуска и масштабирования приложений Kubernetes в облаке и локальной среде. Как это было реализовано и почему пришлось разрабатывать собственные альтернативы некоторым готовым решениям, рассмотрим далее.

Архитектура нового решения

В Kubernetes активные контроллеры берут YAML-описания заданий и запускают их. Для Spark используется соответствующий оператор от Google Cloud Platform, который берет описания приложений и создает поды драйверов. Хотя это незначительное преимущество по сравнению с запуском spark-submit вручную, оператор может перехватывать создание подов исполнителей Spark и изменять их, чтобы применить некоторые специфические свойства расписания. К примеру, сходства (affinity) и допуски (tolerations), которые нельзя указать напрямую через конфигурацию Spark, а использовать шаблон пода не очень удобно.

При том, что Spark Operator имеет готовую интеграцию с Airflow, она не позволяет указать YAML-файл и заменить в нем переменные шаблона. Поэтому разработчики Joom  написали собственную высокоуровневую интеграцию Airflow, которая:

  • предоставляет большинство свойств Spark как именованные параметры;
  • содержит общие параметры для каждой команды, которые могут использоваться отдельными узлами Airflow для удобного применения параметров;
  • собирает и сообщает об окончательном состоянии всех исполнителей Spark, включая сбои из-за нехватки памяти;
  • имеет прямые ссылки из пользовательского интерфейса Airflow на наглядные журналы и панели мониторинга.
Spark AirFlow Kubernetes EKS
Архитектура нового решения в Joom

Во время работы приложения YARN предоставляет ссылку на пользовательский интерфейс Spark. После выхода из приложения ссылка меняется на сервер истории Spark. Во время работы приложения логи исполнителя доступны с физических узлов через пользовательский интерфейс YARN. После выхода из приложения журналы собираются YARN в HDFS и становятся доступны. В Kubernetes требуется хранилище журналов. В Joom для ведения логирования сервисов используется Loki с Grafana. Чтобы повысить наглядность мониторинга применяется Filebeat для отправки логов подов в Kafka и небольшое Spark-задание для их хранения в AWS S3.

Поскольку пользовательский интерфейс и журналы Spark для запущенных и завершенных приложений расположены в разных местах, для предоставления стабильных URL-адресов и корректного перенаправления используется небольшой настраиваемый сервис. Чтобы облегчить разработку и запускать задания без создания DAG Airflow была расширена служба диспетчера Spark. К Spark Manger добавлен API запуска, который принимает JAR-файл и параметр задания, создает объект Spark Application Kubernetes и передает его выходные данные обратно. Также добавлена пользовательская задача Gradle для отправки JAR-пакета через этот API.

Для анализа данных в Kubernetes также были развернуты Apache Zeppelin и Jupyter Notebook. Zeppelin поддерживает нескольких интерпретаторов в одном блокноте, позволяя использовать Scala Spark, PySpark, Clickhouse и Athena вместе. Также эта среда обеспечивает автоматическую визуализацию данных и динамические формы ввода, которые можно изменить для повторного запуска запросов.

Zeppelin в AWS EMR позволяет создавать Spark-сессии, но AWS EMR использует модифицированный Spark и не позволяет копировать его за пределы кластеров EMR.  В EKS эта проблема отсутствует, однако при миграции на Kubernetes пришлось исправлять ошибки пользовательского интерфейса в кодовой базе Angular 1, которые вызывали нарушение иерархии представления записных книжек.

Аналогичные ограничения коснулись и Jupyter Notebook. Сперва JupyterHub, работающий в Kubernetes, не мог напрямую создавать сеансы Spark. Обходом стало применение SparkMagic для запуска запросов через Livy, а затем в другой ячейке копировать данные из кластера EMR в Python. Напомним, Sparkmagic – это набор инструментов для интерактивной работы с удаленными кластерами Spark через RESTful API Livy в ноутбуках Jupyter. Проект Sparkmagic включает набор инструментов для интерактивного запуска кода Spark на нескольких языках, а также несколько ядер, которые можно использовать для превращения Jupyter в интегрированную среду Spark. После миграции на EKS можно напрямую запустить код PySpark и собрать результаты в датафрейм Pandas.

Проблемы миграции и их решения

Описанный опыт миграции с AWS EMR на EKS с 3-ей версией Apache Spark и настраиваемым запуском контейнерных приложений Kubernetes в целом оказался удачным и решил первоначальные проблемы. Однако, в процессе эксплуатации были обнаружены некоторые сложности. В частности, фрагментация узлов кластера. В случае инстансов разного размера можно оказаться в ситуации, когда инстанс 8xlarge с 32 виртуальными ЦП остается в рабочем состоянии из-за того, что один опаздывающий исполнитель использует 4 виртуальных ЦП. Эта проблема усугубляется поведением планировщика Kubernetes по умолчанию, который выбирает узлы случайным образом. Обойти это позволяет привязка подов, чтобы модули из отдельного задания Spark планировались вместе.

Также возникла проблема со спотовыми инстансами – резервными ресурсами AWS, которые доступны с большой скидкой, но их запуск может завершиться неудачей или обрывом без уведомления. Если спотовый узел с исполнителем Spark прерывается, все данные на нем теряются, и Spark приходится перезапускать соответствующие задачи. AWS EC2 предоставляет уведомление за 2 минуты до прерывания, и Spark 3 тоже имеет функцию вывода исполнителя из эксплуатации (Graceful Decommission of Executors), которая может перемещать все данные другим исполнителям или в S3. Но на практике включение этой функции приводит к частому повреждению данных. В лучшем случае у исполнителя есть 120 секунд на вывод из эксплуатации, и он может перемещать данные со скоростью 150 МБ/с. Это добавляет до 18 ГБ данных, но в масштабе больших объемов такого недостаточно. Поэтому происходит точечное прерывание, которое может привести к повторным попыткам выполнения задачи.

Впрочем, неудачи при запуске оказались более серьезными. К примеру, разные команды инженеров выполняют несколько больших заданий на своих собственных группах узлов, но используют один и тот же тип инстанса в одной зоне доступности. Если закончатся спотовые инстансы, каждое задание может получить слишком мало ресурсов для успешного завершения, а некоторые из них завершатся ошибкой. Можно надеяться, что по мере завершения заданий масштабы групп узлов уменьшатся, а спотовые узлы будут возвращены в EC2, готовые к выделению другим группам узлов. Но автомасштабирование Kubernetes по умолчанию так не работает. Как только группа узлов A запрашивает узел и получает отказ при запуске, автомасштабирование отключает уменьшение масштаба во всех других группах узлов. Иначе говоря, даже если у группы узлов B нет заданий, ее узлы не будут освобождены и никакие ресурсы не вернутся в EC2, пока A все еще ждет.

Чтобы решить эту проблему, дата-инженерам Joom пришлось создать несколько инстансов Cluster Autoscaler, каждый из которых управляет своей собственной группой узлов. Так наличие двух десятков групп узлов на команду стало неуправляемым, поэтому в перспективе планируется рассмотреть более продвинутые средства автомасштабирования, такие как Karpenter.

Таким образом, даже при базовой настройке перенос задания из Spark 2 с Yarn/EMR на третью версию в Kubernetes Spark 3 требует времени и может выявить проблемы. Однако, такая миграция позволила Joom вдвое снизить расходы на кластер, повысить производительность, сделать использование Jupyter и Zeppelin удобным и использовать возможности последней версией Apache Spark. В рамках миграции были настроены поды, чтобы обеспечить быстрый запуск новых пользовательских сеансов и включено динамическое распределение Spark, чтобы предоставить пользователям больше вычислительных ресурсов для ускоренного выполнения аналитических задач.

Читайте в нашей следующей статье, как организовать удобный мониторинг за приложениями Apache Spark в кластере Kubernetes с помощью Prometheus и Grafana, вместо наглядных дэшбордов AWS EMR с Java-библиотекой Dropwizard Metrics и средством настройки оповещений Alertmanager. Подобный кейс читайте в нашей новой статье.

Больше подробностей про применение Apache Spark и AirFlow для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/@vladimir.prus/spark-on-kubernetes-in-2022-32458999e831
  2. https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
  3. https://github.com/jupyter-incubator/sparkmagic#installation
  4. https://spark.apache.org/docs/latest/job-scheduling.html