Парсинг JSON-файлов в Apache Spark

Apache Spark для аналитиков данных и дата-инженеров примеры курсы обучение, курсы примеры обучение Spark SQL PySpark, обучение Spark курсы, примеры Spark парсинг JSON, примеры Spark для разработчиков курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Недавно мы писали про сложности обработки вложенных структур данных в JSON-файлах при работе с Apache Hive и Spark. В продолжении этой темы про парсинг, сегодня поговорим, как быстро преобразовать данные формата JSON в простой читаемый файл CSV или плоскую таблицу, чтобы анализировать их с помощью типовых методов DataFrame API или запросов Spark SQL.

Типы и структуры данных в JSON

На практике парсинг или синтаксический анализ данных – одна из самых частых задач дата-инженера и аналитика данных. Например, необходимо определить некоторые особенности потребительского поведения, проанализировав данные о действиях пользователя сайта. Это можно сделать, проанализировав исходные данные о событиях пользовательского поведения из систем веб-аналитики в полуструктурированном формате JSON.

Но прежде чем перейти к парсингу сложных структур данных в JSON-файлах, вспомним ключевые принципы этого формата. JSON (JavaScript Object Notation) – это краткая форма нотации объектов JavaScript в виде текста из упорядоченных наборов пар ключ-значение, разделенных запятой. Благодаря понятности для человека, легкости, совместимости с большинством языков и API, а также легковесности при передаче данных по сети этот формат сегодня очень популярен.

Ключом может быть только строка, а в качестве значений в JSON могут выступать следующие типы данных:

  • целое или вещественное число (Number). {«age» : 63, «height»:6.5}
  • логические переменные (Boolean) – true (истина), false (ложь). {«deadFlag» : false}
  • пустое значениеnull;
  • строка (String)– упорядоченное множество из символов юникода в «». Символы могут быть указаны с использованием escape-последовательностей, начинающихся с \, \”, \\, \/, \t, \n, \r, \f и \b, или записаны шестнадцатеричным кодом в кодировке Unicode в виде \uFFFF. {«firstName» : «Jason Vorhees»}
  • одномерный массив (Array)– упорядоченное множество однотипных или разнотипных значений, разделенных запятыми. Массив заключается в []. Значения разделяются запятыми. К примеру, массив может выглядеть так: {
    «victims» : [«Alice Hardy», «Whitney Miller», «Sandra Dier»]
    }
  • запись или объект (Object)– неупорядоченное множество пар ключ/значение, отделенных друг от друга запятыми. Каждая запись заключается в {}. Ключ описывается строкой, между ним и значением стоит двоеточие. Пример объекта: {
    ”stats” : {”firstName” : ”Jason Vorhees”, “age” : 41,
    “victims” : [”Alice Hardy”, “Whitney Miller”]}
    }

Каждая строка должна содержать отдельный автономный действительный объект JSON. Благодаря поддержке различных типов данных, JSON позволяет создавать таблицы со структурами, которые упрощают доступ к ним. Для работы с JSON-файлами в Apache Spark есть готовые функции, некоторые из них мы рассмотрим далее, уделив особое внимание синтаксическому анализу содержимого.

Функции Apache Spark для чтения сложных структур данных

Следующий код на Spark Scala читает содержимое исходного JSON-файла в датафрейм и отображает его на экране:

val sample_one_raw = spark.read
.format("json")
.load("dbfs:/mnt/slasher/parsing/json_sampleOne.json")
display("sample_one_raw")

Spark разделил все строковые данные JSON верхнего уровня на отдельные атрибуты, но сложные типы данных по-прежнему отображаются в виде структур String, Array или Object. Чтобы разделить их и преобразовать в отдельные атрибуты, можно написать собственную функцию с помощью Spark SQL. Однако, даже после преобразований некоторые данные имеют несколько строк в каждой строке. Чтобы прочитать их, следует включить многострочный режим:

val sample_two_raw = spark.read
.format("json")
.option("multiline", true)
.load("dbfs:/mnt/slasher/parsing/json_sampleTwo.json")
display("sample_two_raw")

Часто бывают случаи, когда из множества значений массива нужно выбрать и проанализировать лишь некоторые данные. В этом случае Array следует разбить на столбцы с помощью функции explode() из модуля spark.sql.functions, который прежде надо импортировать:

import org.apache.spark.sql.functions.explode
val explode_sample_two = sample_two_raw
.withColumn("data", explode($"data")

В результате функции explode(col) все элемента из массива в столбце col возвращены в виде новой строки, которую можно разбить по столбцам, включив мультистрочный режим, как было рассмотрено ранее. Иногда может понадобиться explore_outer(col), которая в отличие от explode(), возвращает null-значения для пустых исходных и использует имя столбца по умолчанию для элементов массива, а также ключ и значение для элементов, если не указано иное. Explode() по умолчанию игнорирует пустые строки.

Обратной функцией является flatten(col), которая объединяет данные из массива массивов. Если структура вложенных массивов глубже двух уровней, удаляется только один уровень вложенности. В качестве параметра col может быть столбец или строка.

Записать преобразованный датафрейм в виде CSV-файла поможет следующий код:

df.write
.format("csv")
.options("header", true)
.save("path")

В заключение отметим, что функции чтения и записи данных в Apache Spark включают много различных опций, позволяющих получить значения отдельных свойств. Например, свойство timeZone задает строку, указывающую идентификатор часового пояса для форматирования меток времени в источниках данных JSON или значениях разделов. А свойство mode разрешает режим работы с поврежденными записями во время парсинга, т.е. синтаксического анализа, и может принимать следующие значения:

  • PERMISSIVE – при обнаружении поврежденной записи помещает искаженную строку в поле, настроенное с помощью columnNameOfCorruptRecord, и устанавливает для искаженных полей значение null. Чтобы сохранить поврежденные записи, пользователь может задать поле строкового типа с именем columnNameOfCorruptRecord в определяемой пользователем схеме. Если в схеме нет поля, поврежденные записи удаляются во время парсинга. При выводе схемы он неявно добавляет поле columnNameOfCorruptRecord в выходную схему.
  • DROPMALFORMED — игнорирует все поврежденные записи;
  • FAILFAST – выдает исключение при обнаружении поврежденных записей.

Как подобный прием с парсингом JSON-файлов использовать в задачах обращения к внешним сервисам через запросы REST API, мы описываем в этом материале. Про обработку другого популярного формата, CSV-файлов, средствами Spark SQL, читайте в нашей новой статье. Про сложности обработки вложенных структур данных при переносе данных из Apache Kafka в реляционные СУБД и способы их обхода мы рассказываем здесь. А все практические подробности применения Apache Spark для разработки распределенных ML-приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/@dataslasher/flatten-json-in-apache-spark-in-no-time-b7e618fae43b
  2. https://ru.wikipedia.org/wiki/JSON
  3. https://spark.apache.org/docs/latest/sql-data-sources-json.html
Поиск по сайту