Интеграция Neo4j с Apache Spark: обзор коннектора 4.1

Автор Категория , ,
Интеграция Neo4j с Apache Spark: обзор коннектора 4.1

Продвигая наш новый курс по графовой аналитике больших данных в бизнес-приложениях, сегодня заглянем под капот коннектора Neo4j к Apache Spark. Сценарии использования, принципы работы, поддержка потоковой передачи Spark и другие новинки версии 4.1 для построения эффективных аналитических коннекторов с помощью алгоритмов на графах.

Как работает коннектор Neo4j к Apache Spark: краткий обзор

Осенью 2021 года Neo4j выпустил новую версию коннектора к Apache Spark, основными новинками которой стала доступность потоковой передачи Spark, как источника, так и приемника. Этот коннектор работает с Neo4j 3.5 и всей серией 4+, независимо от того, запускается ли он как отдельный экземпляр, в режиме кластера или как управляемая служба в облачном решении AuraDB. Neo4j Connector для Apache Spark версии 4.1 поддерживает Spark 2.4.5+ с Scala 2.11 и Scala 2.12 и Spark 3.0+ с Scala 2.12. В зависимости от комбинации версий Spark и Scala могут понадобиться разные файлы JAR-пакетов, которые имеют вид neo4j-connector-apache-spark _ $ {scala.version} _ $ {spark.version} _ $ {connector.version}.

Коннектор позволяет создавать или потреблять данные в Neo4j AuraDB и из него с любым источником, совместимым с Apache Spark. Коннектор Neo4j для Apache Spark упрощает интеграцию графов со Spark, позволяя считывать любой набор узлов или отношений как датафрейм в Spark или, наоборот, записывать любой DataFrame в Neo4j как набор узлов или отношений. Можно также использовать операторы языка запросов Cypher для обработки записей в датафрейм в выбранном пользователем шаблоне графа.

Примечательно, что этот коннектор не основан на Cypher для Apache Spark/Morpheus, предусматривающий предоставление интерпретатора, выполняющий запросы Cypher в среде Spark. Коннектор предоставляет собственное графовое представление для Spark и выполняет роль средства интеграции между Neo4j и Spark, фокусируясь на чтении и записи. Через этот коннектор весь код Cypher выполняется строго внутри Neo4j, а среда Spark работает как обычно, с датафреймами, причем коннектор не предоставляет примитивы графового API для Spark.

Поскольку коннектор основан на новом Spark DataSource API, другие интерпретаторы для Python и R, также будут работать: API остается прежним, потребуются лишь незначительные изменения синтаксиса, чтобы учесть различия между Python и Scala.

Изначально коннектор Spark записывал данные в Neo4j пакетами. Поскольку Neo4j является транзакционной СУБД, все изменения производятся внутри транзакции, которые имеют накладные расходы. Уменьшить эти накладные расходы и повысить производительность можно следующими способами:

  • увеличить размер пакета (size) – чем больше пакет, тем меньше транзакций выполняется для записи всех данных и тем меньше накладных расходов на транзакции;
  • установить в экземпляре Neo4j достаточный размер свободной кучи и кэша страниц, т.к. недостаток памяти не позволят зафиксировать большие пакеты, замедляя импорт.

Ранее коннектор Neo4j для Apache Spark позволял выполняться пакетную загрузку и перемещение данных из таких систем, как BigQuery, Snowflake, Azure Synapse и других в графы этой NoSQL-СУБД. Коннектор позволяет проектировать конвейеры графовой аналитики больших данных для многих вариантов использования. Однако, пакетные операции не работают с другими источниками потоковой передачи, такими как AWS Kinesis, Google Pubsub, Apache Kafka, Flume и пр. Поэтому в новой версии поддерживается потоковая передача Spark, которая пригодится для множества бизнес-сценариев. О них мы поговорим далее.

Поддержка потоковой передачи Spark в новой версии

Двунаправленная потоковая передача с помощью Spark актуальна для следующих бизнес-сценариев:

  • потоковый ETL, когда необходимо непрерывно собирать данные, очищать, агрегировать их и помещать в граф знаний Neo4j, чтобы обнаружить аномальное поведение почти в реальном времени;
  • обогащение данных – потоковая передача Spark может обогатить живые данные и связать их с другими статическими датасетами, добавляя нужные значения к графу с помощью Cypher;
  • обнаружение триггерных событийSpark Streaming позволяет быстро обнаруживать редкие или необычные («триггерные») события, которые могут указывать на потенциально серьезную проблему, и быстро реагировать на них. Например, банки используют триггеры для обнаружения мошеннических транзакций и предотвращения их, а в производстве часто приходится отправлять автоматические предупреждения, чтобы предотвратить поломки оборудования. Подробнее о применении графовых алгоритмах в бизнес-приложениях мы писали здесь.
  • машинное обучениедля ML-процессов, реализованных в Spark, потоковая передача изменений данных из графа знаний поможет в развертывании моделей онлайн-прогнозирования.

Поскольку потоковая передача работает в обоих направлениях, можно использовать поддержку Spark Streaming, чтобы добавить возможности графа в любой поток потоковых данных. Достаточно передать данные в Neo4j, обогатить их или преобразовать с помощью Cypher и других подходов, а затем быстро передать в нужное место назначения.

Neo4j Apache Spark интеграция коннектор примеры обучение курсы
Потоковая интеграция с Neo4j с Apache Spark

Коннектор просто повторно использует API Spark Streaming, рассматривая Neo4j как источник потоковой передачи. При наличии опыта работы с коннектором чтения/записи данных из/в Neo4j с помощью пакетного API, разработчик может использовать функции Apache Spark Structured Streaming, добавив несколько дополнительных параметров, например, указав момент считывания потока.

Например, следующий код на Scala показывает чтение данных о событиях пользовательского поведения из Neo4j и их запись в Spark датафрейм. Сперва создается поток, к которому запускается запрос, а результат его выполнения записывается во временную таблицу в памяти под названием «testReadStream». Далее идет запрос к этой таблице, чтобы увидеть, какие данные возвращаются.

graph_stream = (

  spark.readStream

    .format(“org.neo4j.spark.DataSource”)

    .option(“authentication.type”, “basic”)

    .option(“url”, url)

    .option(“authentication.basic.username”, user)

    .option(“authentication.basic.password”, password)

    .option(“streaming.property.name”, “lastUpdated”)

    .option(“streaming.from”, “ALL”)

    .option(“labels”, “Person”)

    .load()

)

query = (graph_stream.writeStream

    .format(“memory”)

    .queryName(“testReadStream”)

    .start())

spark.sql(“select * from testReadStream”).show(1000, False)

 

А запись потока данных о событиях пользовательского поведения из AWS Kinesis в Neo4j с помощью Apache Spark Structured Streaming следующим Scala-кодом:

val kinesisQuery = kinesisStream

 .writeStream

 .format(“org.neo4j.spark.DataSource”)

 // Neo4j Aura connection options

 .option(“url”, “neo4j+s://abcd.databases.neo4j.io”)

 .option(“authentication.type”, “basic”)

 .option(“authentication.basic.username”, “neo4j”)

 .option(“authentication.basic.password”, “password”)

 .option(“checkpointLocation”, “/tmp/kinesis2Neo4jCheckpoint”)

 // end connection options

 .option(“save.mode”, “Append”)

 .option(“relationship”, “CHECKED_IN”)

 .option(“relationship.save.strategy”, “keys”)

 .option(“relationship.properties”, “user_checkin_time:at”)

 .option(“relationship.source.labels”, “:Attendee”)

 .option(“relationship.source.save.mode”, “Overwrite”)

 .option(“relationship.source.node.keys”, “user_name:name”)

 .option(“relationship.target.labels”, “:Event”)

 .option(“relationship.target.save.mode”, “Overwrite”)

 .option(“relationship.target.node.keys”, “event_name:name”)

 .start()

 

Завтра мы продолжим разговор про интеграции этой NoSQL-СУБД с другими системами и рассмотрим, как связать ее с Apache Kafka. А о том, как разработчики Cypher хотели внедрить его в Spark, но так и не реализовали эту идею до конца, читайте в нашей новой статье

Еще больше интересных примеров графовой аналитики больших данных в бизнес-приложениях вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Источники

  1. https://neo4j.com/blog/apache-spark-for-neo4j-auradb/
  2. https://neo4j.com/developer/spark/faq/