Анализ данных временных рядов с Apache Spark: пара примеров c Flint и Pandas

анализ данных временных рядов Spark Flint пример, курсы аналитик больших данных, Apache Spark для аналитиков данных и дата-инженеров примеры курсы обучение, курсы примеры обучение Spark SQL PySpark, обучение Spark курсы, примеры Spark обработка CSV, примеры Spark для разработчиков курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

В этой статье для дата-инженеров и аналитиков рассмотрим пример мониторинга состояния электрогенераторов с помощью анализа данных временных рядов и ранжирования в pandas для предупреждения выхода оборудования из строя. А также разберем основы анализа временных рядов на больших данных с открытой библиотекой Flint для Apache Spark.

Постановка задачи: температура и производительность электрогенераторов

Показания датчиков, измеряющих температуру и давление, во многих технических системах нужны для мониторинга их состояния в текущих условиях эксплуатации. Непрерывный мониторинг за этими параметрами поможет определить, как выглядит нормальное поведение актива и сообщить о возможных повреждениях или износе при отклонениях от нормы. Так можно предупредить отказы дорогостоящего оборудования, своевременно предприняв меры по профилактическому обслуживанию или осмотру.

Однако, что температура связана с рядом факторов: она зависит как от окружающей среды (летом теплее, чем зимой), загруженности производства и эффективность средств охлаждения. Поэтому имеет смысл отслеживать не только состояние каждого отдельного прибора, а учитывать его параметры вместе с окружением. В частности, для электрических генераторов имеет смысл ранжировать поведение каждого генератора относительно других в течение времени.

Предположим, собраны исторические показания электрогенерации и температуры для набора генераторов на станции. Один из генераторов показывал самую высокую производительность на протяжении 4-х лет, показатели других также были стабильны, т.е. держались на своем уровне «нормы». Но спустя 5 лет картина поменялась: среди генераторов поменялся лидер по производительности и показания их температуры также изменились. Отслеживая только показания температуры или только производительность выработки электроэнергии, можно было бы пропустить аномальные изменения. Поэтому целесообразно провести исследование данных по этим двум измерениям, чтобы понять, что влияет на систему в данный момент? Например, был ли актив в автономном режиме в течение длительного периода, случались ли проблемы со связью или точностью датчика? Что именно могло вызвать такое поведение? Чтобы ответить на эти вопросы, далее проанализируем исходные данных временных рядов, поступающие с генераторов. Подробнее про особенности данных временных рядов читайте в нашей новой статье.

Практический пример на PySpark

Датасет с исходными данными содержит отметку времени (Timestamp), идентификатор актива (Asset), показатели производительности генератора (Generation) и его температуру (Temperature). Данные можно группировать по желаемому временному интервалу, чтобы получить среднегодовые или месячные значения и понять отклонения. Далее сгруппированные данные ранжируются и анализируются.

Для подготовки данных к ранжированию можно использовать следующий код на PySpark:

# import pandas library
import pandas as pd# read in timeseries data, may be via csv or queried direct
# from a database
df = pd.read_csv(r'filepath\filename.csv')# create a variable for your desired time interval
# i.e. year or month
df['year'] = pd.to_datetime(df['TimeStamp']).dt.to_period('Y')# group the data by generator and year and find the aggregate values
#e.g. average temperature over the year
df2 = df.groupby(['GeneratorId','year']).agg({'average_temp': ['std', 'mean'], 'generation': ['std', 'sum']})# write to excel
df2.to_csv(r'filepath\filename.csv', index = False)

Хотя на этом этапе уже можно применить ранжирование, лучше сперва сверить полученное подмножество данных с фактическими значениями, чтобы проверить корректность скрипта и избежать ошибок в дальнейшем. Функция ранжирования данных по температуре очень проста :

df2['average_temp_rank'] = df_year.groupby('year')['average_temp_mean'].rank(pct=True)

Аналогичный прием можно использовать для ранжирования других значений, а затем провести дальнейший анализ, чтобы отследить, как изменились эти рейтинги. Например, чтобы определить тенденцию ранжирования поколения: соотношение ранжирования температуры, остается ли оно стабильным или становится несоответствующим? Также имеет смысл посмотреть ранжирование стандартного отклонения с течением времени. В частности, в генераторах частые колебания температуры могут привести к повреждению из-за расширения и сжатия материалов в ответ на эти изменения.

Рассмотренный пример с использованием Python-библиотеки pandas отлично иллюстрирует саму идею анализа данных о технологических параметрах реальных устройств, но неприменим для работы с огромными объемами данных. В этом случае для анализа данных временных рядов можно использовать Flint – открытую библиотеку Apache Spark, которую мы рассмотрим далее.

Что такое Flint: анализ данных временных рядов с Apache Spark

Прежде чем перейти к Flint, напомним про анализ временных рядов, который состоит из двух компонентов:

  • манипуляции с временными рядами – это процесс работы с данными и их преобразования в функции для обучения ML-модели. Сюда входят задачи подготовки данных к машинному обучению и интеллектуальному анализу, в т.ч. очистка и разработка фич (feature engineering). Чаще всего на этом этапе выполняются соединения (join), оконные функции, изменение периода дискретизации (частоты точек данных) и заполнение отсутствующих значений или удаление строк NA.
  • моделирование временных рядов – это процесс выявления закономерностей в данных временных рядов и обучения ML-моделей прогнозированию с использованием специальных методы: ARIMA, автокорреляция, регрессия и пр.

Возможность анализа данных временных рядов в огромных масштабах сегодня очень востребована как в системах автоматизации производства, так и в финансовых приложениях, а также платформах интернета вещей (IoT, Internet of Things). Стандартом де факто для обработки больших объемов данных сегодня стал вычислительный фреймворк Apache Spark. Он содержит множество встроенных и подключаемых библиотек, одной из которых стал продукт компании Two Sigma – открытая библиотека Flint для быстрых параллельных операций с временными рядами. Flint использует преимущества естественного упорядочения данных временных рядов для обеспечения оптимизации на основе местоположения.

Библиотека Flint доступна через Maven и PyPI. Точкой входа во все функции анализа временных рядов во Flint является TimeSeriesRDD для Scala API и TimeSeriesDataFrame для Python API. На высоком уровне TimeSeriesRDD содержит OrderedRDD, который можно использовать для представления последовательности упорядоченных пар ключ-значение. TimeSeriesRDD использует Long для представления меток времени в наносекундах с начала эпохи в качестве ключей и InternalRows в качестве значений для OrderedRDD для представления набора данных временных рядов.

В отличие от DataFrame и Dataset, TimeSeriesRDD Flint могут использовать существующие свойства упорядочения наборов данных в состоянии покоя и тот факт, что почти все операции с данными и анализ этих наборов данных учитывают их свойства временного упорядочения. Он отличается от других временных рядов в Spark своей способностью эффективно выполнять вычисления по панельным данным или крупномасштабным высокочастотным данным. Следующий пример показывает, как считывать данные во Flint и использовать его с PySpark-функциями DataFrame:

from ts.flint import FlintContext, summarizers
flintContext = FlintContext(sqlContext)
df = spark.createDataFrame(
  [('2018-08-20', 1.0), ('2018-08-21', 2.0), ('2018-08-24', 3.0)],
  ['time', 'v']
).withColumn('time', from_utc_timestamp(col('time'), 'UTC'))
# Convert to Flint DataFrame
flint_df = flintContext.read.dataframe(df)
# Use Spark DataFrame functionality
flint_df = flint_df.withColumn('v', flint_df['v'] + 1)
# Use Flint functionality
flint_df = flint_df.summarizeCycles(summarizers.count())

Больше подробностей про применение Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://towardsdatascience.com/how-ranking-timeseries-data-can-identify-abnormal-behaviour-475c7fd21fb5
  2. https://databricks.com/blog/2018/09/11/introducing-flint-a-time-series-library-for-apache-spark.html
  3. https://github.com/twosigma/flint
Поиск по сайту