#python #apache-spark #pyspark
#python #apache-spark #PySpark
Вопрос:
Есть ли простой способ преобразовать несколько столбцов с общими метками в столбцы целых чисел, сохраняя эти общие метки как целые числа?
Вот что я попробовал:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
df = spark.createDataFrame(
[(0, "a", "b"), (1, "b", "b"), (2, "c", "b"),
(3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
["id", "c1", "c2"])
columns = df.columns
columns.remove('id')
indexers = [StringIndexer(inputCol="{}".format(col), outputCol="{}_index".format(col)) for col in columns]
pipeline = Pipeline(stages=indexers)
indexed = pipeline.fit(df).transform(df)
indexed.show()
--- --- --- -------- --------
| id| c1| c2|c1_index|c2_index|
--- --- --- -------- --------
| 0| a| b| 0.0| 0.0|
| 1| b| b| 2.0| 0.0|
| 2| c| b| 1.0| 0.0|
| 3| a| b| 0.0| 0.0|
| 4| a| a| 0.0| 1.0|
| 5| c| a| 1.0| 1.0|
--- --- --- -------- --------
Результат, который я хотел бы получить, это:
--- --- --- -------- --------
| id| c1| c2|c1_index|c2_index|
--- --- --- -------- --------
| 0| a| b| 0.0| 2.0|
| 1| b| b| 2.0| 2.0|
| 2| c| b| 1.0| 2.0|
| 3| a| b| 0.0| 2.0|
| 4| a| a| 0.0| 0.0|
| 5| c| a| 1.0| 0.0|
--- --- --- -------- --------
Я полагаю, что могу извлечь все уникальные значения по столбцам, создать словарь и использовать его для замены во всех категориальных столбцах. Но мне интересно, есть ли более простой способ сделать это.
Моя система:
- python 2.7
- pyspark 2.2.0
Редактировать:
Я попытался использовать решение, предложенное @chlebek. Я адаптировал его для pyspark 2.2.0, и это результат:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
df = spark.createDataFrame(
[(0, "a", "b"), (1, "b", "b"), (2, "c", "b"),
(3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
["id", "c1", "c2"])
columns = df.columns
columns.remove('id')
indexer = StringIndexer(inputCol='c1', outputCol='c1_i')
model = indexer.fit(df)
indexed = model.transform(df)
indexed.show()
model2 = model._java_obj.setInputCol('c2').setOutputCol('c2_i')
indexed2 = model2.transform(indexed)
indexed2.show()
Выполнение получает следующее исключение (я опустил часть выходных данных):
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-17-1f8dd5cc9b11> in <module>()
18
19 model2 = model._java_obj.setInputCol('c2').setOutputCol('c2_i')
---> 20 indexed2 = model2.transform(indexed)
21
22 indexed2.show()
[...]
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
Я предполагаю, что при использовании model._java_obj
я что-то путаю, но я не знаю, что именно. Типы для model и model2 разные, и AFAIK они должны быть одинаковыми:
print(type(model))
<class 'pyspark.ml.feature.StringIndexerModel'>
print(type(model2))
<class 'py4j.java_gateway.JavaObject'>
Редактировать 2:
Я добавлю выполнение решения, рекомендованного @chlebek, без адаптации для pyspark 2.2.0:
df = spark.createDataFrame(
[(0, "a", "b"), (1, "b", "b"), (2, "c", "b"),
(3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
["id", "c1", "c2"])
columns = df.columns
columns.remove('id')
indexer = StringIndexer(inputCol='c1', outputCol='c1_i')
model = indexer.fit(df)
indexed = model.transform(df)
model2 = model.setInputCol('c2').setOutputCol('c2_i')
indexed2 = model2.transform(indexed)
indexed2.show()
Что дает следующий результат:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-18-2bbc90b5fdd3> in <module>()
13 indexed.show()
14
---> 15 model2 = model.setInputCol('c2').setOutputCol('c2_i')
16 indexed2 = model2.transform(indexed)
17
AttributeError: 'StringIndexerModel' object has no attribute 'setInputCol'
Ответ №1:
Вы не можете сделать это за один transform
шаг. Вы должны обучить свой StringIndexerModel
первый столбец model = indexer.fit(df)
, а затем использовать эту модель с измененными столбцами ввода / вывода во втором столбце.
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame( Seq((0, "a", "b"), (1, "b", "b"), (2, "c", "b"), (3, "a", "b"), (4, "a", "a"), (5, "c", "a"))).toDF("id", "category", "category2")
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val model = indexer.fit(df)
val indexed = model.transform(df)
indexed.show()
--- -------- --------- -------------
| id|category|category2|categoryIndex|
--- -------- --------- -------------
| 0| a| b| 0.0|
| 1| b| b| 2.0|
| 2| c| b| 1.0|
| 3| a| b| 0.0|
| 4| a| a| 0.0|
| 5| c| a| 1.0|
--- -------- --------- -------------
val model2 = model.setInputCol("category2").setOutputCol("categoryIndex2")
val indexed2 = model2.transform(indexed).show()
--- -------- --------- ------------- --------------
| id|category|category2|categoryIndex|categoryIndex2|
--- -------- --------- ------------- --------------
| 0| a| b| 0.0| 2.0|
| 1| b| b| 2.0| 2.0|
| 2| c| b| 1.0| 2.0|
| 3| a| b| 0.0| 2.0|
| 4| a| a| 0.0| 0.0|
| 5| c| a| 1.0| 0.0|
--- -------- --------- ------------- --------------
Редактировать:
вместо модификации StringIndexerModel вы можете использовать эту первую модель, но вам нужно изменить имена dataframe, чтобы они соответствовали именам столбцов в модели индексатора
indexed.toDF("id","c1_1","c1","c1_i_1")
indexed2 = model.transform(indexed)
итак, наконец, вы получите df с columns = ("id","c1_1","c1","c1_i_1","c1_i")
и сможете переименовать их снова indexed2.toDF("id","c1","c2","c1_i","c2_i")
Комментарии:
1. Ваш ответ кажется правильным, но я пытаюсь запустить его в spark 2.2.0, и он не работает. Я отредактирую свой вопрос, чтобы показать ошибки, которые я получаю.
2. @ZakaElab вам не нужно это вызывать
._java_obj.
, простоmodel.setInputCol("category2").setOutputCol("categoryIndex2")
это работает для меня на python3. Я добавил выполнение предложенного вами кода без использования . _java_obj. , помните, что я использую python 2.7 и pyspark 2.2.0. В документации pyspark 2.2.0 StringIndexModel не имеет метода setInputCol, поэтому я прибегнул к _java_obj
4. У меня есть другое решение, вместо модификации StringIndexerModel вы можете использовать это сначала
model
, но вам нужно изменить имена dataframe, чтобы они соответствовали именам столбцов в модели индексатораindexed.toDF("id","c1_1","c1","c1_i_1") indexed2 = model.transform(indexed)
, чтобы, наконец, вы получили df с columns =("id","c1_1","c1","c1_i_1","c1_i")
и вы могли переименовать их сноваindexed2.toDF("id","c1","c2","c1_i","c2_i")
5. Это чертовски сложный взлом, но он работает. Отредактируйте свой ответ, добавив это решение как то, которое работает с pyspark 2.2.0, чтобы принять его. Вы должны использовать handleInvalid=’keep’ в качестве параметра StringIndexer, чтобы обрабатывать новые значения, которые появляются в последующих столбцах. Спасибо за вашу помощь!