Введение в PySpark

Автор Категория ,
Введение в PySpark

Python считается из основных языков программирования в областях Data Science и Big Data, поэтому не удивительно, что Apache Spark предлагает интерфейс и для него. Data Scientist’ы, которые знают Python, могут запросто производить параллельные вычисления с PySpark. Читайте в нашей статье об инициализации Spark-приложения в Python, различии между Pandas и PySpark, доступных форматов для чтения и записи, а также интеграция с базами данных.

Инициализация через SparkContext, SparkConf и SparkSession

В первую очередь, Spark создает SparkContext – объект, который определяет, как получить доступ к кластеру в момент выполнения программы. Также определяются параметры конфигурации через SparkConf. К ним может относиться кластерный менеджер (master), с которым соединяется приложение через URL, название приложения, количество ядер и т.д (с полным списком можно ознакомиться в документации). Вот так может выглядеть инициализация Spark:

conf = SparkConf().setAppName('appName').setMaster('local[*]')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Начиная со Spark 2.0, был представлен SparkSession, который служит единой точкой входа и устраняет необходимость явно создавать SparkConf и SparkContext, поскольку они инкапсулируются в SparkSession. Все функции, которые были доступны в SparkContext, теперь доступны в SparkSession. Например, код выше может быть переписан следующим образом:

spark = SparkSession.builder \
	.master('local[*]') \
	.appName('appName') \
	.getOrCreate()

Pandas и PySpark

Python-библиотека Pandas – главный инструмент Data Scientist’а при работе с данными. Интерфейсы Pandas и PySpark имеют множество сходств, поэтому тем, кто уже знаком с Pandas, не трудно понять и PySpark. В частности, оба используют DataFrame для представления табличных данных, отсюда следует множество схожих методов. При разных названиях, они пересекаются с операциями SQL: методы distinct, where в PySpark соответствуют методам unique, filter в Pandas.

Главное отличие между PySpark и Pandas состоит в режиме выполнения. PySpark реализует lazy execution (ленивое выполнение), в то время как Pandas – eager execution (мгновенное выполнение). Допустим, требуется загрузить данные на диске и применить к ним операции трансформации (map). Pandas выполнит их тут же. PySpark же сохранит всю последовательность необходимых операций и выполнит их в том случае, когда данные понадобятся. Ниже приведены примеры с сортировкой данных в PySpark и Pandas.

import pandas as pd
data = pd.read_csv(“file.csv”)
data = data.apply(lambda x: sorted(x)) # сразу же изменяются данные
data = spark.read.csv(“file.csv”)
def sort_cols_asc(input_df):
    return input_df.select(*sorted(input_df.columns))
data = data.transform(sort_cols_asc) # в ожидании выполнения операций
data.show() # вот теперь применит

Отметим также – очень просто перейти от PySpark к Pandas и наоборот:

# Из Pandas в PySpark 
spark_df = spark.createDataFrame(pandasDF)

# Из PySpark в Pandas 
pandasDF = spark_df.toPandas()

Доступные форматы для чтения и записи

PySpark поддерживает такие основные форматы, как CSV, JSON, ORC, Parquet. Разберемся с синтаксисом чтения и записи, который практически одинаковый.

  • CSV (Comma-separated values, значения, разделенные запятыми) – наиболее часто используемый формат для хранения датасетов:
    data = spark.read.csv(“file.csv”) # прочитать
    data.write.csv(”file_dir”, sep=',') # записать
    	
  • JSON (JavaScript Object Notation) применяется для сериализации данных, используется также в MongoDB:
    data = spark.read.json(“file.json”)
    data.write.json(”file_dir”)
    
  • ORC (Optimized Row Columnar) – формат хранения данных экосистем Apache Hadoop:
    data = spark.read.orc(“orc_file”)
    data.write.orc()
    
  • Parquet- еще один формат экосистем Apache Hadoop, который может изменяться в соответствии с изменением данных, а также поддерживает слияние схем:
    data = spark.read.parquet(“parquet_file”)
    data.write.parquet(”file_dir”)
    

Такие примеры записи написаны больше в Python-стиле. PySpark также поддерживает функциональный стиль программирования. Вот так, например, будет выглядеть чтение CSV-файла:

data = spark.read \
	.format(“json”) \
	.load(“file.json”) \
	.option("header", True)

Доступные базы данных

Также PySpark может взаимодействовать с SQL и NoSQL базами данных. Рассмотрим также доступные базы данных и их синтаксис взаимодействия. Ниже будет уже подразумеваться, что все базы данных подключены, поэтому остается только установить взаимодействие с PySpark.

  • Реляционные СУБД, например, MySQLили PostgreSQL. Для чтения необходимо указать соединение с базой данных, соответствующую таблицу и пароль:
    dbURL = "jdbc:mysql://localhost/bigdataschool"
    data = spark.read. \
    	.format("jdbc") \
    	.options(
    		url=dbURL, database="bigdataschool", 
    		dbtable='some_table', user="root",
    		password="your_pass") \
    	.load()
    
  • Cassandra- распределенная NoSQL СУБД с упором на надежность и работу с большими данными (Big Data). Здесь необходимо указать таблицу, а также пространство ключей:
    data = spark.read \
    	.format("org.apache.spark.sql.cassandra") \
    	.option(table="t2", keyspace="test") \
    	.load()
    
  • MongoDB- также является NoSQL-СУБД, которая использует формат JSON. Нужно указать соединение с коллекцией:
    data = spark.read \
    	.format("com.mongodb.spark.sql.DefaultSource")
    	.option("uri","mongodb://127.0.0.1/bigdataschool.courses") \
    	.load()
    
  • ApacheHive – СУБД на основе платформы Hadoop. В методе table указывается названии таблицы в формате <база_данных>.<таблица>:
    courses = spark.table("bigdataschool.courses")
    

Подобным образом, DataFrame можно записать в базу данных. Правда, для Apache Hive при инициализации SparkSession необходимо включить его поддержку, вызвав метод enableHiveSupport [1]:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .enableHiveSupport() \
	.getOrCreate()

В следующий раз рассмотрим пример выполнения SQL-операций PySpark на конкретном датасете в Google Colab. А как на практике использовать PySpark в проектах аналитики больших данных, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в нашем лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

Источники
  1. https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html