Как преобразовать несколько категориальных столбцов в целые числа, сохраняющие общие значения в PySpark?

#python #apache-spark #pyspark

#python #apache-spark #PySpark

Вопрос:

Есть ли простой способ преобразовать несколько столбцов с общими метками в столбцы целых чисел, сохраняя эти общие метки как целые числа?

Вот что я попробовал:

 from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

df = spark.createDataFrame(
        [(0, "a", "b"), (1, "b", "b"), (2, "c", "b"), 
         (3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
        ["id", "c1", "c2"])

columns = df.columns
columns.remove('id')

indexers = [StringIndexer(inputCol="{}".format(col), outputCol="{}_index".format(col)) for col in columns]
pipeline = Pipeline(stages=indexers)

indexed = pipeline.fit(df).transform(df)
indexed.show()

 --- --- --- -------- -------- 
| id| c1| c2|c1_index|c2_index|
 --- --- --- -------- -------- 
|  0|  a|  b|     0.0|     0.0|
|  1|  b|  b|     2.0|     0.0|
|  2|  c|  b|     1.0|     0.0|
|  3|  a|  b|     0.0|     0.0|
|  4|  a|  a|     0.0|     1.0|
|  5|  c|  a|     1.0|     1.0|
 --- --- --- -------- -------- 
 

Результат, который я хотел бы получить, это:

  --- --- --- -------- -------- 
| id| c1| c2|c1_index|c2_index|
 --- --- --- -------- -------- 
|  0|  a|  b|     0.0|     2.0|
|  1|  b|  b|     2.0|     2.0|
|  2|  c|  b|     1.0|     2.0|
|  3|  a|  b|     0.0|     2.0|
|  4|  a|  a|     0.0|     0.0|
|  5|  c|  a|     1.0|     0.0|
 --- --- --- -------- -------- 
 

Я полагаю, что могу извлечь все уникальные значения по столбцам, создать словарь и использовать его для замены во всех категориальных столбцах. Но мне интересно, есть ли более простой способ сделать это.

Моя система:

  • python 2.7
  • pyspark 2.2.0

Редактировать:

Я попытался использовать решение, предложенное @chlebek. Я адаптировал его для pyspark 2.2.0, и это результат:

 from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

df = spark.createDataFrame(
        [(0, "a", "b"), (1, "b", "b"), (2, "c", "b"), 
         (3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
        ["id", "c1", "c2"])

columns = df.columns
columns.remove('id')

indexer = StringIndexer(inputCol='c1', outputCol='c1_i')
model = indexer.fit(df)
indexed = model.transform(df)

indexed.show()

model2 = model._java_obj.setInputCol('c2').setOutputCol('c2_i')
indexed2 = model2.transform(indexed)

indexed2.show()
 

Выполнение получает следующее исключение (я опустил часть выходных данных):

 ---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-17-1f8dd5cc9b11> in <module>()
     18 
     19 model2 = model._java_obj.setInputCol('c2').setOutputCol('c2_i')
---> 20 indexed2 = model2.transform(indexed)
     21 
     22 indexed2.show()

[...]


AttributeError: 'DataFrame' object has no attribute '_get_object_id'
 

Я предполагаю, что при использовании model._java_obj я что-то путаю, но я не знаю, что именно. Типы для model и model2 разные, и AFAIK они должны быть одинаковыми:

 print(type(model))

<class 'pyspark.ml.feature.StringIndexerModel'>

print(type(model2))

<class 'py4j.java_gateway.JavaObject'>
 

Редактировать 2:

Я добавлю выполнение решения, рекомендованного @chlebek, без адаптации для pyspark 2.2.0:

 df = spark.createDataFrame(
        [(0, "a", "b"), (1, "b", "b"), (2, "c", "b"), 
         (3, "a", "b"), (4, "a", "a"), (5, "c", "a")],
        ["id", "c1", "c2"])

columns = df.columns
columns.remove('id')

indexer = StringIndexer(inputCol='c1', outputCol='c1_i')
model = indexer.fit(df)
indexed = model.transform(df)

model2 = model.setInputCol('c2').setOutputCol('c2_i')
indexed2 = model2.transform(indexed)

indexed2.show()
 

Что дает следующий результат:

 ---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-18-2bbc90b5fdd3> in <module>()
     13 indexed.show()
     14 
---> 15 model2 = model.setInputCol('c2').setOutputCol('c2_i')
     16 indexed2 = model2.transform(indexed)
     17 

AttributeError: 'StringIndexerModel' object has no attribute 'setInputCol'
 

Ответ №1:

Вы не можете сделать это за один transform шаг. Вы должны обучить свой StringIndexerModel первый столбец model = indexer.fit(df) , а затем использовать эту модель с измененными столбцами ввода / вывода во втором столбце.

 import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame( Seq((0, "a", "b"), (1, "b", "b"), (2, "c", "b"), (3, "a", "b"), (4, "a", "a"), (5, "c", "a"))).toDF("id", "category", "category2")

val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")

val model =  indexer.fit(df)
val indexed = model.transform(df)

indexed.show()

 --- -------- --------- ------------- 
| id|category|category2|categoryIndex|
 --- -------- --------- ------------- 
|  0|       a|        b|          0.0|
|  1|       b|        b|          2.0|
|  2|       c|        b|          1.0|
|  3|       a|        b|          0.0|
|  4|       a|        a|          0.0|
|  5|       c|        a|          1.0|
 --- -------- --------- ------------- 
    

val model2 = model.setInputCol("category2").setOutputCol("categoryIndex2")
val indexed2 = model2.transform(indexed).show()

 --- -------- --------- ------------- -------------- 
| id|category|category2|categoryIndex|categoryIndex2|
 --- -------- --------- ------------- -------------- 
|  0|       a|        b|          0.0|           2.0|
|  1|       b|        b|          2.0|           2.0|
|  2|       c|        b|          1.0|           2.0|
|  3|       a|        b|          0.0|           2.0|
|  4|       a|        a|          0.0|           0.0|
|  5|       c|        a|          1.0|           0.0|
 --- -------- --------- ------------- -------------- 
 

Редактировать:

вместо модификации StringIndexerModel вы можете использовать эту первую модель, но вам нужно изменить имена dataframe, чтобы они соответствовали именам столбцов в модели индексатора

 indexed.toDF("id","c1_1","c1","c1_i_1")  
indexed2 = model.transform(indexed)
 

итак, наконец, вы получите df с columns = ("id","c1_1","c1","c1_i_1","c1_i") и сможете переименовать их снова indexed2.toDF("id","c1","c2","c1_i","c2_i")

Комментарии:

1. Ваш ответ кажется правильным, но я пытаюсь запустить его в spark 2.2.0, и он не работает. Я отредактирую свой вопрос, чтобы показать ошибки, которые я получаю.

2. @ZakaElab вам не нужно это вызывать ._java_obj. , просто model.setInputCol("category2").setOutputCol("categoryIndex2") это работает для меня на python

3. Я добавил выполнение предложенного вами кода без использования . _java_obj. , помните, что я использую python 2.7 и pyspark 2.2.0. В документации pyspark 2.2.0 StringIndexModel не имеет метода setInputCol, поэтому я прибегнул к _java_obj

4. У меня есть другое решение, вместо модификации StringIndexerModel вы можете использовать это сначала model , но вам нужно изменить имена dataframe, чтобы они соответствовали именам столбцов в модели индексатора indexed.toDF("id","c1_1","c1","c1_i_1") indexed2 = model.transform(indexed) , чтобы, наконец, вы получили df с columns = ("id","c1_1","c1","c1_i_1","c1_i") и вы могли переименовать их снова indexed2.toDF("id","c1","c2","c1_i","c2_i")

5. Это чертовски сложный взлом, но он работает. Отредактируйте свой ответ, добавив это решение как то, которое работает с pyspark 2.2.0, чтобы принять его. Вы должны использовать handleInvalid=’keep’ в качестве параметра StringIndexer, чтобы обрабатывать новые значения, которые появляются в последующих столбцах. Спасибо за вашу помощь!