Обработка вложенных структур в JSON-файлах для Hive Metastore c Apache Spark

Автор Категория , ,
Обработка вложенных структур в JSON-файлах для Hive Metastore c Apache Spark

Чем хороши JSON-файлы и как с ними работать в Apache Spark и Hive: проблемы обработки вложенных структур данных и способы их решения на практических примерах. Как автоматизировать переименование некорректных названий полей во вложенных структурах данных JSON-файлов на любом количестве таблиц со множеством полей, чтобы создать таблицу в Hive Metastore и обеспечить пользователям доступ к данным с помощью обычных SQL-запросов.

Про JSON-файлы и их проблемы в Apache Hive и Spark

Полу-структурированный формат JSON широко используется в области Data Science: он понятен для человека и отлично совместим с Apache Spark и Hive. Spark SQL может автоматически определять схему набора данных JSON и загружать ее как Dataset[Row]. Это преобразование можно выполнить с помощью функции read.json()для Dataset[String] или файла JSON. Напомним, JSON-текст в закодированном виде представляет собой упорядоченный набор значений или набор пар ключ/значение. Ключом может быть только строка, при этом хотя регистрозависимость не регулируется обычно имена с буквами в разных регистрах считаются разными. А в качестве значений в JSON могут выступать следующие типы данных:

  • Запись или объект (Object) – неупорядоченное множество пар ключ/значение, отделенных друг от друга запятыми. Каждая запись заключается в {}. Ключ описывается строкой, между ним и значением стоит двоеточие.
  • одномерный массив (Array) – упорядоченное множество однотипных или разнотипных значений, разделенных запятыми. Массив заключается в []. Значения разделяются запятыми.
  • целое или вещественное число (Number);
  • логические переменные (Boolean) – true (истина), false (ложь);
  • пустое значение null;
  • строка (String) – упорядоченное множество из символов юникода в «». Символы могут быть указаны с использованием escape-последовательностей, начинающихся с \, \”, \\, \/, \t, \n, \r, \f и \b, или записаны шестнадцатеричным кодом в кодировке Unicode в виде \uFFFF.

Стоит помнить, что JSON-файл не всегда является типичным для этого формата. Каждая строка должна содержать отдельный автономный действительный объект JSON. Будучи полу-структурированным, формат JSON позволяет создавать таблицы со структурами данных, которые упрощают доступ к ним. Например, можно создать таблицу в мета-хранилище Spark, которая указывает на определенное местоположение (LOCATION), где хранится множество файлов JSON одинаковой структуры, но с разными данными:

json_path:

— file1.json

file2.json

file3.json

Можно даже создать таблицу с местоположением, указывающим на путь со структурой разделов. Spark автоматически обнаружит разделы в пути и создаст партиционированную таблицу, добавив разделы в качестве последних полей в CREATE TABLE для Hive Metastore. Поэтому создание таблицы для инкапсуляции доступа к нескольким файлам JSON значительно упростит доступ к данным и позволит пользователям запрашивать данные с помощью обычных SQL-запросов. Однако, файлы JSON могут иметь очень сложные вложенные структуры, при попытке определить которые часто случаются ошибки. В частности, невозможно создать таблицу с вложенным столбцом, имя которого содержит недопустимые символы (‘,’, ‘:’, ‘;’) в хранилище метаданных Hive. Как это выглядит на практике и каким образом можно обойти эту проблему, рассмотрим далее.

Обработка вложенных структур: проблемы и решения

Предположим, файл JSON содержит имена полей с символами, недопустимыми в хранилище метаданных Apache Hive. Узнать об этой ошибке получается не при чтении данных из файлов JSON и загрузке их в датафрейм, а только при попытке создать таблицу в Hive Metastore или при сохранении датафрейма в формате Delta/Parquet. Простым переименованием столбца эту проблему не решить, поскольку файл JSON представляет собой вложенную структуру. В частности, схема данных в PySpark – это тип структура (StructType), который содержит список из полей типа структура (StructField), каждое из которых может содержать примитивный тип данных или другую структуру. А с массивами (ArrayType) все может стать еще сложнее.

Вот пример SQL-запроса создания таблицы (CREATE TABLE), который вернет такую ошибку, когда имена некоторых полей с недопустимыми символами, причем они вложены в другие поля:

CREATE TABLE `default`.`test_table` (
`acknowledgment` BOOLEAN,
`host` STRING,
`device` STRING,
`closed_time` BIGINT,
`created_time` BIGINT,
`description` STRING,
`properties` STRUCT<
`person_name`: STRING,
`number`: BIGINT,
`cause`: STRING,
`references`: STRING,
`from_id`: STRING,
`incident:x-345-`: BIGINT,
`incident_severe:x-35-`: BIGINT,
`techniques`:ARRAY<STRUCT<
`name:anidated`: STRING,
`tactic`: STRING,
`technique`: STRING>>,
`priority`: STRING,
`valid`: BOOLEAN,
`to_id`: STRING,
`usernames`: STRING>,
`dst` STRING,
`src` STRING,
`date_uploaded` DATE)
USING JSON
LOCATION '<PATH_TO_TABLE>'

Ошибка случилась из-за двоеточия в полях `incident:x-345-`,`incident_severe:x-35-` и `name:anidated`. Если заменить названия этих полей на `incident_x-345-`, `incident_severe_x-35-` и `name_anidated` соответственно, все будет корректно и получится создать таблицу с вложенным столбцом в хранилище метаданных Hive. Предупредить подобную ситуацию с недопустимыми символами в названиях полей можно следующими способами:

  • создать структуру таблицы вручную, удалив специальные символы. Этот вариант подходит для небольших JSON-файлов с количеством полей не более 100. Кроме того, он не масштабируется.
  • непосредственно переименовать некорректные названия полей в файле JSON. Это можно автоматизировать, выполнив регулярное выражение для удаления специальных символов при создании JSON-файлов. Регулярное выражение будет сложным, поскольку нужно удалять недопустимые символы только в именах полей, а не во всем файле JSON. Кроме того, придется добавлять этап обработки при создании самих файлов JSON.
  • непосредственно переименовать полученную схему данных перед сохранением или созданием таблицы в хранилище метаданных Hive. Этот вариант масштабируется на любое количество таблиц с любым количеством полей, не добавляет дополнительный шаг обработки и не влияет на формат исходных данных.

Автоматизировать 3-ий вариант с переименованием схемы данных поможет следующий PySpark-скрипт:

import pyspark.sql.types as sql_types

path_table = "<PATH_TO_DATA>"
table_name = "<TABLE_NAME>"

def recur_rename(schema: StructType, old_char, new_char):
    schema_new = []
    for struct_field in schema:
        if type(struct_field.dataType)==sql_types.StructType:
            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata))
        elif type(struct_field.dataType)==sql_types.ArrayType: 
            if type(struct_field.dataType.elementType)==sql_types.StructType:
                schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements
            else:
                schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded
        else:
            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata))
    return schema_new

def rename_columns(schema: StructType, old_char, new_char):
    return sql_types.StructType(recur_rename(schema, old_char, new_char))

df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed.
newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';')
df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema.
df2.write.saveAsTable(table_name) # // Save dataframe as table (aux table creation. It will be created as a internal table in Parquet/Delta format) to generate SQL Create Table.
create_table = spark.sql("SHOW CREATE TABLE {0}".format(table_name)).take(1)[0][0]
spark.sql("DROP TABLE {0}".format(table_name)) # Remove aux table
spark.sql(create_table.replace("USING delta","USING JSON LOCATION '{0}'".format(path_table))) # Create new table in Spark/Hive metastore with 

Читайте нашу новую статью про функции парсинга JSON-файлов в Apache Spark. А освоить все тонкости работы с Apache Hive и Spark для эффективной аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdev.com/create-a-spark-hive-meta-store-table-using-nested-json-with-invalid-field-names-505f215eb5bf
  2. https://ru.wikipedia.org/wiki/JSON
  3. https://spark.apache.org/docs/latest/sql-data-sources-json.html