Подсчет записей в CSV-файлах средствами Apache Spark

Автор Категория ,
Подсчет записей в CSV-файлах средствами Apache Spark

Чтобы сделать наши курсы по Apache Spark еще более полезными, сегодня разберем 2 варианта решения типовой задачи инженерии данных. Как быстро и эффективно считать данные из множества CSV-файлов с одинаковой схемой за несколько строк кода на PySpark.

Постановка задачи: рутинная работа с CSV-файлами

Наряду с JSON-файлами, про которые мы писали в прошлый раз, дата-инженеру часто приходится работать и с другим псевдо-табличным форматом CSV (Comma Separated Value), где данные разделены запятыми или другим символьным разделителем. Предположим, в облачном хранилище Azure Data Lake Storage (ADLS) есть набор CSV-файлов с данными, которые имеют одну и ту же схему. Каждый файл содержит разное количество записей. Необходимо объединить все данные, сохранив сведения об источнике. В данном случае это означает, что надо считать все файлы в один датафрейм с указанием имени файла в качестве дополнительного столбца в целевой таблице. Результатом будем считать таблицу с количеством записей в каждом файле.

PySpark пример, обучение Apache Spark на практических примерах, курсы дата-инженеров Spark SQL
Как посчитать количество записей во множестве CSV-файлов: прикладная дата-инженерия

Наиболее распространенным решением является монтирование ADLS и просмотр файла за файлом, чтобы создать единый датафрейм с количеством записей в каждом файле. Для этого можно использовать функцию count() в API DataFrame, которая считает число строк в таблице.

Для реализации этой идеи средствами Apache Spark сперва следует подключиться к ADLS. К примеру, если Spark используется на платформе Databricks, это можно сделать разными способами: через токен SAS, секреты из хранилища ключей или сквозную передачу учетных данных. Ниже приведен фрагмент кода для монтирования контейнера хранилища в блоки данных:

##Mount a ADLS gen2 storage container with databricks#Don't change configs
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}"""
One need following details from ADLS
1. Your container Name (Optionally, corresponding directory name)
2. Your Storage account Name
"""# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://<your Container Name>@<Your storage account name>.dfs.core.windows.net/<Optional - Specific Directory under container>",
  mount_point = "/mnt/<Desired name for your mount - must be unique>",
  extra_configs = configs)#List mounts
dbutils.fs.mounts()

Далее рассмотрим 2 варианта решения поставленной задачи по подсчету количества строк в каждом CSV-файле.

Типичное решение на Python

Сперва рассмотрим способ, который чаще всего выбирают начинающие Puthon-разработчики, которые привыкли иметь дело с небольшими объемами данных. В частности, можно просматривать из ранее смонтированного каталога файл за файлом, добавляя дополнительный столбец с именем файла к основному датафрейму. Здесь пригодится SQL-функция PySpark lit() для добавления нового столбца в датафрейм через присвоение постоянного значения. Функция возвращает тип столбца в качестве результата.

    ## import lit from sql functions - useful to add withcolumn a constant value
from pyspark.sql.functions import lit
    ## Provide mount with directory where the files exists
    mount_path = '/mnt/<Your mount name>/<directory>'
    ## loop through the files 
    for file in dbutils.fs.ls(mount_path):
        ## This could be better with defining a schema
        if 'flights1.csv' in file.name:
            df1 = spark.read.csv(f'{mount_path}/{file.name}', header = True, inferSchema = True)
            df1 = df1.withColumn('filename',lit(f"{mount_path}/{file.name}"))
            uniondf = df1
        else:
            df2 = spark.read.csv(f'{mount_path}/{file.name}', header = False, inferSchema = True)
            df2 = df2.withColumn('filename',lit(f"{mount_path}/{file.name}"))
            uniondf = uniondf.union(df2)
    ## Register a temp view
    uniondf.createOrReplaceTempView("flights_data")
    ## run a group by command on temp view to get number of records per file - This could be done with data frame groupBy as well
    resultdf  = spark.sql("select filename, count(*) from flights_data group by filename")
    resultdf.display()

Такое решение вполне жизнеспособно, но не очень масштабируемо, когда в каталог записывается много небольших файлов. Временные и вычислительные затраты на прохождение по всему каталогу файлов и создание датафреймов будут расти пропорционально количеству файлов.  Поэтому имеет смысл поискать другой вариант, что мы и рассмотрим далее.

Ускоренная альтернатива в Apache Spark

Создадим датафрейм, используя функцию SQL input_file_name(), которая создает строковый столбец для имени файла текущей задачи Spark:

## Provide mount with directory where the files exists 
mount_path = '/mnt/<mount name>/<directory>'
spark.sql(f"create table flights_data_2 using csv location '{mount_path}/*.csv' options(header 'true', inferSchema 'true', sep ',')")
## run a group by command on registered table
resultdf  = spark.sql("select input_file_name() as filename, count(*) from flights_data_2 group by filename")
resultdf.display()

В этом решении применяется встроенная функция Big Data фреймворка, которая выбирает метаданные таблицы и базовые данные. Такой способ быстрее и отлично масштабируется на любое количество данных.

Узнайте больше про применение Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://insightsndata.com/interesting-spark-sql-function-fc223c603657
  2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.input_file_name.html