Мониторинг Flink-приложений: метрики JVM и RocksDB

Автор Категория ,
Мониторинг Flink-приложений: метрики JVM и RocksDB

Мы уже рассматривали важность мониторинга приложений Apache Flink и говорили про метрики отслеживания задержки обработки данных в потоковых заданиях. Сегодня заглянем под капот этого фреймворка и разберем, какие показатели работы JVM, а также RocksDB особенно важны для дата-инженера и разработчика распределенных приложений.

Метрики JVM во Flink-приложениях

Напомним, основным языком разработки Apache Flink является Java, несмотря на наличие интерфейса для Python. Поэтому даже разработчику PyFlink-кода стоит помнить про нюансы работы JVM, учитывая среду развертывания самого Flink-приложения. Например, в Kubernetes есть метрика container_memory_working_set_bytes — точный моментальный снимок того, сколько памяти в целом используют контейнеры пода, включая память, доступную JVM, и те области, к которым она не имеет доступа, например, собственная память бэкэнда состояния. В качестве бэкэнда состояния Apache Flink чаще всего используется key-value NoSQL-СУБД  RocksDB, о чем мы писали здесь и здесь. Чтобы предупредить ошибку нехватки памяти, целесообразно сравнивать container_memory_working_set_bytes со значением container_spec_memory_limit_bytes и генерировать оповещение заблаговременно до перезапуска узла или автоматически устранять причину этого превышения. Точно так же любая из метрик Status.JVM.Memory.* поможет следить за памятью JVM и ее компонентами.

Обычно в менеджере заданий (JobManager) мало что может пойти не так, кроме недостаточного предоставления его ресурсов в одной из следующих ситуаций:

  • если количество задач, поддерживаемых менеджером задач (TaskManager) велико, JobManager требуется больше памяти для поддержки внутренних структур данных, а также больше ресурсов ЦП для обработки различных сообщений проверки активности, сообщений и данных контрольных точек, которые координируются координатором контрольных точек в JobManager.
  • Аналогично, если есть много заданий для обслуживания в сеансовом кластере или задана высокая частота контрольных точек, можно увеличить доступное (пиковое) время ЦП. Говоря о контрольных точках, в зависимости от размера Flink-задания и конфигурации state.storage.fs.memory-threshold менеджер заданий может потребовать больше ресурсов для создания и записи встроенных данных контрольной точки в файл _metadata.
  • При выполнении развертывания в режиме приложения, JobManager также будет выполнять пользовательский код, и для этого может потребоваться дополнительный ЦП, память и т.д., в зависимости от бизнес-логики.

Для всех этих случаев можно проверять метрики системного уровня, например, из Kubernetes, или метрики Flink, такие как Status.JVM.CPU.*, Status.JVM.Memory.* или Status.JVM.GarbageCollector.* в панели управления.

В TaskManager те же метрики системного уровня можно использовать для выявления проблем с фактической обработкой данных, например, дисбаланса нагрузки, утечек памяти, проблемных TM и пр. При этом не рекомендуется настраивать оповещения об использовании ЦП, т.к. в рамках задачи, которых может быть много, такое решение не целесообразно. Мониторинг пропускной способности Flink-приложения, что мы разбирали в прошлый раз, является гораздо лучшим индикатором узких мест, поскольку он включает все ресурсы (диск, сеть и пр.).

Однако, для устранения неполадок можно определить конкретное узкое место ресурса, где находится Status.JVM.CPU.Load и т.д. Впрочем, стоит помнить, что подобные измерения нагрузки могут ввести в заблуждение, поскольку, например, значение 0,021 уже может означать 100%-ную нагрузку для контейнера TaskManager с 1-м ЦП на 48-ядерной машине.

Кроме того, в зависимости от бэкенда состояния следует сосредоточиться на разных показателях. Например, для бэкендов состояния на основе кучи наиболее важной частью является мониторинг Status.JVM.Memory.Heap.Used для каждого менеджера задач, который является индикатором размера состояния этого TaskManager. Он не должен превышать установленные пределы, и его следует масштабировать до того, как эти лимиты будут достигнуты. Полезно настроить оповещение об этом.

Увеличение размеров кучи/состояния может происходить либо из-за законного увеличения объема задания, например, из-за большего количества сущностей для обработки, что приводит к увеличению количества ключей или из-за того, что Flink буферизует больше данных из-за увеличивающегося перекоса времени события между разными потоками или сбой очистки состояния в коде программы. Дополнительные показатели для каждой из задействованных задач, например, numRecordsIn/Out, помогут оценить характеристики нагрузки задания.

Поскольку задействована память кучи, сборка мусора (Garbage Collection) также может быть проблематичной. Есть пара статистических метрик на уровне JVM, которые помогут отслеживать их, например, Status.JVM.GarbageCollector.<GarbageCollector>.[Count|Time]. Дополнительные сведения также доступны в GC-логах JVM, которые необходимо активировать отдельно. При том, что бэкенд состояния RocksDB работает вне кучи, придется все равно следить за памятью и сборщиком мусора, т.к. Flink не может полностью контролировать, как RocksDB использует свою память. Если RocksDB хочет использовать больше памяти, чем выделено, это может привести к сбою.

Чтобы получать уведомления об этом заблаговременно, то есть до того, как TaskManager будут уничтожены после достижения установленного лимита памяти, следует настроить оповещения об используемой и доступной памяти. Для этого лучше всего использовать уже упомянутые метрики системного уровня, такие как container_memory_working_set_bytes и container_spec_memory_limit_bytes из Kubernetes, которые включают все типы памяти, которые получает задание Flink, включая те, которые JVM не может отслеживать. Если задание приближается к пределу, для RocksDB можно настроить память вне кучи платформы/задачи или служебную часть JVM в структуре памяти TaskManager. Впрочем, даже без бэкенда состояния на основе кучи нужно следить за сборкой мусора, поскольку высокая нагрузка на Garbage Collector приведет к другим проблемам. В случае с RocksDB это должно происходить только из самого Flink или пользовательского кода. Самые важные метрики RocksDB рассмотрим далее.

Мониторинг RocksDB

RocksDB собирает огромное количество низкоуровневых метрик, которые можно просмотреть. После включения во Flink эти метрики отображаются в области действия <operator>.<state-name>.rocksdb. Напомним, RocksDB — это дерево слияния с журнальной структурой, в котором используются неизменяемые файлы на диске. Удаленные данные будут отмечены как удаленные в последующих файлах, а обновленные данные будут записаны в последующие файлы, чтобы скрыть любые предыдущие версии. Метрика assessment-live-data-size помогает определить фактический размер состояния без устаревших данных. Если этот размер намного меньше, чем занимаемый размер на диске (увеличение пространства), имеет смысл позволить RocksDB чаще выполнять сжатие для очистки устаревших данных. Существует также метрика общего размера sst-файлов, но она может замедлить запросы, если файлов слишком много.

В качестве альтернативы можно использовать метрики размера контрольной точки, что мы рассматривали здесь. Метрику lastCheckpointSize можно использовать для оценки размера текущего состояния. В Apache Flink 1.15 добавлена новая метрика lastCheckpointFullSize, которая предоставляет полный размер контрольной точки, включая файлы, совместно используемые с предыдущими контрольными точками, а не просто количество байтов в добавочной контрольной точке. Эти метрики также включают состояние оператора (обычно небольшое) и все остальное из памяти, например, таймеры в куче или состояние, управляемое пользователем.

Метрика фоновых ошибок background-errors указывает на низкоуровневые сбои внутри RocksDB, которые подробно описаны в лог-файле этой NoSQL-СУБД. Операции записи RocksDB сначала добавляются в таблицу в памяти, которая при заполнении будет поставлена ​​в очередь для сброса на диск и реорганизации данных. Эта очередь имеет ограниченный размер, и если очистка не завершится достаточно быстро, она создаст противодавление внутри RocksDB. Метрика фактической задержки записи actual-delayed-write-rate показывает эти остановки записи, которые могут быть вызваны медленными дисками.

В этих случаях часто бывает полезно настроить количество потоков для фоновых заданий (сброс и уплотнение) для каждого оператора с отслеживанием состояния, изменив значение конфигурации state.backend.rocksdb.thread.num, что добавит здесь больше параллелизма. В RockDB сжатие — это процесс удаления устаревших данных с диска путем объединения файлов и сохранения только самой последней версии каждого элемента данных. Это важно для производительности чтения и может быть проверено путем просмотра таких метрик, как оценка ожидающих сжатия байтов (estimate-pending-compaction-bytes), число выполняемых сжатий (num-running-compactions), приостановленные сжатия (compaction-pending), которые указывают на узкие места в процессах уплотнения. Это может происходить из-за медленных дисков или низкого уровня параллелизма, не перегружающего доступные диски. Как и в случае зависших операций записи, настройка state.backend.rocksdb.thread.num может помочь увеличить параллелизм.

Аналогичным образом можно устранять неполадки процесса сброса таблиц в памяти на диск, просматривая метрики is-write-stopped, num-running-flushes и mem-table-flush-pending. Эти метрики указывают на задания с большим объемом записи и вместе со статистикой ввода-вывода на уровне системы позволяют точно настроить производительность диска, например, путем увеличения параллелизма через state.backend.rocksdb.thread.num.

Все рассмотренные метрики помогут заглянуть под капот потокового stateful-приложения Apache Flink и интерпретировать то, что происходит внутри RocksDB. Если задание не требует интенсивной записи и по-прежнему выполняется медленно, проводя большую часть своего времени в операциях чтения RocksDB, это можно отследить через гистограмму state.backend.latency-track.history-size с помощью отслеживания задержки доступа к состоянию. Также можно воспользоваться профилировщиком или «огненными графами» Flink (Flame Graphs), которые представляют собой визуализацию иерархических данных для отображения трассировки стека профилированного ПО, чтобы можно было быстро и точно определить наиболее частые пути кода. Это хороший индикатор затрат на чтение и/или узкого места на диске. Сжатие данных может помочь, но чаще используется включение фильтров Блума, чтобы уменьшить объем данных для сканирования. О том, что такое фильтр Блума и как он работает, мы рассказывали в этом материале.

Освойте практику использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/metrics/
  2. https://www.ververica.com/blog/monitoring-large-scale-apache-flink-applications-part-1-concepts-continuous-monitoring
  3. https://www.ververica.com/blog/monitoring-large-scale-apache-flink-applications-part-2-metrics-for-troubleshooting