Сеансовые окна в Apache Spark Structured Streaming: кейсы, примеры и ограничения

Автор Категория ,
Сеансовые окна в Apache Spark Structured Streaming: кейсы, примеры и ограничения

Анализ данных в рамках пользовательский сеансов (сессий) – довольно востребованный кейс в Apache Spark, который не так просто реализовать из-за особенностей потоковой и пакетной обработки, а также эксплуатационных расходов. Сегодня рассмотрим, как работают сеансовые окна Spark Structured Streaming и каковы ограничения этого фреймворка.

Что такое сеансовые окна: краткий ликбез по аналитике больших данных

Оконные функции в Apache Spark мы уже рассматривали здесь. Напомним, при потоковой передаче в рамках сеанса группируются события, относящиеся к одному конкретному объекту (пользователю, устройству и пр.). Такую группировку можно рассматривать как окно, в котором накапливается вся активность этого объекта, за которым следует период бездействия или явное событие, закрывающее окно. Например, пользователь выполнил следующие действия:

  • (“click”, “2021-10-12 20:00”);
  • (“link hover”, “2021-10-32 20:03”);
  • (“click”, “2021-10-32 20:06”);
  • (“click”, “2021-10-12 20:10”);

Сессия заканчивается после закрытия браузера (действие “browser_close”) или 10 минут бездействия. Сессия заканчивается в 20:20 и имеет следующий формат:

{“user_id”: …, “actions”: [“click”, “link hover”, “click”, “click “],” duration_sec “: 600,” end_reason “:” inactivity “}.

Сеансовое окно (session window) разбивает поток на части конечного размера, к которым можно применять вычисления. Окна сеанса фиксируют период активности данных, который заканчивается перерывом в бездействии. В отличие от переворачивающихся (tumbling) и скользящих (sliding) окон, сеансовые окна не перекрываются и не имеют фиксированного времени начала и окончания. Промежуток бездействия используется для закрытия текущего сеанса, а следующие события назначаются новому сеансу. Сеансовые окна особенно полезны для анализа данных при исследовании действий пользователей за конкретный период времени, в течение которого они выполняли определенные действия. Однако, эта лаконичная идея оказалась не слишком проста в реализации [1]. Как это делается в Apache Spark, мы рассмотрим далее.

Анализ данных с Apache Spark

Код курса
SPARK
Ближайшая дата курса
17 января, 2022
Длительность обучения
24 ак.часов
Стоимость обучения
54 000 руб.

Сеансовая аналитика в пакетной и потоковой обработке Apache Spark

Для решения проблемы сеанса в потоковой передаче есть разные шаблоны, но наиболее распространенным являлась обработка с отслеживанием состояния. В зависимости от последнего сеанса идет накопление событий в каком-то состоянии с генерацией результатов при обнаружении завершающих действий. Состояние может иметь разный формат и храниться во внешнем хранилище данных с быстрым доступом: кэш в памяти, key-value хранилище с небольшой задержкой, основная память приложения с резервной копией в контрольной точке.

Например, чтобы отслеживать сеансы из потоков событий потребуется сохранять произвольные типы данных в качестве состояния и выполнять произвольные операции с состоянием, используя события потока данных в каждом триггере. Начиная с Apache Spark 2.2, это можно сделать с помощью операций mapGroupsWithState и flatMapGroupsWithState, которые позволяют применять определяемый пользователем код к сгруппированным наборам данных для обновления состояния, определенного пользователем. При этом разработчику стоит помнить о некоторых особенностях потоковой обработки в Apache Spark: функция состояния должна быть реализована с учетом семантики режима вывода. В частности, в режиме обновления (Update) функция состояния не должна выдавать строки, которые старше текущего водяного знака плюс допустимая задержка поздней записи. А в режиме добавления (Append) функция состояния может выдавать такие строки. Это ограничение глобального водяного знака, которое потенциально может вызвать проблемы сеансовой обработки.

Как и для любых произвольных операций с сохранением состояния в Apache Spark, изменение схемы пользовательского состояния и типа тайм-аута не допускается. Изменения в определяемой пользователем функции отображения состояний разрешены, но зависят от определяемой пользователем логики. Если необходимо поддерживать изменения схемы состояния, рекомендуется явно кодировать и декодировать сложные структуры данных состояния в байты, используя схему, которая поддерживает миграцию. Например, сохраняя состояние как байты в кодировке AVRO, можно изменять эту схему между перезапусками запроса, поскольку двоичное состояние всегда будет успешно восстановлено. Наконец, начиная с Apache Spark 2.4 не следует использовать операции mapGroupsWithState и flatMapGroupsWithState в режиме обновления перед соединением через JOIN [2].

Потоковая обработка в Apache Spark

Код курса
SPOT
Ближайшая дата курса
9 ноября, 2021
Длительность обучения
16 ак.часов
Стоимость обучения
36 000 руб.

Если нет жестких требований к обработке данных в режиме реального времени или около этого, т.е. допускается более высокая задержка, то для решения проблем сеанса можно использовать пакетный подход. Идея состоит в том, чтобы хранить все действия пользователя в распределенной и масштабируемой файловой системе, такой как AWS S3, Google Cloud Storage или Hadoop HDFS, чтобы генерировать сеансы через равные промежутки времени. В идеале это делается с помощью оркестратора типа Apache AirFlow. Ключевым аспектом здесь является обеспечение последовательности выполнения. Это означает, что генерация очередного события может начаться только в том случае, если генерация предыдущего завершилась корректно. Для повышения производительности файлы следует разбивать по времени событий, чтобы не фильтровать события, произошедшие за последние 24 часа для последней эпохи дня.

Возможно, в новой версии Apache Spark сеансовая аналитика в потоковой передаче станет проще благодаря функции session_window, которая использует метку времени и период бездействия в качестве параметров [3]: def session_window(timeColumn: Column, gapDuration: String)

Например, имеем следующие входные данные для одного пользователя:

  • {“time”: “2021-10-03 19:39:34”, “user_id”: “a”}
  • {“time”: “2021-10-03 19:39:41”, “user_id”: “a”}
  • {“time”: “2021-10-03 19:39:42”, “user_id”: “a”}
  • {“time”: “2021-10-03 19:39:49”, “user_id”: “a”}
  • {“time”: “2021-10-03 19:40:03”, “user_id”: “a”}

Первое событие – это запись {“time”: “2021-10-03 19:39:34”, “user_id”: “a”}, а другие записи означают последующие события с периодом бездействия менее 10 секунд, кроме последнего.

Группировка этих данных в виде датафрейма сеансового окна длительностью 10 секунд будет выглядеть так:

val sessionDF = df.groupBy(session_window(‘time, “10 seconds”), ‘user_id)

.count

В результате получим 4 события, которые попадают в первое окно сеанса, потому что период бездействия меньше 10 секунд. Пятое событие попадет в следующее сеансовое окно для этого пользователя, поскольку интервал активности превысил 10 секунд.

обучение Apache Spark, Spark Structured Streaming курсы обучениеб Spark SQL примеры курсы обучение, анализ данных Apache Spark
Результаты группировки данных в сеансовом окне

Узнайте больше практических примеров использования Apache Spark для разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://github.com/bartosz25/sessionization-demo
  2. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  3. https://towardsdatascience.com/native-sessionization-in-apache-spark-624883a8a9e2