Аналитика больших данных с Apache Spark: UDF на Pyspark для вызова внешних REST API

Автор Категория ,
Аналитика больших данных с Apache Spark: UDF на Pyspark для вызова внешних REST API

Сегодня рассмотрим, как загружать большие объемы данных из REST API-сервисов с Apache Spark, написав на PySpark собственную UDF-функцию с преобразованием withColumn(), чтобы воспользоваться всеми преимуществами распределенных вычислений этого фреймворка.

Локальное исполнение на драйвере и распараллеливание REST-API вызовов в Apache Spark

Мы уже рассказывали, что конвертация Python-скрипта в распределенный код Apache Spark – не самая тривиальная задача. Например, выполнение кода Python вне контекста Dataframe на драйвере выполняется локально, а не распараллеливается по узлам кластера. Это становится проблемой при работе с большими объемами данных, например, когда нужно загрузить большие объемы данных через службу REST API.

Рассмотрим пример:

import requests
import jsonres = Nonetry:
  res = requests.get(url, data=body, headers=headers)
    
except Exception as e:
  print(e)if res != None and res.status_code == 200:
 print(json.loads(res.text))

Если просто запустить этот код на исполнение, он будет выполнен локально на драйвере, не используя преимущества распределенного фреймворка, такие как параллелизм и масштабирование. Это превращает код PySpark в просто однопоточную программу Python. Поэтому, чтобы использовать все возможности Apache Spark, необходимо найти альтернативное решение, например, написав пользовательскую функцию (UDF, User Defined Function) с оператором withColumn. В частности, можно создать DataFrame, в котором каждая строка представляет собой один запрос к службе REST. UDF используется для инкапсуляции HTTP-запроса, возвращая структурированный столбец, представляющий REST-ответ от API внешнего сервиса. К этому ответу можно применить синтаксический анализ (парсинг), чтобы разнести возвращаемые поля ключ-значения по столбцам датафрейма. Если возвращаемый JSON-файл содержит вложенные структуры типа массивов и объектов, их следует распарсить дополнительно, подобно тому, как мы писали здесь. Можно самостоятельно реализовать подобное решение, преобразовав многоуровневые сложные иерархические столбцы в неиерархическую версию самих себя. В преобразованном датафрейме не будет столбцов с данными сложного типа: всем вложенным атрибутам назначится собственный столбец.

Рассмотрим предложенное решение на примере бесплатной службы REST API правительства США, которая возвращает марки и модели транспортных средств, зарегистрированных в этой стране: https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json.

Реализация UDF-функции на PySpark

Для отправки HTTP-запросов можно использовать Python-библиотеку Requests, которая избавляет от необходимости вручную добавлять строки запроса к URL-адресам или кодировать данные в запросе POST. Предположим, нужно использовать данные из REST API, вызываемый несколько раз, чтобы получить требуемые данные. Чтобы воспользоваться параллелизмом Apache Spark, каждый вызов REST API будет инкапсулирован UDF-функцией, которая привязана к DataFrame. Каждая строка в DataFrame будет представлять собой один вызов службы REST API. После выполнения действия в DataFrame результат каждого отдельного вызова REST API будет добавлен к каждой строке в виде структурированного типа данных.

Используем библиотеку Requests для выполнения HTTP-запроса get или post. Ответ RESTful-сервиса передается обратно в виде объекта JSON.

import requests
import json
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row
def executeRestApi(verb, url, headers, body):
  #
  headers = {
      'content-type': "application/json"
  }  res = None
  # Make API request, get response object back, create dataframe from above schema.
  try:
    if verb == "get":
      res = requests.get(url, data=body, headers=headers)
    else:
      res = requests.post(url, data=body, headers=headers)
  except Exception as e:
    return e  if res != None and res.status_code == 200:
    return json.loads(res.text)  return None

Apache Spark позволяет выбирать, какие значения нужны из JSON, возвращаемого вызовом REST API. Следует всего лишь при объявлении схемы данных определить, какие части JSON нужны.

schema = StructType([
StructField("Count", IntegerType(), True),
StructField("Message", StringType(), True),
StructField("SearchCriteria", StringType(), True),
StructField("Results", ArrayType(
StructType([
StructField("Make_ID", IntegerType()),
StructField("Make_Name", StringType())
])
))
])

Потоковая обработка в Apache Spark

Код курса
SPOT
Ближайшая дата курса
19 сентября, 2022
Длительность обучения
16 ак.часов
Стоимость обучения
40 000 руб.

Затем необходимо объявить UDF, убедившись, что тип возвращаемого значения установлен в соответствии с заявленной схемой. Это гарантирует, что новый столбец, который используется для выполнения пользовательской функции, в конечном итоге будет содержать данные в виде структурированного объекта, а не простого текста в формате JSON. Действие аналогично использованию функции from_json, которая принимает схему в качестве второго параметра.

udf_executeRestApi = udf(executeRestApi, schema)

Наконец, необходимо создать DataFrame, где каждая строка представляет собой один вызов REST API. Количество столбцов в датафрейме может быть любым, один из которых должен содержать URL-адрес и/или параметры, необходимые для выполнения вызова REST API. В нашем примере создадим датафрейм следующим образом:

from pyspark.sql import Rowheaders = {
    'content-type': "application/json"
}body = json.dumps({
})RestApiRequestRow = Row("verb", "url", "headers", "body")
request_df = spark.createDataFrame([
RestApiRequestRow("get", "https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json", headers, body)
          ])

Класс Row используется для определения столбцов датафрейма, и с помощью метода createDataFrame экземпляр RestApiRequestRow объявляется для каждого отдельного вызова API. Далее можно использовать метод withColumn() в DataFrame для выполнения UDF и REST API.

result_df = request_df \
.withColumn("result", udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")))

Поскольку Apache Spark поддерживает ленивые или отложенные вычисления для преобразований, UDF для датафрейма будет выполняться после материализации запроса и вызове какого-либо действия, например, show(). При этом создается план запроса, но сами данные все еще находятся в хранилище и ожидают обработки. Spark будет распределять вызовы API между всеми рабочими процессами, прежде чем возвращать такие результаты HTTP-вызова с заголовками и телом результата. Если выполнить парсинг возвращаемого от RESTful-сервиса JSON-ответа с пользовательской UDF-функцией Collapse_columns, записывая в датафрейм только нужный атрибут, который определен как result.Results, создание итогового датафрейма будет выглядеть так:

df = request_df.select(explode(col("result.Results")).alias("results"))
df.select(collapse_columns(df.schema)).show() 

Хотите узнать, сколько времени занимают вызовы API из Apache Spark при обращении к Kafka и S3? Читайте об этом в нашей новой статье

Узнайте больше про использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-python-4367f2740e78
  2. https://github.com/jamesshocking/collapse-spark-dataframe
  3. https://docs.python-requests.org/en/latest/