Как Spark-приложению выполнять миллионы операций в секунду с данными в AWS S3

Автор Категория , ,
Как Spark-приложению выполнять миллионы операций в секунду с данными в AWS S3

Чтобы сделать курсы Hadoop и Spark для инженеров данных еще более интересными, сегодня мы рассмотрим кейс фудтех-компании iFood – лидера рынка доставки еды в странах Латинской Америки. Читайте далее, в чем проблема быстрых операций со множеством файлов в облачном хранилище Amazon S3 и как ее решить с помощью префиксов корзины и методов API-интерфейса RDD Apache Spark.

Что не так с UDF-функциями Apache Spark в AWS S3

Когда требуется перемещать более миллиона файлов и десятков терабайт данных в объектное хранилище AWS S3, возникает проблема скорости выполнения этих операций. С этим столкнулись дата-инженеры латиноамериканской фудтех-компании iFood, используя определяемые пользователем функции (UDF) Apache Spark для преобразования миллиардов строк. Сперва инженеры данных рассматривали подход с инкапсуляцией операций AWS S3 внутри своих UDF-функций, чтобы затем использовать датафрейм с путями к файлам S3 в рамках распределенной среды Spark для горизонтального масштабирования задачи. Недостатком этого подхода является невозможность распараллеливания сеанса Spark на несколько узлов кластера, при том, что операции S3 с файлами (GET, PUT, DELETE и COPY) позволяют выполнять эти запросы очень быстро за счет их распараллеливания.

Но просто запустить один глобальный сеанс (Session, сессия) с AWS в драйвере и разделить его между разными узлами для работы с разных IP-адресов – это небезопасно. Такое решение будет постоянно вызывать соответствующие исключения. Альтернативой является создание экземпляра сеанса внутри UDF, который будет применен к датафрейму. Переменные UDF создаются внутри узла, на который была вызвана функция. Каждый узел запускает свои собственные сеансы, что приводит к их росту в глобальном масштабе: одна сессия на одну строку датафрейма. А если речь идет о миллиардах строк, как в случае iFood, то многократно увеличиваются накладные расходы на запуск десятков миллиардов сеансов с AWS. Таким образом, дата-инженерам фудтех-компании пришлось искать баланс между сессиями и параллелизмом Apache Spark. Для этого они использовали простое, но эффективное решение, спустившись на уровень структур данных этого Big Data фреймворка [1].

UDF, сессии, разделы и методы Spark RDD

Чтобы прояснить, как связаны сеансы и параллелизм, вспомним о структурах данных Spark и их API-интерфейсах. Изначально в этом фреймворке были достаточно простые абстракции – устойчивые распределенные наборы данных (Resilient Distributed Dataset, RDD), на базе которых в версиях 1.3 и 1.6 были реализованы DataFrame и DataSet соответственно. Подробнее о том, чем похожи и отличаются RDD c DataFrame и DataSet, мы писали здесь и здесь. Сегодня именно DataFrame считается самой популярной структурой данных, которую чаще всего используют разработчики Apache Spark.

Тем не менее, хотя DataSet и DataFrame предоставляют разработчику более высокий уровень абстракции с экономией места на диске и в памяти, RDD предлагает глубокий контроль за счет низкоуровневых функциональных возможностей обработки данных. В частности, за счет возможности управлять партиционированием записей в разделах (partition), которые являются базовыми единицами параллелизма. Об этом мы рассказывали здесь и здесь. Метод .mapPartitions() позволяет применять UDF не по строкам, а по разделам RDD. Это сразу решает проблему миллиарда сеансов, т.к. количество сеансов равно количеству разделов. Число разделов разработчик Spark-приложения может определить самостоятельно, используя метод .repartition() в DataFrame API.

Обратной стороной этого решения является необходимость поиска компромисса: меньше разделов – это не только меньше сессий, но и меньше параллелизма. Кластеры Apache Spark могут обрабатывать множество разделов одновременно, в зависимости от текущего состояния параллелизма и количества ресурсов в целом: ядер ЦП на всех узлах и запущенных заданий. Ограничение числа разделов до максимальной емкости кластера может привести к снижению общей скорости вычислений [1]. Поэтому необходим баланс с учетом возможностей AWS S3, об ограничениях которых мы поговорим далее.

Ограничения и возможности AWS S3

На практике при работе с AWS S3 можно столкнуться с ошибкой 503 Slow Down, которую возвращает корзина этого объектного хранилища. Решить эту проблему можно с помощью механизма повтора с экспоненциальным откатом. Если ошибка сохраняется после выполнения повторных попыток, можно постепенно увеличить рабочие нагрузки запросов S3, распределив объекты и запросы между несколькими префиксами. Вообще возможности AWS S3 ограничены следующим числом запросов на операции с файлами [2]:

  • 3500 запросов PUT/COPY/POST/DELETE;
  • 5500 запросов GET /HEAD в секунду на префикс в сегменте S3.

Однако, число префиксов в корзине никак не ограничено. Приложения могут легко выполнять тысячи транзакций в секунду при выполнении запросов для загрузки и извлечения данных из хранилища Amazon S3, которое автоматически масштабируется до высокой частоты запросов. Повысить производительность чтения или записи можно за счет распараллеливания чтения, например, создав 10 префиксов в корзине S3 для распараллеливания операций чтения, его производительность масштабируется до 55 000 запросов в секунду. При этом не требуется случайный порядок именования префиксов для повышения производительности: они могут быть названы последовательно на основе даты [3].

С учетом возможностей разделения корзины на префиксы, вернемся к кейсу iFood. Пределы 3500 запросов PUT/COPY/POST/DELETE и 5500 запросов GET/HEAD в секунду на каждый префикс в корзине никогда не будут достигнуты, если разделить датафрейм по префиксам файлов при относительно сбалансированных разделах. Таким образом, ядро ЦП каждого узла будет работать с одним префиксом, избегая одновременных запросов на один и тот же префикс. Преимущество такого простого подхода в том, что его можно применить к любым сценариям запросов API [1].

Однако, ограничения на число запросов к AWS S3 – не единственная проблема, с которой столкнется разработчик Spark или Hadoop при отправке данных и заданий в облачное объектное хранилище. О том, какие еще трудности с файловыми операциями могут возникнуть в этом случае и как их решить с помощью коммиттеров S3A, мы поговорим завтра. А про основы разработчик Spark-приложений читайте в нашей новой статье.

Освойте практику разработки высокоскоростных приложений аналитики больших данных с Apache Spark и AirFlow на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdatascience.com/leveraging-apache-spark-to-execute-billions-of-operations-on-aws-s3-2f62930d19fd
  2. https://aws.amazon.com/ru/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
  3. https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html