Содержание
Обработка данных является одной из самых первоочередных задач анализа Big Data. Сегодня мы расскажем о самых полезных преобразованиях PySpark, которые можно выполнить над столбцами. Читайте далее, как привести значения к 0 или 1, как преобразовать из строк в числа и обратно, а также как обработать недостающие значения(Nan) с примерами в PySpark.
Бинаризация: приводим к 0 и 1
Бинаризация используется тогда, когда нужно привести значения к 0 и 1. Бинаризация может использоваться как один из видов категоризации признаков. В PySpark для этого вида преобразования используется класс Binarizer.
Binarizer принимает на вход столбцец inputCol и выходной столбец outputCol, а также порог бинаризации. Значения, превышающие пороговое значение, преобразуются в 1.0; значения, которые равны или меньше порогового значения, преобразуются в 0.0. Типы входных столбцов может быть также Vector и Double. Вот так выглядит пример бинаризации в PySpark:
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
#
Binarizer output with Threshold = 0.500000
+---+-------+---------+
| id|feature|binarized|
+---+-------+---------+
| 0| 0.1| 0.0|
| 1| 0.8| 1.0|
| 2| 0.2| 0.0|
+---+-------+---------+
StringIndexer: из строк в числа
StringIndexer кодирует строковые столбцы в столбцы индексов. Он очень удобен при обучении алгоритмов Machine Learning, поскольку они работают только с числами. Ниже пример кода для преобразования из строковых значений в числовые в PySpark. Значения индексов будут находятся в диапазоне [0, количество_меток). Поддерживаются четыре варианта упорядочения (по умолчанию стоит frequencyDesc).
- frequencyDesc: в порядке убывания по частоте метки (наиболее частой метке присвоено 0)
- frequencyAsc: в порядке возрастания по частоте метки (наименее частой метке присвоено 0)
- alphabetDesc: алфавитный порядок по убыванию
- alphabetAsc: возрастающий алфавитный порядок
from pyspark.ml.feature import StringIndexer
data = spark.createDataFrame([
(0, 'a'),
(1, 'b'),
(2, 'c'),
(3, 'a'),
(4, 'a'),
(5, 'c'),
], ["id", "feature"])
indexer = StringIndexer(inputCol="feature", outputCol="indexed", )
indexerModel = indexer.fit(data)
indexedData = indexerModel.transform(data)
indexedData.show()
#
+---+-------+-------+
| id|feature|indexed|
+---+-------+-------+
| 0| a| 0.0|
| 1| b| 2.0|
| 2| c| 1.0|
| 3| a| 0.0|
| 4| a| 0.0|
| 5| c| 1.0|
+---+-------+-------+
IndexToString: из чисел в строки
IndexToString является противоположной PySpark-операцией StringIndexer, т.е. производит преобразование из чисел в строки. IndexToString принимает на вход результат StringIndexer, и еще нужно указать название преобразованного столбца. Возвращаясь к предыдущем примеру преобразуем назад столбец indexedData в строки:
from pyspark.ml.feature import IndexToString, StringIndexer
converter = IndexToString(inputCol="indexed", outputCol="originalCategory")
converted = converter.transform(indexedData)
converted.select("id", "indexed", "originalCategory").show()
#
+---+-------+----------------+
| id|indexed|originalCategory|
+---+-------+----------------+
| 0| 0.0| a|
| 1| 2.0| b|
| 2| 1.0| c|
| 3| 0.0| a|
| 4| 0.0| a|
| 5| 1.0| c|
+---+-------+----------------+
Imputer: заменяем недостающие значения на среднее или медианное
Imputer дополняет недостающие значения в наборе данных средним или медианным значением столбцов, в которых находятся пропущенные значения. Столбцы должны быть числового типа. У Imputer могут возникнуть проблемы с категориальными признаками и возможно выдаст в неправильном формате. Кроме того Imputer может заменять отличные от NaN значения, их тогда нужно передать в аргумент missingValue. Например, missingValue=0 заменит все 0 на средние или медианные значения. Пример преобразования Imputer в PySpark для замены недостающих значений на медианное выглядит следующим образом:
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(
strategy="median", missingValue=float("nan"),
inputCols=["a", "b"], outputCols=["out_a", "out_b"],)
model = imputer.fit(df)
model.transform(df).show()
#
+---+---+-----+-----+
| a| b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN| 1.0| 4.0|
|2.0|NaN| 2.0| 4.0|
|NaN|3.0| 2.0| 3.0|
|4.0|4.0| 4.0| 4.0|
|5.0|5.0| 5.0| 5.0|
+---+---+-----+-----+
Чтобы заменить недостающие значения на средние, в аргумент strategy следовало бы передать значение "mean".
О том, как применять различные преобразования в PySpark для решения реальных задач Big Data, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.
Источники


