Еще 4 полезных совета по Apache Spark для разработчиков и дата-аналитиков

Автор Категория ,
Еще 4 полезных совета по Apache Spark для разработчиков и дата-аналитиков

Сегодня в рамках обучения дата-аналитиков и разработчиков Spark-приложений, рассмотрим еще несколько особенностей этого фреймворка. Почему count() работает по-разному для RDD и DataFrame, как отличается уровень хранения при применении метода cache() для этих структур, когда использовать SortWithinPartitions() вместо sort(), а также парочка тонкостей обработки Parquet-таблиц в Spark SQL и кэширование метаданных в Apache Hive.

Полное кэширование и избирательность count() для разных структур данных Apache Spark

Как мы уже упоминали, вычислительные операции в Apache Spark можно разделить на 2 вида:

  • преобразования (transformations)– отложенные или ленивые вычисления, которые фактически не выполняются сразу, а после материализации запроса и вызове какого-либо действия. При этом создается план запроса, но сами данные все еще находятся в хранилище и ожидают обработки.
  • действия (actions)– функции, запрашивающие вывод. При создается план запроса и оптимизируется оптимизатором Spark, а физический план компилируется в RDD DAG, который делится на этапы (stages) и задачи (tasks), выполняемые в кластере. Оптимизированный план запроса генерирует высокоэффективный Java-код, который работает с внутренним представлением данных в формате Tungsten.

Кэширование данных с помощью методов cache() или persist(), о разнице между которыми мы рассказывали здесь, – это преобразование. Без вызова действия в кэшированном датафрейме, оно не будет материализовано. Однако, реализация этого принципа отличается в разных структурах данных Spark. Например, функция count(), которая вычисляет количество строк, примененная к RDD, где отсутствует оптимизатор Spark SQL Catalyst, будет оценивать каждую строку. А высокоуровневые структуры данных с более гибким API DataFrame и Dataset активно используют возможности оптимизатора Catalyst по умолчанию. Поэтому здесь метод count() работает более избирательно подобно SQL-запросу select count(*) from … без полного сканирования данных, если это поддерживает колоночный формат файла, например, Parquet. Тем не менее, кэширование в DataFrame и Dataset работает не частично, а полностью, распространяясь на всю структуру данных [1].

Примечательно, что метод cache() работает по-разному для RDD и DataFrame, отличаясь уровнем хранения (STORAGE_LEVEL). По умолчанию для RDD кэшированные данные хранятся только в памяти (MEMORY_ONLY), а для DataFrame – в памяти и на диске (MEMORY_AND_DISK) [2]. Подробнее об этом мы рассказывали в отдельной статье.

Тонкости обработки столбцов в датафрейме: пара полезных функций

Чтобы получить количество пустых записей во всех столбцах датафрейма можно применить функцию isNull() для каждого из них или использовать короткую команду:

df.select(df.columns.map(c => sum(col(c).isNull.cast(“int”)).alias(c)): _*).show()

О том, как удалить дубликаты в столбцах датафрейма, мы писали здесь. Из всех возможных методов удаления дублей в Apache Spark чаще всего на практике используются функции distinct() и DropDuplicates(). При том, что они обе удаляют повторяющиеся значения, для их корректного применения нужно знать некоторые особенности. Основное различие между Distinct() и DropDuplicates()  заключается только в подмножестве столбцов. В Spark SQL методу Distinct() нужно передать столбцы в SELECT-запросе до вызова функции удаления дублей. А dropDuplicates(colNames) вернет все столбцы в датафрейме, удалив дублирующиеся записи в указанных столбцах.

Особенности партиционирования

Поскольку Apache Spark является распределенным фреймворком, обычные аналитические функции дополняются методами, ориентированными на работу с разделами. В частности, SortWithinPartitions() отличается от метода сортировки sort() и применяется в каждом из разделов аналогично SORT BY в Apache Hive. Чтобы ускорить обработку данных, иногда целесообразно сперва выполнять сортировку внутри каждого раздела перед другими преобразованиями [2].

Уменьшить количество разделов поможет метод сoalesce(), который объединяет существующие разделы, чтобы избежать полного перемешивания данных. Coalesce() создает разделы разных размеров, т.е. с разным объемом данных, принимая только целое число в качестве аргумента. Можно рассматривать coalesce() как вариант repartition(), где для свойства SHUFFLE установлено значение FALSE. Однако, метод repartition() позволяет указать количество разделов, а также имя столбца, по которому будет идти перераспределение данных по узлам кластера. При перераспределении по конкретному столбцу используется свойство spark.sql.shuffle.partitions, по умолчанию равное 200. Это похоже на функцию DISTRIBUTE BY в Apache Hive.

Таким образом, с помощью repartition() в Spark можно уменьшить или увеличить количество разделов в датафрейме, повысив уровень параллелизма. При этом будет выполнено полное перемешивание данных и созданы новые разделы одинакового размера. Подробно о том, как работают coalesce() и repartition()  читайте в этом материале.

Spark SQL и данные в формате Parquet

Spark SQL кэширует метаданные Parquet для повышения производительности. Когда включено преобразование Parquet-таблицы, метаданные которой хранятся в Apache Hive, то эти метаданные также кэшируются. Если эти таблицы обновляются с помощью Hive или других внешних инструментов, необходимо обновить их вручную, чтобы обеспечить согласованность метаданных.

Примечательно, что подобно Protocol Buffer, Avro и Thrift, формат Parquet также поддерживает эволюцию схемы. Можно начать с простой схемы и постепенно добавлять в нее дополнительные столбцы по мере необходимости, чтобы получить несколько Parquet-файлов с разными, но взаимно совместимыми схемами. Источник данных Parquet будет автоматически определять их и объединять схемы всех этих файлов. Однако, объединение схем – относительно дорогая операция и в большинстве случаев не является обязательной. Поэтому начиная с версии 1.5.0 она отключена по умолчанию. Включить ее можно, установив в значение true один из следующих параметров [2]:

  • свойство источника данных mergeSchema при чтении файлов Parquet;
  • задать глобальный параметр sql.parquet.mergeSchema в настройках Spark SQL.

Читайте в нашей следующей статье, почему ApacheSpark плохо справляется с обработкой множества маленьких файлов и как это исправить. А еще больше практических подробностей эксплуатации Apache Spark для разработки распределенных приложений и аналитики больших данных вам помогут узнать специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

Источники

  1. https://medium.com/analytics-vidhya/lesser-known-facts-short-cuts-in-spark-part1-77596e367676
  2. https://ajithshetty28.medium.com/lesser-known-facts-short-cuts-in-spark-part2-4dc801a83dfb