Помнить все: как устранить утечки памяти в приложениях Apache Spark – 7 советов от Disney

курсы Spark, обучение Apache Spark, обработка данных, большие данные, Big Data, Spark

Сегодня рассмотрим Apache Spark с важной для разработчиков распределенных приложений точки зрения, разобрав как в рамках этого Big Data фреймворка справиться с утечками данных при их потоковой передаче. Читайте далее, почему возникает OutOfMemory Exception в Spark-приложениях и как дата-инженеры компании Disney решили эту проблему с нехваткой памяти для JVM.

Зачем Disney нужен Apache Spark Streaming и что с ним не так

Прежде всего поясним бизнес-контекст рассматриваемого примера: Disney Streaming Services — это подразделение одной из крупнейших мультимедиа-корпораций Disney, которое контролирует все услуги потребительской подписки на цифровое видео. Главная миссия Disney Streaming Service – предоставить пользователю свободу доступа к контенту с любого подключенного устройства, в любое время и в любом месте [1].

Disney Streaming Services использует Apache Spark и Spark Structured Streaming для конвейеров обработки больших данных. Spark-приложения работают в среде выполнения Databricks (DBR), которая довольно удобна для пользователя [2]. Databricks Runtime – это набор программных артефактов, которые запускаются на кластерах машин под управлением коммерческого решения Databricks на базе Apache Spark и компонентов для улучшения удобства эксплуатации, производительности и безопасности аналитики больших данных [3]. В частности, в Disney Streaming Services одно из заданий структурированной потоковой передачи использует API-интерфейс flatMapGroupsWithState для накопления состояний и группировки событий в соответствии с бизнес-логикой [2]. Это выполняется в рамках произвольной или настраиваемой обработки событий с отслеживанием состояния, например, чтобы выдавать предупреждение о превышении порогового значения с течением времени для группы или типа событий. Другой сценарии требуют подобной индивидуальной обработки событий во времени – это поддержка пользовательских сеансов в течение определенного или неопределенного времени и их сохранение для последующего анализа. API-интерфейс структурированной потоковой передачи Spark Structured Streaming flatMapGroupsWithState отлично подходит для этих кейсов и может выдавать одну или несколько строк результатов для каждой группы событий [4].

Apache Spark, Spaark Streaming, flatMapGroupsWithState, обучение Спарк
Принцип работы flatMapGroupsWithState в Apache Spark Structured Streaming

Что такое OutOfMemoryError в Spark-приложениях

В Disney Streaming Services часто возникали сбои Spark-приложения с flatMapGroupsWithState по причине необработанного исключения OutOfMemoryError (OOM), связанного с нехваткой памяти кучи (heap) для виртуальных машин Java, которые запускаются как исполнители или драйверы в составе приложения Apache Spark [2]. Напомним, размер памяти для для исполнителя (executor) Spark по умолчанию равен 1 ГБ. Если этого недостаточно, происходит полная сборка, которая приводит к высвобождению избыточной памяти. Но если объем памяти, высвобождаемой после каждого цикла полной сборка мусора, составляет менее 2% в последних 5 последовательных циклах работы сборщика мусора (Garbage Collector), JVM выдаcт исключение нехватки памяти – Out Of Memory Error. Обычно при этом рекомендуется увеличить память исполнителя Spark, а при работе в YARN – также увеличить накладную память для потоков JVM, внутренних метаданных и т. Д. Это настройка фиксируется через spark-submit или в файле spark-defaults.conf [5]:

—conf “spark.executor.memory=12g”

—conf “spark.yarn.executor.memoryOverhead=2048”

or

— «executor-memory=12g»

7 шагов по поиску и устранению причины OOM-ошибки

В случае Disney Streaming Services исключение OutOfMemoryError вызывало перезапуск приложения в среде DBR, обрывая непрерывность потоковой передачи данных. Такая ситуация неприемлема для режима near real-time и SLA с малой временной задержкой. Для решения этой задачи дата-инженеры компании предложили 7-шаговый подход, который позволит четко идентифицировать причину проблемы и устранить ее. Для этого необходимо проверить следующие факторы [2].

  1. логи драйвера, где остаются записи о каждой проблеме со сбоем Spark-задания. Например, если задача отказывает больше, чем задано настройкой task.maxFailures, причина последней неудачи будет описана в журнале драйвера с подробным описанием причины отказа всего задания.
  2. логи исполнителя, обычно доступныех через ssh. Так, к примеру, можно увидеть, что OOM-ошибка (java.lang.OutOfMemoryError) возникла по одной из двух основных причин: превышение предел накладных расходов Garbage Collector или пространство кучи Java (JavaHeapSpace). Также JavaHeapSpace OOM может возникнуть, когда системе недостаточно памяти для данных, которые необходимо обработать. В некоторых случаях эту проблему решает увеличение ресурсов инстанса (CPU, RAM). Также можно настроить параметры для обеспечения потребления количества памяти для объема данных, которые необходимо обработать за один пакет.
  3. активность Garbage Collector, который может занимать слишком много времени и приводить к сбою из-за ошибки превышения предела накладных расходов при полной сборки мусора. Решить эту проблему может G1GC – сборщик мусора серверного типа для многопроцессорных машин с большой памятью. Он соответствует целевым показателям времени паузы при сборке мусора, обеспечивая высокую пропускную способность. Операции с целой кучей, такие как глобальная маркировка, выполняются одновременно с потоками приложения. Это предотвращает прерывания, пропорциональные размеру кучи или оперативных данных. Включить G1GC в Apache Spark можно, задав конфигурацию executor.extraJavaOptions: -XX: + UseG1GC.
  4. состояние кластера, которое можно проверить с помощью инструмента мониторинга, например, Ganglia от Databricks, которая на наглядных дэшбордах покажет потребление памяти и другие важные параметры производительности распределенной системы. В частности, если память кластера какое-то время была стабильной, начала расти, продолжала расти, а затем упала, это означает, что состояние не очищалось с течением времени или возможна утечка памяти.
  5. показатели потоковой передачи – метрики, генерируемые Spark с информацией о каждом обработанном пакете. Например, можно построить график изменения значения во времени параметра numRowsTotal. Его стабильность исключает возможность возникновения OOM из-за сохранения состояния. Поэтому целесообразно включить дамп кучи, чтобы увидеть, что занимает столько памяти.
  6. анализ дампов кучи (HeapDumpOnOutOfMemory), для получения которого в конфигурации Spark Cluster на стороне исполнителя следует включить параметр spark.executor.extraJavaOptions: —XX: + HeapDumpOnOutOfMemoryError —XX: HeapDumpPath = / dbfs / heapDumps. Также можно указать путь для сохранения дампов кучи. Доступ к этим файлам возможен через ssh в workerr’ы и rsync. Периодическое получение дампов кучи позволяет анализировать работу Spark-приложения, сравнивая его нормальную работу с OOM-ошибками. Анализ дампа кучи можно выполнить с помощью таких инструментов, как YourKit или Eclipse MAT.
  7. поиск места утечки памяти, например, через обозреватель объектов YourKit во время проверки файлов hprof. В частности, в случае Disney Streaming Services это было некорректно закрытое соединение при подключении к  Amazon Kinesis — масштабируемый и надежный сервис для потоковой передачи данных в реальном времени. Закрывался только KinesisClient:

override def close(errorOrNull: Throwable): Unit = {

kinesisClient.close()

}

А клиент Apache Http оставался открытым, что привело к увеличению числа созданных Http-клиентов и открытию множества TCP-соединений, что и вызвало утечку памяти с OOM-ошибками. Проверить это предположение дата-инженерам Disney Streaming Services позволил следующий скрипт на Java:

import $ivy.`software.amazon.awssdk:apache-client:2.13.37`

// causes OOM

(1 to 1e6.toInt).foreach { _ =>

software.amazon.awssdk.http.apache.ApacheHttpClient.builder.build() }

// doesn’t cause OOM

(1 to 1e6.toInt).foreach { _ =>

 software.amazon.awssdk.http.apache.ApacheHttpClient.builder.build().close()
}

Для закрытия созданных клиентов Apache HTTP нужно написать следующий Java-код:

override def close(errorOrNull: Throwable): Unit = {

client.close()
httpClient.close()
}

Таким образом, рассмотренный пример показывает разнообразие инструментов, которыми должен владеть дата-инженер и разработчик распределенных Big Data приложений на Apache Spark. Завтра мы продолжим разбираться с ними и рассмотрим пример интеграции Спарк с BI-системой Tableau с помощью коннектора. Освоить все эти и другие средства на практике, а также разобраться с другими особенностями эксплуатации Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://medium.com/disney-streaming/about
  2. https://medium.com/disney-streaming/a-step-by-step-guide-for-debugging-memory-leaks-in-spark-applications-e0dd05118958
  3. https://databricks.com/glossary/what-is-databricks-runtime
  4. https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
  5. https://support.datafabric.hpe.com/s/article/Spark-Troubleshooting-guide-Memory-Management-How-to-troubleshooting-out-of-memory-OOM-issues-on-Spark-Executor