Преобразование столбцов в PySpark

Автор Категория , ,
Преобразование столбцов в PySpark

Обработка данных является одной из самых первоочередных задач анализа Big Data. Сегодня мы расскажем о самых полезных преобразованиях PySpark, которые можно выполнить над столбцами. Читайте далее, как привести значения к 0 или 1, как преобразовать из строк в числа и обратно, а также как обработать недостающие значения(Nan) с примерами в PySpark.

Бинаризация: приводим к 0 и 1

Бинаризация используется тогда, когда нужно привести значения к 0 и 1. Бинаризация может использоваться как один из видов категоризации признаков. В PySpark для этого вида преобразования используется класс Binarizer.

Binarizer принимает на вход столбцец inputCol и выходной столбец outputCol, а также порог бинаризации. Значения, превышающие пороговое значение, преобразуются в 1.0; значения, которые равны или меньше порогового значения, преобразуются в 0.0. Типы входных столбцов может быть также Vector и Double. Вот так выглядит пример бинаризации в PySpark:

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
#
Binarizer output with Threshold = 0.500000
+---+-------+---------+
| id|feature|binarized|
+---+-------+---------+
|  0|    0.1|      0.0|
|  1|    0.8|      1.0|
|  2|    0.2|      0.0|
+---+-------+---------+

StringIndexer: из строк в числа

StringIndexer кодирует строковые столбцы в столбцы индексов. Он очень удобен при обучении алгоритмов Machine Learning, поскольку они работают только с числами. Ниже пример кода для преобразования из строковых значений в числовые в PySpark. Значения индексов будут находятся в диапазоне [0, количество_меток). Поддерживаются четыре варианта упорядочения (по умолчанию стоит frequencyDesc).

  • frequencyDesc: в порядке убывания по частоте метки (наиболее частой метке присвоено 0)
  • frequencyAsc: в порядке возрастания по частоте метки (наименее частой метке присвоено 0)
  • alphabetDesc: алфавитный порядок по убыванию
  • alphabetAsc: возрастающий алфавитный порядок
from pyspark.ml.feature import StringIndexer

data = spark.createDataFrame([
    (0, 'a'),
    (1, 'b'),
    (2, 'c'),
    (3, 'a'),
    (4, 'a'),
    (5, 'c'),
], ["id", "feature"])
indexer = StringIndexer(inputCol="feature", outputCol="indexed", )
indexerModel = indexer.fit(data)
indexedData = indexerModel.transform(data)
indexedData.show()
#
+---+-------+-------+
| id|feature|indexed|
+---+-------+-------+
|  0|      a|    0.0|
|  1|      b|    2.0|
|  2|      c|    1.0|
|  3|      a|    0.0|
|  4|      a|    0.0|
|  5|      c|    1.0|
+---+-------+-------+

IndexToString: из чисел в строки

IndexToString является противоположной PySpark-операцией StringIndexer, т.е. производит преобразование из чисел в строки. IndexToString принимает на вход результат StringIndexer, и еще нужно указать название преобразованного столбца. Возвращаясь к предыдущем примеру преобразуем назад столбец indexedData в строки:

from pyspark.ml.feature import IndexToString, StringIndexer

converter = IndexToString(inputCol="indexed", outputCol="originalCategory")
converted = converter.transform(indexedData)
converted.select("id", "indexed", "originalCategory").show()
#
+---+-------+----------------+
| id|indexed|originalCategory|
+---+-------+----------------+
|  0|    0.0|               a|
|  1|    2.0|               b|
|  2|    1.0|               c|
|  3|    0.0|               a|
|  4|    0.0|               a|
|  5|    1.0|               c|
+---+-------+----------------+

Imputer: заменяем недостающие значения на среднее или медианное

Imputer дополняет недостающие значения в наборе данных средним или медианным значением столбцов, в которых находятся пропущенные значения. Столбцы должны быть числового типа. У Imputer могут возникнуть проблемы с категориальными признаками и возможно выдаст в неправильном формате. Кроме того Imputer может заменять отличные от NaN значения, их тогда нужно передать в аргумент missingValue. Например, missingValue=0 заменит все 0 на средние или медианные значения. Пример преобразования Imputer в PySpark для замены недостающих значений на медианное выглядит следующим образом:

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])
imputer = Imputer(
    strategy="median", missingValue=float("nan"),
    inputCols=["a", "b"], outputCols=["out_a", "out_b"],)
model = imputer.fit(df)
model.transform(df).show()
#
+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  2.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+

Чтобы заменить недостающие значения на средние, в аргумент strategy следовало бы передать значение "mean".

 

 

О том, как применять различные преобразования в PySpark для решения реальных задач Big Data, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.

Источники

  1. https://spark.apache.org/docs/latest/ml-features.html