Потоковая аналитика пользовательских сеансов с Apache Flink на примере Wynk

Автор Категория ,
Потоковая аналитика пользовательских сеансов с Apache Flink на примере Wynk

В этой статье для инженеров данных и разработчиков Hadoop-приложений рассмотрим опыт индийской компании Wynk по применению Apache Flink в качестве средства потоковой аналитики больших данных пользовательского поведения в мобильных приложениях прослушивания музыки. Особое внимание уделим вопросу формирования и обработки пользовательских сессий.

Постановка задачи и выбор решения

Wynk Music является одним из популярных приложений потоковой передачи музыки в Индии. Чтобы улучшить пользовательский опыт, продакт-менеджеры и аналитики Wynk решили изучить модели прослушивания и понять, как пользователь ведет себя в каждом сеансе прослушивания музыки, а также какие факторы влияют на это поведение. При этом были поставлены следующие вопросы:

  • как новый продукт или отдельная функция влияет на время сеанса;
  • каковы основные причины прекращения сеансов;
  • сколько сеансов создается или закрывается одномоментно.

Пользовательский сеанс или сессию можно рассматривать как период непрерывного прослушивания музыки с небольшими перерывами между ними со следующими характеристиками:

  • две сессии разделены перерывом неактивности не менее 30 минут;
  • сеанс длится более 30 секунд, чтобы исключить случайные запуски приложения;
  • окончание суток не влияет на определение сеанса;
  • перезапуск приложения из-за сбоев или удаления открывает новый сеанс.

Чтобы измерять активность пользователей, нужны маркеры, которые могут отслеживать и передавать намерения пользователя. Поскольку в Wynk уже существовала система аналитики приложений, которая дает нам к более чем 150 маркерам событий, именно она выступала в качестве отправной точки для определения требований к приложениям потоковой обработки:

  • система должна быть способна обрабатывать около 2 ТБ логов событий в день, выдавая результаты почти в реальном времени;
  • поддержка нескольких версий мобильных клиентов для Android и iOS, а также веб-сайтов. Каждый клиент отправляет динамическую полезную нагрузку с логом событий.
  • способность к быстрому восстановлению в случае сбоя любого компонента без потери данных.

Из-за невозможности контролировать, когда клиент будет отправлять логи событий, все клиентские реализации буферизуют события в течение короткого периода времени перед их отправлением, чтобы уменьшить количество вызовов API к серверной системе. Например, если клиент отключается до отправки пакета с логами событий, этот пакет будет поступать для обработки всякий раз при подключении клиента к сети. Построение полностью точной системы в таком сценарии потребует огромных ресурсов. Поэтому в качестве компромисса 100% точность не требуется, однако, stateful-приложение потоковой обработки должно давать результаты, близкие к реальному времени.

В качестве фреймворка, позволяющего реализовать приложение согласно указанным требованиям ИТ-специалисты Wynk выбрали Apache Flink, благодаря наличию в нем следующих функциональных возможностей:

  • обработка данных в пакетном и потоковом режимах;
  • механизм контрольных точек, о чем мы писали здесь;
  • способность создавать различные дополнительные потоки вывода.

Все клиенты Wynk отправляют журналы событий серверной службе через API. Backend запускает Logstash, который перенаправляет эти события в кластер Kafka. Задание Flink считывает события из топика Kafka и назначает каждому событию идентификатор сеанса.

потоковая аналитика с Apache flink
Архитектура Flink-приложения для потоковой аналитики данных о событиях пользовательского поведения в Wynk Music

Как работает представленная архитектура потокового Flink-приложения, рассмотрим далее.

Потоковая обработка событий с Apache Flink: особенности операторов

Для распределенного выполнения заданий Flink, подобно Apache Spark, использует концепцию направленного ациклического графа (DAG, Directed Acyclic Graph) – цепочки работ из нескольких зада. Каждая задача выполняется одним потоком, а для оптимизации подзадачи оператора объединяются в задачи, что снижает накладные расходы на передачу обслуживания между потоками и буферизацию, а также увеличивает общую пропускную способность при одновременном снижении задержки.

Работу приложения Wynk обеспечивает потоковый конвейер из следующих операторов Flink:

  • Оператор преобразования (Transformation Operator), который поддерживает структурную целостность данных, выполняя фильтрацию и очистку данных, чтобы сделать их пригодными для дальнейшей обработки. Здесь исправляются структурные ошибки, обрабатываются отсутствующие значения и проверяется качество данных, чтобы избежать ненужных накладных расходов на вычисления в следующих операторах.
  • Оператор обработки (Wrangler Operator), который обрабатывает все операции во временной области, назначая временные метки и водяные знаки входным данным, чтобы сопоставить время обработки события приложением с временем фактического создания события. В реальности из-за сетевых задержек или сбоев эти временные метки отличаются друг от друга. Кроме того, в распределенной системе события могут поступать не по порядку, поэтому нужна дополнительная задержка в ожидании этих событий, что дополнительно влияет на время обработки. Здесь же отсеиваются события, в которых разница между временем события и текущим водяным знаком больше определенной временной дельты. Наконец, Wrangler-оператор сохраняет состояние окон предыдущих сеансов для событий, прибывающих с опозданием, но в пределах временной разницы, и назначает событие его окну с соответствующими временными рамками.
  • Оператор сеанса (Session Operator) реализует саму суть бизнес-логики, разделяя события на временные когорты, которые и представляют собой сеансы. Это результат работы всего Flink-конвейера, полученный с помощью DataStream API.

Хотя у Flink есть оконный API для объединения данных в сеансы, он не слишком удобен. Поэтому разработчики Wynk решили реализовать собственный оператор окна сеанса с помощью KeyedProcessFunction:

var timerState: ValueState[Long] = _
var sessionState: ValueState[GenericRecord] = _
var closedSessions: ListState[GenericRecord] = _val 
STATE_SERIALIZER: AvroSerializer[GenericRecord] = _val 
MAX_SESSION_GAP: Long = Time.minutes(30).toMilliseconds

Здесь поддерживается 3 состояния для каждого ключевого события;

  • состояние таймера (timer state) показывает время последнего увиденного события для этого ключа, чтобы проверить, выходит ли текущее событие за пределы сеанса или нет

eventTimestamp — timerState.value().get > MAX_SESSION_GAP

  • состояние сессии (sessionState) содержит текущий объект сеанса и поддерживает список закрытых сеансов, где каждый имеет TTL, равный максимально допустимому времени задержки события;
  • устаревшие состояния (Stale state) периодически очищаются путем регистрации таймера времени обработки, который запускает метод onTimer, предоставляемый функцией KeyedProcessFunction:

context.timerService().registerProcessingTimeTimer(currentTime + CLEAN_UP_INTERVAL)

Обогащенные сеансом события выводятся в два основных приемника топик Kafka в реальном времени и файл Parquet. Также собираются соответствующие метаданные, такие как данные сеанса, сообщения об ошибках, события не по порядку в различных побочных выводах. Например, следующий код показывает, как приложение-потребитель может считывать из топика Kafka обогащенные события:

val kafkaConsumer: KafkaSource[Event]val source: DataStream[Event] = env.fromSource(kafkaConsumer,
WatermarkStrategy.noWatermarks(), "Events")source
.process(new EventTransformationProcess)
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy)
.keyBy(EventKeySelector)
.process(new SimpleEventTimeWrangler)
.keyBy(EventKeySelector)
.process(new Sessionizer)

Поскольку Apache Flink разрешает доступ только к состояниям в KeyedStream, необходимо разделить поток по логическому ключу. Это гарантирует, что данные, соответствующие определенному ключу, всегда обрабатываются в одной и той же подзадаче. Оператор KeyBy вводит дополнительные сетевые буферы между последовательными подзадачами. Это приводит к более равномерной обработке переполнения противодавления от нижестоящих подзадач.

При загрузке данных на операторы целесообразно проверить, что:

  • логический ключ, используемый для партиционирования, не вызывает искажения данных. Обычно ключи с высокой кардинальностью, такие как userID или deviceID, являются подходящим выбором в качестве ключа раздела.
  • значение maxParallelism кратно параллелизму. Для абсолютно равномерного распределения

no of key groups (maxParallelism) = k * num operators (parallelism)

В дополнение к основному потоку, который является результатом операций DataStream, также можно создавать любое количество дополнительных выходных потоков, типы данных в которых могут отличаться от основного потока. Это полезно, когда нужно разделить поток данных, чтобы реплицировать его, а затем отфильтровывать из каждого потока ненужные данные. Про динамическое изменение условий фильтра без перезапуска потокового приложения читайте в нашей новой статье

Для защиты данных от потери и обеспечения последовательного восстановления после сбоев заданий Flink поддерживает технологию моментальных снимков (snapshot) состояния приложения. Фреймворк предлагает две готовые конфигурации серверной части состояния:

  • HashMapStateBackend — сохраняет данные в виде объектов Java в памяти. Размер состояния ограничен общим объемом памяти, доступной в кластере.
  • EmbeddedRocksDBStateBackend — хранит данные в виде сериализованных массивов байтов в базе данных RocksDB, о чем мы писали здесь. Размер состояния в этом случае ограничен общим доступным дисковым пространством.

С точки зрения производительности бэкенд состояния HashMap быстрее RocksDB, поскольку не требует десериализации/сериализации для чтения/сохранения данных с диска при обращении к ним. Однако, HashMapStateBackend ограничен общей доступной памятью, что влияет на производительность, особенно во время контрольных точек, когда все состояние приложения необходимо агрегировать в диспетчере заданий перед сохранением в файловой системе. Поэтому в приложении Wynk из-за большого количества ключей в потоке используется EmbeddedRocksDBStateBackend в качестве бэкенда состояния. А чтобы обеспечить полную эволюцию схем всех объектов перед сериализацией они преобразуются в общую запись AVRO.

После развертывания рассмотренного Flink-приложения в кластере AWS EMR были получены ответы на некоторые изначально поставленные вопросы относительно пользовательского опыта:

  • средняя продолжительность сеанса колеблется от 20 до 25 минут;
  • в среднем пользователи слушают около 6–8 песен за сеанс;
  • в часы пик музыкальным стриминговым сервисом пользуются более миллиона человек – таково количество одновременно активных сеансов.

Больше подробностей про администрирование и эксплуатацию Apache Flink, а также другие технологии потоковой обработки событий для распределенных приложений аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/wynk/sessionization-with-flink-875d74093049
  2. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/