5 лайфхаков по Apache Spark для разработчиков и дата-аналитиков

Автор Категория ,
5 лайфхаков по Apache Spark для разработчиков и дата-аналитиков

Специально для разработчиков распределенных приложений, Data Scientist’ов и аналитиков больших данных, работающих с Apache Spark, в этой статье мы собрали несколько полезных советов по ежедневным операциям в этом фреймворке. Читайте далее, как добавить библиотеку TypeSafe в файл sbt-конфигурации Spark-приложения, получить датафреймы из JSON-массивов и структур, а также обработать CSV-формат с разделителем из нескольких символов и отсутствующими значениями.

Про сборку Spark-приложений с SBT и файл конфигурации Typesafe

Хотя официальным инструментом сборки приложений Apache Spark является Maven, у разработчиков также распространена его альтернатива – open-source проект SBT для проектов Scala и Java. Главной особенностью SBT является встроенная поддержка компиляции кода Scala и интеграции со многими тестовыми Scala-платформами. Поэтому он широко используется для разработки распределенных приложений, ускоряя итеративную компиляцию. Сборка SBT основана на файлах объектной модели проекта Maven (Project Object Model, POM), поэтому для управления сборкой SBT можно установить те же профили и переменные Maven [1].

Поскольку Spark-приложение зависит от API этого фреймворка, в конфигурацию приложения необходимо включать файл sbt-конфигурации, build.sbt, который содержит эти зависимости. Для этого следует расположить Spark-приложение и build.sbt согласно типовой структуре каталогов, чтобы затем создать JAR-пакет с кодом приложения и использовать скрипт spark-submit для его запуска [2].

Конфигурации приложения позволяют использовать его с разными параметрами для различных сред: разработки, тестирования и production. Для этого активно используется open-source библиотека Typesafe Config, которая обеспечивает типобезопасность и поддерживает JVM-языки (Scala, Java), используемые в т.ч. и в Spark-приложениях [3]. Например, следующие действия помогут добавить библиотеку TypeSafe в файл build.sbt, а также создать и прочитать файл conf [4]:

  • создать файл application.conf, который содержит конфигурации Spark-приложения в виде пары ключ-значение, например,

app {

appname: “Test app”

owner : “Test app owner”

}

  • добавить типизированную зависимость в файл build.sbt

libraryDependencies ++= Seq(

“com.typesafe” % “config” % “1.3.3”

)

  • загрузить ранее созданный файл application.conf

import com.typesafe.config.{Config, ConfigFactory}

val applicationConf: Config = ConfigFactory.load(“appliccation.conf”)

  • прочитать файл конфигурации

val appname = applicationConf.getString(“app.owner”)

print(appname)

Разделители в CSV и 3 способа обработать NULL-значения

Одним из наиболее распространенных форматов, с которыми имеет дело Data Scientist, является CSV (Comma Separated Value). Однако, обработать этот текстовый формат для представления табличных данных, значения в котором разделены запятыми, иногда бывает не просто, т.к. в качестве разделителя запятая может использоваться не только 1 символ запятой, но и несколько символов, например, ||. В более ранних версиях Apache Spark (до 3.0) применение нескольких символов в качестве разделителя было запрещено. К примеру, следующий код

val df  = sparkSession.read

      .option(“delimiter”,”||”)

      .option(“header”,”true”)

      .csv(“src/main/resources/multicharacterseperator.csv”)

Вызывал ошибку, выбрасывая исключение throws java.lang.IllegalArgumentException: Delimiter cannot be more than one character: ||

Однако, начиная с версии Apache Spark 3.0, это исправлено и теперь можно использовать несколько символов в качестве разделителя значений в файле CSV-формата [5].

Впрочем, это не единственная проблема, с которой может столкнуться Data Scientist при обработке наборов Big Data с целью аналитики больших данных средствами Spark. Например, очень часто датасеты содержат столбцы с пропущенными значениями, что неприемлемо для моделирования и анализа. Самыми простыми способами обработать отсутствующие значения в наборе данных средствами Apache Spark являются следующие [6]:

  • Drop() – отбрасывает строки, содержащие значения NULL или NaN;
  • fill() – возвращает новый датафрейм, который заменяет значения NULL или NaN в столбцах, указанных в аргументе этой функции;
  • coalesce() – возвращает первые ненулевые значения из набора столбцов. Функция coalesce(e: Column*): Column возвращает первое ненулевое значение среди заданных столбцов или NULL. Для coalesce() требуется хотя бы один столбец, а если их несколько, то все они должны быть одного или совместимого типа. Внутри coalesce создает столбец с выражением Coalesce: дочерние элементы являются выражениями входного столбца [7].

Примеры применения этих функций к небольшому датафрейму на Spark Scala подробно показаны в источнике [6].

Пара тонкостей аналитики больших данных в JSON-файлах с примерами на Spark Scala

JSON – также очень популярный формат представления данных, с которым часто работают Data Scientist’ы и аналитики больших данных. Несмотря на его дружественность к человеку и достаточное простое представление информации в виде пары ключ/значение, автоматическая обработка JSON-файлов в Apache Spark довольно утомительна. В частности, нужно распарсить данные, извлечь требуемые значения и перевести их в структуру данных для дальнейшей обработки. Рассмотрим типовой пример анализа данных в JSON-файле по названием JSFileData1.json. Пусть данные в нем имеют следующую структуру:

{
“student_name”:”Ivan Ivanov”,

 “DOB” : “01–01–2000”

“address”:{“city”:”Moscow”},

“contact_number”:{“primary”:1234567,”secondary”:890 }

}

Следующая команда на Scala поможет прочитать этот файл и создать датафрейм:

val tempDF:DataFrame=sparkSession.read.option(“multiline”, “true”).json(“Data/JSFileData.json”)

Опция multiline позволяет работать с сообщениями, которые занимают более одной строки.

Далее следует посмотреть схему данных и проверить их типы. В частности, в этом примере поле contact_number является структурой из нескольких столбцов. Определить количество этих столбцов и их типы данных поможет следующий код на Spark Scala [8]:

val columns:Array[String]=tempDF.columns;

var col_names:ListBuffer[String]=ListBuffer()

for(col<-columns)

{

if(tempDF.schema(col).dataType.isInstanceOf[StructType])

{

               for(field1<-tempDF.schema(col).dataType.asInstanceOf[StructType].fields)

{    

 col_names += col+”.”+field1.name

                      }

}

Else

col_names += col

}

Далее можно заменить в полученном датафррейме точку другим символом, разделяющим различные столбцы друг от друга с помощью команды

val colList=col_names.map(name=>col(name).as(name.replace(‘.’,’_’)))

Наконец, чтобы просмотреть схему данных и сам датафрейм, сперва следует выбрать сформированные столбцы:

val outputDF=tempDF.select(colList:_*)

outputDF.printSchema()

outputDF.show()

А если одно из полей исходного JSON-файла содержит массив значений, его нужно сперва распределить по структурам с помощью функции explode(), а затем получить из этих структур данных столбцы с однотипными значениями. Explode принимает столбец, состоящий из массивов, и создает одну строку для каждого значения в массиве.

Например, структура данных в исходном JSON-файле JSFileData2.json выглядит следующим образом [9]:

{
“clients”:[{“name”: “Ivan”,”surname”:”Ivanov”,”phone”: 1234567},

{“name”: “Petr”,”surname”:”Petrov”,”phone”: 9876543},

{“name”: “Alex”,”surname”:”Alexov”,”phone”: 97531}]

 }

Создадим исходный датафрейм с данными из файла JSFileData2.json:

val rawDF:DataFrame= sparkSession.read.option(“multiline”,”true”).json(“data/ JSFileData2.json”);

 

Применим функцию explode() к массиву clients:

val tempDF:DataFrame=rawDF.select(explode(col(“clients“)).as(“clients”))

 

В полученной структуре следует определить количество столбцов и их типы данных, а затем разделить их значения друг от друга, как мы рассматривали выше, чтобы сформировать итоговый датафрейм для анализа:

val finalDF:DataFrame=tempDF.select(col(“clients.*”))

О других полезных тонкостях обработки данных в рамках этого фреймворка читайте в нашей новой статье. А все узнать практические подробности эксплуатации Apache Spark для разработки распределенных приложений и аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://spark.apache.org/docs/latest/building-spark.html
  2. https://spark.apache.org/docs/latest/quick-start.html
  3. https://github.com/lightbend/config
  4. https://parmanand.medium.com/how-to-read-a-config-file-in-spark-scala-sbt-680c27dea660
  5. http://blog.madhukaraphatak.com/spark-3-introduction-part-1/
  6. hhttps://parmanand.medium.com/handling-null-values-in-spark-scala-ec0744b541af
  7. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-functions-regular-functions.html#coalesce
  8. https://parmanand.medium.com/how-to-handle-nested-json-using-scala-spark-fe7bf8838a12
  9. https://parmanand.medium.com/explode-function-in-spark-scala-1aa050eb3b2a