Аналитика больших данных с Apache Spark: UDF на Pyspark для вызова внешних REST API

Apache Spark для разработчиков аналитиков данных и дата-инженеров примеры курсы обучение, курсы примеры обучение Spark SQL PySpark, обучение Spark REST API UDF курсы, примеры Spark парсинг JSON, примеры Spark для разработчиков курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Сегодня рассмотрим, как загружать большие объемы данных из REST API-сервисов с Apache Spark, написав на PySpark собственную UDF-функцию с преобразованием withColumn(), чтобы воспользоваться всеми преимуществами распределенных вычислений этого фреймворка.

Локальное исполнение на драйвере и распараллеливание REST-API вызовов в Apache Spark

Мы уже рассказывали, что конвертация Python-скрипта в распределенный код Apache Spark – не самая тривиальная задача. Например, выполнение кода Python вне контекста Dataframe на драйвере выполняется локально, а не распараллеливается по узлам кластера. Это становится проблемой при работе с большими объемами данных, например, когда нужно загрузить большие объемы данных через службу REST API.

Рассмотрим пример:

import requests
import jsonres = Nonetry:
  res = requests.get(url, data=body, headers=headers)
    
except Exception as e:
  print(e)if res != None and res.status_code == 200:
 print(json.loads(res.text))

Если просто запустить этот код на исполнение, он будет выполнен локально на драйвере, не используя преимущества распределенного фреймворка, такие как параллелизм и масштабирование. Это превращает код PySpark в просто однопоточную программу Python. Поэтому, чтобы использовать все возможности Apache Spark, необходимо найти альтернативное решение, например, написав пользовательскую функцию (UDF, User Defined Function) с оператором withColumn. В частности, можно создать DataFrame, в котором каждая строка представляет собой один запрос к службе REST. UDF используется для инкапсуляции HTTP-запроса, возвращая структурированный столбец, представляющий REST-ответ от API внешнего сервиса. К этому ответу можно применить синтаксический анализ (парсинг), чтобы разнести возвращаемые поля ключ-значения по столбцам датафрейма. Если возвращаемый JSON-файл содержит вложенные структуры типа массивов и объектов, их следует распарсить дополнительно, подобно тому, как мы писали здесь. Можно самостоятельно реализовать подобное решение, преобразовав многоуровневые сложные иерархические столбцы в неиерархическую версию самих себя. В преобразованном датафрейме не будет столбцов с данными сложного типа: всем вложенным атрибутам назначится собственный столбец.

Рассмотрим предложенное решение на примере бесплатной службы REST API правительства США, которая возвращает марки и модели транспортных средств, зарегистрированных в этой стране: https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json.

Реализация UDF-функции на PySpark

Для отправки HTTP-запросов можно использовать Python-библиотеку Requests, которая избавляет от необходимости вручную добавлять строки запроса к URL-адресам или кодировать данные в запросе POST. Предположим, нужно использовать данные из REST API, вызываемый несколько раз, чтобы получить требуемые данные. Чтобы воспользоваться параллелизмом Apache Spark, каждый вызов REST API будет инкапсулирован UDF-функцией, которая привязана к DataFrame. Каждая строка в DataFrame будет представлять собой один вызов службы REST API. После выполнения действия в DataFrame результат каждого отдельного вызова REST API будет добавлен к каждой строке в виде структурированного типа данных.

Используем библиотеку Requests для выполнения HTTP-запроса get или post. Ответ RESTful-сервиса передается обратно в виде объекта JSON.

import requests
import json
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row
def executeRestApi(verb, url, headers, body):
  #
  headers = {
      'content-type': "application/json"
  }  res = None
  # Make API request, get response object back, create dataframe from above schema.
  try:
    if verb == "get":
      res = requests.get(url, data=body, headers=headers)
    else:
      res = requests.post(url, data=body, headers=headers)
  except Exception as e:
    return e  if res != None and res.status_code == 200:
    return json.loads(res.text)  return None

Apache Spark позволяет выбирать, какие значения нужны из JSON, возвращаемого вызовом REST API. Следует всего лишь при объявлении схемы данных определить, какие части JSON нужны.

schema = StructType([
StructField("Count", IntegerType(), True),
StructField("Message", StringType(), True),
StructField("SearchCriteria", StringType(), True),
StructField("Results", ArrayType(
StructType([
StructField("Make_ID", IntegerType()),
StructField("Make_Name", StringType())
])
))
])

Потоковая обработка в Apache Spark

Код курса
SPOT
Ближайшая дата курса
16 мая, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Затем необходимо объявить UDF, убедившись, что тип возвращаемого значения установлен в соответствии с заявленной схемой. Это гарантирует, что новый столбец, который используется для выполнения пользовательской функции, в конечном итоге будет содержать данные в виде структурированного объекта, а не простого текста в формате JSON. Действие аналогично использованию функции from_json, которая принимает схему в качестве второго параметра.

udf_executeRestApi = udf(executeRestApi, schema)

Наконец, необходимо создать DataFrame, где каждая строка представляет собой один вызов REST API. Количество столбцов в датафрейме может быть любым, один из которых должен содержать URL-адрес и/или параметры, необходимые для выполнения вызова REST API. В нашем примере создадим датафрейм следующим образом:

from pyspark.sql import Rowheaders = {
    'content-type': "application/json"
}body = json.dumps({
})RestApiRequestRow = Row("verb", "url", "headers", "body")
request_df = spark.createDataFrame([
RestApiRequestRow("get", "https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json", headers, body)
          ])

Класс Row используется для определения столбцов датафрейма, и с помощью метода createDataFrame экземпляр RestApiRequestRow объявляется для каждого отдельного вызова API. Далее можно использовать метод withColumn() в DataFrame для выполнения UDF и REST API.

result_df = request_df \
.withColumn("result", udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")))

Поскольку Apache Spark поддерживает ленивые или отложенные вычисления для преобразований, UDF для датафрейма будет выполняться после материализации запроса и вызове какого-либо действия, например, show(). При этом создается план запроса, но сами данные все еще находятся в хранилище и ожидают обработки. Spark будет распределять вызовы API между всеми рабочими процессами, прежде чем возвращать такие результаты HTTP-вызова с заголовками и телом результата. Если выполнить парсинг возвращаемого от RESTful-сервиса JSON-ответа с пользовательской UDF-функцией Collapse_columns, записывая в датафрейм только нужный атрибут, который определен как result.Results, создание итогового датафрейма будет выглядеть так:

df = request_df.select(explode(col("result.Results")).alias("results"))
df.select(collapse_columns(df.schema)).show() 

Хотите узнать, сколько времени занимают вызовы API из Apache Spark при обращении к Kafka и S3? Читайте об этом в нашей новой статье

Узнайте больше про использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78
  2. https://github.com/jamesshocking/collapse-spark-dataframe
  3. https://docs.python-requests.org/en/latest/
Поиск по сайту