MLOps для Apache Flink с MLeap

MLOps машинное обучение примеры курсы, Machine Learning Flink Spark примеры курсы обучение, обучение Apache Flink Spark для дата-инженеров и NL-разработчиков, обучение Data Science, Шкоал Больших Данных Учебный Центр Коммерсант

Сегодня рассмотрим, как реализовать MLOps-идеи при разработке приложений Apache Flink с использованием MLeap, библиотеки сериализации для моделей машинного обучения. Зачем инженеры GetInData разрабатывали для этого свой коннектор и как его использовать на практике.

Что такое MLeap и при чем здесь MLOps

Будучи популярным вычислительным движком для потоковой аналитики больших данных, Apache Flink также предоставляет возможности создавать системы машинного обучения, предоставляя набор готовых алгоритмов во встроенной библиотеке ML. Чтобы обеспечить переносимость моделей Machine Learning в соответствии с концепцией MLOps, используются различные форматы сохранения ML-алгоритмов, например, Pickle, ONNX, PMML, PFA и другие, о чем мы писали здесь.

Помимо них также есть MLeap — распространенный формат сериализации и механизм выполнения для конвейеров машинного обучения. Он поддерживает Apache Spark, Scikit-learn и TensorFlow для обучения конвейеров и их экспорта в MLeap Bundle. Сериализированные конвейеры (пакеты) можно десериализовать обратно в Spark для оценки в пакетном режиме или в среду выполнения MLeap для предоставления услуг API в реальном времени.

Многие компании, использующие Spark и Scikit-learn, сталкиваются с трудностями при развертывании своих исследовательских ML-моделей конвейеров данных в производственных сервисах API. Даже при использовании TensorFlow может быть сложно настроить эти сервисы без использования Python или Google ML Cloud в стеке API. MLeap предоставляет простые интерфейсы для выполнения всех конвейеров машинного обучения, от трансформеров фичей до классификаторов, регрессий, алгоритмов кластеризации и нейронных сетей.

MLeap Bundles обеспечивает переносимость ML-моделей, а единая среда выполнения упрощает работу с разными инструментами: Spark, Scikit-learn и Tensorflow. Достаточно просто экспортировать ML-конвейеры в MLeap Bundle и запускать их там, где требуется. Можно обучить различные части конвейера Machine Learning с помощью Spark, Scikit-learn или Tensorflow, затем экспортировать их в один файл MLeap Bundle, чтобы развернуть где угодно. Например, если простая Python-библиотека Scikit применяется для исследований и разработки, но в промышленном масштабе Spark предлагает лучший алгоритм, можно экспортировать конвейер машинного обучения Scikit в Spark, обучить в кластере новую модель, а затем развернуть в рабочей среде с помощью среды выполнения MLeap.

Изначально MLeap не поддерживала Apache Flink, поэтому MLOps-инженеры GetInData разработали собственную библиотеку, которая обеспечивает поддержку Flink в MLeap. Это потребовалось, чтобы обслуживать модели машинного обучения внутри потокового движка с низкой задержкой. Конечно, можно использовать какой-нибудь HTTP-сервис, который будет предоставлять прогнозы или считывать данные из некого хранилища, но это довольно медленное решение. Обслуживание прогнозов внутри потоковых заданий происходит намного быстрее, поскольку исключаются сетевые сбои и задержки, вызванные подключениями к HTTP-серверу или БД. Кроме того, хранение ML-моделей внутри заданий Flink позволит избежать отказов, если другие команды изменят полезную нагрузку JSON-сообщений в REST API или переименуют столбец в базе данных.

Наконец, наличие дополнительных HTTP-серверов, баз данных и конвейеров, которые вычисляют прогнозы и сохраняют их, приводит к дополнительным временным и финансовым затратам. Однако, хранение моделей машинного обучения внутри заданий ставит перед MLOps-инженерами вопросы о том, как загрузить модель в задание и как делать прогноз наиболее эффективно. Возвращаясь к Apache Flink, возникает вопрос, можно ли делать прогнозы с помощью API SQL-модуля этого вычислительного движка. Как инженеры GetInData ответили на все эти вопросы, мы рассмотрим далее.

Реализация для Apache Flink

Чтобы аналитик данных и Data Scientist могли легко использовать существующую инфраструктуру Flink для подключения к обученной модели машинного обучения и прогнозирования потоковых данных, им пригодится SQL-модуль этого фреймворка. Он обеспечивает подключение к кластеру Apache Flink с помощью SQL-клиента и упростит прогнозирование потоков данных через SQL-запросы.

Сперва создадим поток с признаками для машинного обучения, т.е. фичами ML-модели, используя SQL. Для этого используем коннектор генерации данных Flink, который генерирует таблицу как поток фичей со случайными значениями, что пригодится на этапе разработки:

CREATE TABLE Features (
feature1 DOUBLE NOT NULL,
feature2 INT NOT NULL,
feature3 DOUBLE NOT NULL,
feature_timestamp TIMESTAMP(3))
WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.feature1.min' = '0.0',
'fields.feature1.max' = '1.0'
)

Далее сделаем прогнозы на основе этих фичей:

SELECT
Predict(feature1, feature2, feature3) as prediction,
Predictv2(feature1) as prediction2
FROM Features

Здесь используются UDF-функции Predict и Predictv2, которые могут принимать разное количество аргументов и типов, что можно определить в конфигурации. Чтобы упростить использование моделей машинного обучения в заданиях Flink, MLOps-инженеры GetInData реализовали дополнительные утилиты для SQL API, который проще чем Java/Scala. Одной из них является утилита MLeapUDFRegistry, основная цель которой зарегистрировать UDF-функции Flink, которые впоследствии можно использовать в запросах SQL. Чтобы добавить свои UDF, их следует определить их внутри конфигурации Flink-приложения (файл application.conf) следующим образом:

mleap {
udfRegistry = [
{
udfName = "Predict"
bundlePath = "/mleap-example-1"
bundleSource = "file"
},
{
udfName = "Predictv2"
bundlePath = "/mleap-example-2"
bundleSource = "file"
}
]
}

Далее следует запустить функцию MLeapUDFRegistry.registerFromConfig(config, tableEnv) перед выполнением SQL-запросов:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Register UDFs basing on config
val config = ConfigFactory.load()
MLeapUDFRegistry.registerFromConfig(config, tableEnv)

Поскольку разработка самих UDF-функций для каждой ML-модели достаточно трудоемкий процесс, MLOps-инженеры GetInData определили очень общий класс MLeapUDF, чтобы его можно было легко повторно использовать для любого пакета MLeap. Этот проект написан на языке Scala и содержит два модуля: lib (с библиотечными классами) и example (с примерами использования). В модуле lib используется два API-интерфейса Flink: потоковая передача и SQL. Для загрузки моделей в задания Flink создан класс BundleLoaders, который делится на FileBundleLoader, загружающий пакеты из локальных файлов, и GCSBundleLoader, извлекающий ML-модели из корзины облачного хранилища Google.

В MleapMapFunction реализован способ использования пакетов MLeap, включая загрузку ML-модели открытым методом:

case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends
RichMapFunction[Double, Double] {

private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction])
@transient var transformer: Transformer = _

override def open(parameters: Configuration): Unit = {
transformer = bundleLoader.loadBundle(bundleName) match {
case Failure(exception) => {
LOG.error(s"Error while loading bundle: $bundleName", exception)
throw BundleLoadProblem(exception)
}
case Success(value) => value
}
}

override def map(value: Double): Double = {
val dataset = Seq(Row(DenseTensor(Array(value), List(1))))
val frame = DefaultLeapFrame(transformer.inputSchema, dataset)
val res = transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double]
res
}
}

Затем в методе Map выполняется прогнозирование, которое можно использовать далее в потоковом Flink-задании FlinkDatastreamWithMleap:

object FlinkDatastreamWithMleap {
def main(args: Array[String]): Unit = {

implicit val typeInfo = TypeInformation.of(classOf[StructType])
val env = StreamExecutionEnvironment.getExecutionEnvironment

val rand: Random = new Random()
val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble())
val bundlePath = getClass.getResource("/mleap-example-1").toString
text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print()
env.execute()
}
}

Освойте тонкости использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных с моделями машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/getindata-blog/flink-with-mleap-24a24cb718db
  2. https://combust.github.io/mleap-docs/
Поиск по сайту