Как вести мониторинг финансовых транзакций в реальном времени с Apache Kafka и Spark в Delta Lake: пример аналитики больших данных

Автор Категория , , ,
Как вести мониторинг финансовых транзакций в реальном времени с Apache Kafka и Spark в Delta Lake: пример аналитики больших данных

Сегодня рассмотрим пример построения системы аналитики больших данных для мониторинга финансовых транзакций в реальном времени на базе облачного Delta Lake и конвейера распределенных приложений Apache Kafka, Spark Structured Streaming и других технологий Big Data. Читайте далее о преимуществах облачного Delta Lake от Databricks над традиционным Data Lake.

Постановка задачи: финансовая аналитика в реальном времени

Для кредитной организации необходимо реализовать сервис мониторинга финансовых транзакций в реальном времени, который будет включать следующие функциональные возможности [1]:

  • выявление мошеннических транзакций и генерация оповещение о таких случаях. К примеру, обнаружив лазейку в правилах андеррайтинга, клиент может злонамеренно использовать систему, взяв несколько кредитов и совершив онлайн-покупки с отложенной оплатой.
  • быстрый поиск и устранение неисправностей в случае сбоя или замедления работы системы;
  • мгновенное отслеживание и оценка отклика на маркетинговые кампании.

Для реализации перечисленных функций было решено создать озеро данных (Data Lake), которое будет предоставлять информацию для потоковой аналитики в режиме, близком к реальному времени (near real-time). При этом за последние 2 года в компании накопилось около 400 ТБ исторических данных о финансовых транзакциях. А каждые 5 минут необходимо обрабатывать примерно 10 тысяч записей о транзакциях и результатах различных маркетинговых кампаний.

Как правило, типичная транзакция выполняется в несколько этапов [1]:

  • cбор деталей;
  • шифрование информации;
  • маршрутизация к платежной системе;
  • отправка уведомления об одобрении или отклонении платежа.

Таким образом, в озере данных должна быть одна запись для каждой транзакции с указанием ее итогового состояния.

Delta Lake как решение проблем 2-х разных подходов с Data Lake

Первым вариантом является создание конвейера данных на базе Apache Spark Structured Streaming со списанием данных, который будет состоять из следующих шагов:

  1. раз в 5 минут считывать микро-пакеты данных о транзакциях из Apache Kafka и сохранять их в виде небольших Parquet-файлов;
  2. объединять все новые файлы и исторические данные для создания нового датасета с регулярным интервалом, например, раз в 3 часа с использованием инструментария запросов, таких как Apache Presto, AWS Athena, Google BigQuery и пр.
  3. создавать таблицу Apache Presto или AWS Athena, чтобы сделать эти данные доступными для аналитики с помощью SQL-запросов.
курсы по Kafka, обучение Кафка, курсы инженеров данных, курсы Spark, оучение Apache Spark, big data pipeline on Apache Kafka and Spark
Конвейер аналитики больших данных на Apache Kafka и Spark

При относительно простой реализации, этот вариант имеет следующие недостатки:

  • при увеличении размера данных их консолидация каждые 3 часа становится сложной задачей;
  • при увеличении интервал консолидации до 6 или 12 часов, озеро данных теряет свою интерактивность и уже не может считаться приближенным к реальному времени;
  • любая ошибка в финансовой системе, которая обнаружена недобросовестными клиентами и не может быть моментально исправленной, т.к. данные об этом появляются на дэшборде спустя несколько часов, может привести к потере большого количества денег, т.е. пострадает бизнес;
  • затруднен мониторинга определенных маркетинговых кампаний, таких как кэшбэк при покупке товаров и услуг, приуроченных к определенной дате и времени, например, цветы к 8-му марта, доставка еды во время трансляции футбольного чемпионата и пр.

Обойти некоторые из этих недостатков можно с помощью альтернативного конвейера данных на тех же технологиях Big Data с дополнительным копированием данных. Так data pipeline будет состоять из 2-х этапов [1]:

  1. раз в 5 минут считывать микро-пакеты данных о транзакциях из Apache Kafka и сохранять их в виде небольших Parquet-файлов;
  2. создавать таблицу AWS Athena для аналитики больших данных через SQL-запросы с условием WHERE, что увеличит задержку и может привести к дополнительным накладным расходам, когда данные достигнут петабайтного масштаба.

В первом рассмотренном варианте есть 2 копии одних и тех же данных: необработанные и последнее состояние транзакции. Необработанная копия данных, по сути, бесполезна и хранится в топиках Kafka. Второй подход предполагает единую копию базы транзакций с дубликатами, поэтому требуется добавлять условие фильтрации в SQL-запросе для удаления устаревших транзакций. Таким образом, оба рассмотренных варианта являются неоптимальными.  Необходимо сохранить только одну копию базы транзакций с последним состоянием каждой записи и предоставить возможность просмотра различных snapshot’ов, добавив ACID-свойства к этой единственной Parquet-таблице. Это можно реализовать, заменив Data Lake на Delta Lake от Databricks, о котором мы рассказывали здесь. Кратко перечислим его ключевые особенности, важные с точки зрения рассматриваемой Real-time системы финансовой аналитики больших данных:

  • открытый формат на основе Parquet;
  • поддержка ACID- транзакций;
  • API Apache Spark;
  • возможность путешествия во времени с получением нужных snapshot’ов.

С Delta Lake конвейер проектируемой Big Data системы будет состоять из 3-х этапов [1]:

  1. создание дельта-таблицы из исторических данных о транзакциях;
  2. считывание оперативных данных о транзакциях из Kafka каждые 5 минут;
  3. объединение полученных микро-пакетов с существующей дельта-таблицей.

Чтобы создать дельта-таблицу, можно использовать существующий код Apache Spark SQL и изменить формат с Parquet, csv, json и пр. на delta. При создании дельта-таблицы в хранилище метаданных она сохраняет там расположение данных таблицы, с помощью указателя упрощая пользователям поиск данных и обращение к ним. Однако, хранилище метаданных не является источником достоверной информации о том, что допустимо в таблице – эта ответственность остается за Delta Lake. Оно использует версионные файлы Parquet для хранения данных в облачном хранилище. ACID-транзакции поддерживаются с помощью журнала транзакций с отслеживанием всех фиксаций в таблице или каталоге хранилища больших двоичных объектов [2]. Все это отлично подходит к рассмотренному кейсу построения аналитической Big Data системы для мониторинга финансовых транзакций в реальном времени. Завтра мы продолжим разговор про Delta Lake Databrics и рассмотрим его версию для Google Cloud. А проблемы построения подобного конвейера в AWS мы анализировали здесь.  О другом примере построения облачного озера данных с Apache Kafka читайте в нашей новой статье.

Узнайте больше о построении эффективных конвейеров для потоковой аналитики больших данных с Delta Lake, Apache Spark и Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://sigmoidanalytics.medium.com/near-real-time-finance-data-warehousing-using-apache-spark-and-delta-lake-79df4cedab14
  2. https://docs.databricks.com/delta/delta-batch.html