Что такое PySpark SQL и как он работает: несколько примеров

В прошлый раз мы говорили о том, как установить PySpark в Google Colab, а также скачали датасет с помощью Kaggle API. Сегодня на примере этого датасета покажем, как применять операции SQL в PySpark в рамках анализа Big Data. Читайте далее про вывод статистической информации, фильтрацию, группировку и агрегирование больших данных в PySparkSQL.

Датасет с домами на продажу

Датасет Kaggle содержит данные о домах на продажу в Бруклине с 2003 по 2017 года и доступен для скачивания. Он содержит 111 атрибутов (столбцов) и 390883 записей (строк). В атрибуты включены: дата продажи, дата постройки, цена на дом, налоговый класс, соседние регионы, долгота, ширина и др.

Итак, если у вас установлен PySpark, вам нужно только скачать датасет и прочитать его. Ниже представлен код в Python для представления данных в DataFrame. В методе для чтения CSV-файла мы указали inferSchema=True, чтобы PySpark сам произвел приведение типов для каждого атрибута, т.е., например, цена должна иметь тип int, а дата продажи — timestamp (формат даты в Python).

# Если у вас Google Colab, то раскомментируйте
# import findspark
# findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
data = spark.read.csv(
    'brooklyn_sales_map.csv',
    inferSchema=True, header=True)

Если же у вас не установлен PySpark, то прочитайте здесь.

Получаем основную информацию о датасете

Прежде всего следует узнать, что за данные будут анализироваться: какие именно столбцы, а также сколько строк. Для этого можно вызвать атрибут columns, который вернет список столбцов, а метод count вернет количество строк. Вот так это выглядит в Python:

data.count()
# 390883
data.columns
# ['_c0', 'borough1', 'neighborhood', 'tax_class' ... остальные 107 столбцов]

Также можно проверить, правильно ли PySpark определяет типы данных. В метод select нужно передать название столбца или список столбцов, а затем вызвать dtypes. Следующий код показывает типы данных для даты (sale date) и цены (sale price) продажи:

data.select(['sale_date', 'sale_price']).dtypes
# [('sale_date', 'timestamp'), ('sale_price', 'decimal(9,0)')]

Как видим, PySpark правильно определил типы данных этих столбцов. А чтобы получить статистическую информацию, можно вызвать метод describe, который выдаст:

  • количество записей (count);
  • среднее значение (mean);
  • стандартное отклонение (stddev);
  • максимальное значение (max);
  • минимальное значение (min).
data.describe().show()
+-------+-----------------+
|summary|       sale_price|
+-------+-----------------+
|  count|           390883|
|   mean|      506754.4777|
| stddev|2353964.664224616|
|    min|                0|
|    max|        499401179|
+-------+-----------------+

Ленивые вычисления (lazy evaluation) PySpark SQL

В коде выше, помимо describe, мы также вызвали show, который показывает сам DataFrame. В Pandas достаточно было бы использовать только describe. Но в PySpark вычисления осуществляются только в тот момент, когда нужны данные, а до этого момента они откладываются. Это называется отложенные или ленивые вычисления (lazy evaluation). В нашем случае вычисления происходят только при вызове show. Этот метод принимает в качестве аргумента количество строк, которые нужно показать (по умолчанию стоит 10).

Сортировка данных

В SQL данные сортируются по ключевому слову ORDER BY. В PySparkSQL есть аналогичный метод orederBy, который выполняет ту же операцию, что и SQL. Например, вот так в Python можно отсортировать данные по цене продажи в порядке возрастания:

data.orderBy('sale_price').show(3)
+----------+----------+------------+
|sale_price|year_built|year_of_sale|
+----------+----------+------------+
|         0|      1925|        2011|
|         0|      1925|        2011|
|         0|      1931|        2011|
+----------+----------+------------+

Кроме того, чтобы сортировать сразу по нескольким атрибутам, нужно передать их в виде списка, а также дополнительно можно указать порядок сортировки для каждого из них. Ниже код сортирует данные по цене продажи в порядке убывания, а по дате постройки – в порядке возрастания.

ordered = data.orderBy(['sale_price', 'year_built'], ascending=[0, 1])
ordered.select(['sale_price', 'year_built']).show(3)
+----------+----------+
|sale_price|year_built|
+----------+----------+
| 499401179|      2002|
| 345000000|         0|
| 340000000|      1924|
+----------+----------+

Группировка и агрегирование данных

Как и в классическом SQL, в PySpark можно сгруппировать данные методом groupBy. После группировки можно выполнить агрегирование данных. Следующий код на Python группирует датасет по соседним регионам (neighborhood) и по классу постройки (building class category):

grouped = data.groupBy(['neighborhood', 'building_class_category'])

Теперь можем применять методы агрегирования данных.

  • Вывод максимальной цены продажи для каждой группы:
    grouped.max('sale_price').show(3)
    +-----------------+-----------------------+---------------+
    |     neighborhood|building_class_category|max(sale_price)|
    +-----------------+-----------------------+---------------+
    | BROOKLYN HEIGHTS|    22  STORE BUILDINGS|       12000000|
    |          MIDWOOD|   05  TAX CLASS 1 V...|        7881412|
    |WILLIAMSBURG-EAST|   08 RENTALS - ELEV...|        1669345|
    +-----------------+-----------------------+---------------+
  • Вывод средней цены продажи для каждой группы:
    grouped.mean('sale_price').show(3)
    +-----------------+-----------------------+---------------+
    |     neighborhood|building_class_category|avg(sale_price)|
    +-----------------+-----------------------+---------------+
    | BROOKLYN HEIGHTS|    22  STORE BUILDINGS|   1748589.4138|
    |          MIDWOOD|   05  TAX CLASS 1 V...|    455718.3492|
    |WILLIAMSBURG-EAST|   08 RENTALS - ELEV...|    834672.5000|
    +-----------------+-----------------------+---------------+
  • Количество записей в каждой группе:
    grouped.count().show(3)
    +-----------------+-----------------------+-----+
    |     neighborhood|building_class_category|count|
    +-----------------+-----------------------+-----+
    | BROOKLYN HEIGHTS|    22  STORE BUILDINGS|   29|
    |          MIDWOOD|   05  TAX CLASS 1 V...|   63|
    |WILLIAMSBURG-EAST|   08 RENTALS - ELEV...|    2|
    +-----------------+-----------------------+-----+

Фильтрация данных

Не менее важным этапом анализа больших данных является их фильтрация, т.е. получение записей, которые удовлетворяют условию. В SQL обычно применяется ключевое слово WHERE, в PySpark: для этого есть filter и where (where является синонимом filter, поэтому они работают одинаково). Например, вот так в Python можно вывести все цены, которые больше чем $200.000.000:

filtered = data.filter("sale_price > 200000000")
filtered.select('sale_price').show()
+----------+
|sale_price|
+----------+
| 499401179|
| 345000000|
| 340000000|
| 276947000|
| 202500000|
| 205020000|
| 240000000|
+----------+

Фильтрацию можно осуществлять и для строковых значений. Например, метод isin выберет в столбце те записи, которые входят в один из элементов переданного списка. Полученные записи можно передать в filter. Ниже код оставляет только те записи с домами, которые имеют соседние регионы Midwood или Williamsburg Eeast.

filtered = data['neighborhood'] \
    .isin(['WILLIAMSBURG-EAST', 'MIDWOOD'])
data.filter(filtered).show(3)
+---+-----------------+----------+-------------------+
|_c0|     neighborhood|sale_price|          sale_date|
+---+-----------------+----------+-------------------+
| 59|          MIDWOOD|  67486441|2014-08-07 00:00:00|
|128|WILLIAMSBURG-EAST|  43250000|2016-07-21 00:00:00|
|143|WILLIAMSBURG-EAST|  39500000|2016-08-23 00:00:00|
+---+-----------------+----------+-------------------+

 

 

О том, как на практике применять SQL-операции для анализа больших данных в PySpark вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в нашем лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

Поиск по сайту