Оконные функции PySpark в Google Colab: пара примеров

Автор Категория ,
Оконные функции PySpark в Google Colab: пара примеров

Специально для обучения начинающих аналитиков данных и дата-инженеров сегодня рассмотрим примеры выполнения простых SQL-запросов и оконных функций в Apache Spark на Google Colab. Как быстро проанализировать датафрейм из CSV-файлов с помощью нескольких строк на PySpark.

Запуск и использование PySpark в Google Colab

Предположим, необходимо определить потенциальный доход от проведения обучающих курсов по технологиям Big Data. Обезличенные заявки на обучение выгружены из CRM-системы в виде CSV-файла под названием apps.csv. Сведения о стоимости курсов хранятся в отдельном CSV-файле под названием price.csv. Если представить структуру данных каждого файла в виде сущности, можно составить ER-диаграмму, которая показывает связь 1-к-1 между сущностью Заявка (app) и Стоимость (price) по ключу кода курса (course в таблице app и course_pr в таблице price). Загрузим эти CSV-файлы в интерактивную среду анализа и визуализации данных Google Colab, разместив их в папке data.

Google Colab анализ CSV-файлов
Исходные данные для анализа в CSV-файлах и их загрузка в Google Colab

Вычислить потенциальный доход по каждой заявке в Google Colab позволит следующий код на PySpark, который соединяет датафрейм заявок (df1) с датафреймом цен (df2), добавляя в df1 соответствующий столбец (income) и удаляет ненужный столбец c повтором стоимости курса (course_pr’). Также в коде выполнено преобразование поля date из строкового типа в дату.

!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("SpApp").config('spark.ui.port', '4050').getOrCreate()

from pyspark.sql.types import *
schema1 = StructType([
StructField("date", StringType(), True),
StructField("course", StringType(), True),
StructField("number_of_students", ShortType(), True)])

df1 = spark.read.format("csv").schema(schema1).option("header", True).load("data/apps.csv")
from pyspark.sql.functions import unix_timestamp, from_unixtime
df1.withColumn("date", from_unixtime(unix_timestamp("date", "dd/MM/yyy")).alias("date"))

schema2 = StructType([
StructField("course_pr", StringType(), True),
StructField("price", ShortType(), True)])
df2 = spark.read.format("csv").schema(schema2).option("header", True).load("data/price.csv")

df1=df1.join(df2,df1.course==df2.course_pr,"inner")

from pyspark.sql.functions import lit
df1=df1.drop('course_pr')
df1=df1.withColumn("income", df1.number_of_students*(df1.price))

Поскольку в данном примере используются функции PySpark, сперва выполняется его установка и импорт нужных классов из SQL-модуля. Также создан объект сеанса SparkSession, который является точкой входа в Spark-приложение под названием SpApp. Подробно об этом мы писали здесь.

Оценить потенциальный доход от каждого курса поможет простой запрос с группировкой датафрейма df1 по этому полю: df1.groupBy(‘course’).sum(‘income’).show().

Если необходимо привязаться к дате подачи заявки, т.к. в одни день может быть подано несколько заявок по одному и тому же курсу, группировка будет не только по названию курса, но и по дате:

df1.groupBy(‘course’, ‘date’).sum(‘income’).show().

PySpark join dataframe пример
Результаты соединения датафреймов и сумма дохода по каждому курсу

Оконные функции для анализа данных в Apache Spark вместо SQL-запросов

Оконные функции PySpark выполняют вычисления для набора строк, которые связаны друг с другом. Но, в отличие от агрегатных функций, таких как вычисление среднего значения или суммы сгруппированных строк, что мы рассмотрели выше, оконные функции не сворачивают результат строк в одно значение. Вместо этого все строки сохраняют свою исходную идентичность, и вычисленный результат возвращается для каждой строки.

В нашем примере сравним потенциальный доход каждой заявки с максимальным возможным по конкретному курсу. Разумеется, чем больше число студентов в заявке, тем выше потенциальный доход от ее реализации. Чтобы выполнить операцию над группой заявок по каждому курсу, следует сперва разбить данные с помощью PySpark-функции Window.partitionBy(), указав в параметрах этого метода соответствующее поле – course. А номер строки и ее ранг в разделе можно получить через упорядочивание с помощью предложения orderBy(), в параметрах которого определим поле доход (income).

Однако, в практическом смысле нам интересен не столько номер строки в разделе окна, сколько накопительное или кумулятивное распределение потенциального дохода в сравнении от максимального. Определить это поможет оконная функция cume_dist(), по смыслу аналогичная функции DENSE_RANK в SQL.

from pyspark.sql.window import Window
windowSpec = Window.partitionBy("course").orderBy("income")

from pyspark.sql.functions import cume_dist    
df1.withColumn("cume_dist",cume_dist().over(windowSpec)) \
   .show()

Чтобы получить сводную статистику с минимальным, максимальным и средним значением, а также общей суммой по каждому курсу, который является ключом раздела, используем оконные PySpark-функции Aggregate и WindowSpec. При этом уже не нужно использовать предложение order by().

from pyspark.sql.functions import row_number
windowSpecAgg  = Window.partitionBy("course")

from pyspark.sql.functions import col,avg,sum,min,max,row_number 
df1.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("income")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("income")).over(windowSpecAgg)) \
  .withColumn("min", min(col("income")).over(windowSpecAgg)) \
  .withColumn("max", max(col("income")).over(windowSpecAgg)) \
  .where(col("row")==1).select("course","avg","sum","min","max") \
  .show()
window functions PySpark example, обучение Spark примеры курсы обучение, анализ данных Spark PySpark
Результат выполнения оконных функций PySpark

Больше практических деталей по применению Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html
  2. https://sparkbyexamples.com/pyspark/pyspark-window-functions/