Как создать свой коннектор Apache Spark: пример интеграции с Tableau

Автор Категория , ,
Как создать свой коннектор Apache Spark: пример интеграции с Tableau

Говоря про практическое обучение Apache Spark для дата-инженеров, сегодня рассмотрим особенности разработки собственного коннектора для этого фреймворка на примере его интеграции с BI-системой Tableau. Читайте далее, как конвертировать Spark RDD в нужный формат и сделать свой коннектор удобным для пользователей.

Интеграция Spark с внешними источниками данных через коннекторы

Apache Spark – отличный инструмент для быстрой обработки Big Data, однако он не является хранилищем, в отличие от озера данных на Hadoop HDFS или СУБД, таких как Cassandra, Mongo DB, Greenplum, Elasticsearch и прочие базы данных. Чтобы считывать информацию из Data Lake, СУБД или другого источника, в Спарк используются специальные коннекторы в виде интерфейсов для одной из основных структур данных этого фреймворка – RDD (Resilient Distributed Dataset, надежная распределенная коллекция данных типа таблицы). Полученная информация, как правило, подвергается аналитической обработке и визуализации, например, в BI-инструментах или других системах аналитики больших данных.

Фреймворк предоставляет множеств готовых коннекторов в Spark SQL API. Например, коннектор записи write позволяет легко конвертировать структуру данных DataFrame в CSV с помощью всего одной строки кода: dataframe.write.csv(‘mycsv.csv‘). Также готовые коннекторы Спарк поддерживают и другие распространенные форматы: текстовые файлы, JSON, Parquet, ORC и т.д. Крупные компании-разработчики программного обеспечения уровня Microsoft, MongoDB Inc.  Pivotal Software и другие вендоры популярных систем, предлагают собственные форматы хранения больших данных и коннекторы для их интеграции со Спарк. К примеру, популярный продукт для бизнес-аналитики Tableau использует Tableau Data Extract (.tde) илиHyper (.hyper) в качестве форматов хранения своих таблиц [1].

Разработчик аналитической MPP-СУБД Greenplum, на основе которой создана отечественная Arenadata DB, корпорация Pivotal Software также предлагает готовый Greenplum-Spark Connector в виде JAR-файла. Начать работу с ним можно выполнив команду spark-shell с параметром –jars, который определяет путь файловой системы к JAR-файлу Greenplum-Spark Connector [2]:

[email protected]$ export GSC_JAR=/path/to/greenplum-spark_-.jar

[email protected]$ spark-shell –jars $GSC_JAR

СУБД Mongo DB также предлагает свой коннектор для Spark, предоставляя доступ ко всем библиотекам этого Big Data фреймворка, включая Scala, Java, Python и R. Данные MongoDB материализованы в виде DataFrames и Datasets для анализа с помощью машинного обучения, графического, потокового и SQL API [3].

При всем многообразии готовых и уникальных коннекторов для Apache Spark, все они работают по одному принципу, который мы рассмотрим далее.

Как устроены коннекторы Спарк и при чем здесь RDD

Работу Spark-коннекторов можно представить в виде 3-х этапов [1]:

  1. преобразование датафрейма в целевой формат. При этом стоит помнить про особенности обработки больших данных в этом фреймворке. В частности, RDD распределены по разделам (parttion), не доступным напрямую драйверу кластера, в котором выполняется код Python. Поэтому для каждого раздела RDD нужно собрать данные для драйвера с помощью метода .collect(), а затем просмотреть собранный раздел и вставить его в целевой файл построчно. Пример такой манипуляции показывает следующий код на Python:

def add_partition_index(partition_index, partition_rows):   

yield (partition_index, list(partition_rows))

partitioned_rdd = rdd.mapPartitionWithIndex(add_partition_index)

for current_partition_index in range(rdd.getNumPartitions()):  

[(_, current_partition_rows)] = partitioned_rdd \       

.filter(lambda x: x[0] == current_partition_index) \       

.collect()    for row in current_partition_rows:       

convert_and_insert(row)

Функция convert_and_insert() будет предоставлена ​​поставщиками данных. К примеру, в случае BI-системы Tableau можно обратиться к Tableau SDK (для формата .tde) или Tableau API 2.0 (для формата .hyper), которые имеют C ++, Java и Python API.

Это предполагает полный контроль разработчика Big Data над следующими факторами:

  • память драйвера, которую можно настроить для текущего сеанса Спарк через spark.driver.memory, потому что все разделы, которые будут собраны в этот драйвер один за другим, должны соответствовать;
  • разбиение исходного датафрейма, поскольку ни один раздел не должен превышать объем памяти драйвера, чтобы избежать ошибок OutOfMemory, о которой мы говорили вчера.
  1. Экспорт исходного файла в облако, чтобы сделать данные доступными для пользователей. Например, в случае Tableau это выполняется с помощью REST API для публикации файлов этой BI-системы на ее сервере. Как правило, каждый вендор предоставляет разработчикам выделенные API для публикации данных в облаке. Однако, поскольку не все API достаточно подробно документированы, имеет смысл просмотреть код напрямую, чтобы понять, реализована нужная вам функция.
  2. Упаковка готового коннектора в удобный для пользователей вид. Например, можно реализовать интерфейс командной строки на Python поверх коннектора. Это позволит пользователю выбирать исходную и целевую среды, между которыми нужно передать данные. С учетом заданных параметров и файла конфигурации, запускаются сервисы преобразования и экспорта для публикации форматированных данных в облаке.
Apache Spark Connector
Принцип работы коннектора Apache Spark

Подробное видео о разработке собственного коннектора Спарк для MongoDB на Java, Python и R доступно для просмотра в [4]. Примечательно, что рассмотренный пример можно легко адаптировать для интеграции Apache Spark с другим источником данных благодаря универсальной последовательности действий по разработке уникального коннектора [4]:

  • привести распределенные данные к подходящему формату;
  • реализовать код коннектора на Scala или Java;
  • предоставить коннектору доступ к нужным функциям фреймворка Спарк, например, потоковая передача Spark Streaming и Spark SQL;
  • протестировать работу созданного коннектора;
  • расширить разработанный коннектор для поддержки Python и R (если необходимо);
  • опубликовать готовый коннектор для общего использования.

В следующей статье мы продолжим разговор про коннекторы Спарк с целью обучения инженеров больших данных и рассмотрим, как устроен Greenplum-Spark Connector, а также кейсы и примеры его практического использования.

А освоить практические особенности эксплуатации Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://www.sicara.ai/blog/2018-12-12-publish-data-outside-data-lake-spark-connector
  2. https://greenplum.org/introducing-pivotal-greenplum-spark-connector-integrating-apache-spark/
  3. https://www.mongodb.com/products/spark-connector
  4. https://databricks.com/session/how-to-connect-spark-to-your-own-datasource