Заполните номер индекса на основе количества разделенных слов в pyspark

#python #apache-spark #pyspark #apache-spark-sql #user-defined-functions

Вопрос:

Фрейм данных (Pyspark)

  ----------- --- ----------- 
|       Name|Age|word_count |
 ----------- --- ----------- 
|      John | 23|         1 |
|Paul Din O.| 45|         3 |        
|Kelvin Tino| 12|         2 |
 ----------- --- ----------- 
 

Ожидаемый Результат:

  ----------- --- ----------- 
|       Name|Age|word_index |
 ----------- --- ----------- 
|      John | 23|         0 |
|      Paul | 45|         0 |
|       Din | 45|         1 |
|        O. | 45|         2 |        
|    Kelvin | 12|         0 |
|      Tino | 12|         1 |
 ----------- --- ----------- 
 

Цели:

  1. Создайте повторяющиеся строки, разделив name поле.
  2. Переиндексируйте каждый разделенный блок name . (т. е. индекс будет сброшен для каждого name )

Рассчитал количество по приведенному ниже коду,

 def countc(inp='', search_chars='', modifier=None):
"""
Counts the number of characters that appear or do not appear in a list of characters.
"""
# Modifier(s)
if modifier is not None: 
    modifier = modifier.lower()
    if modifier == 'i':
        # Ignore case
        inp = inp.lower()

count = 0
for c in search_chars:
    count  = inp.count(c)
return count

udf_countc = F.udf(lambda x, y: countc(x, y), IntegerType())
# spark.udf.register('udf_countc', udf_countc)
df = df.withColumn('word_count', udf_countc(F.col('Name'), F.lit(' ')))
 

Сгенерировал повторяющиеся строки с помощью приведенного ниже кода.

 df.withColumn('DuplicatedRow', F.expr('explode(array_repeat(name, F.col('word_count')))')).show()
 

Как мы можем поместить разделенные слова в каждую ячейку и заполнить index каждый блок разделенного name поля?

Ответ №1:

posexplode делает именно то, что вы хотите:

 import pyspark.sql.functions as F

df2 = df.select(F.posexplode(F.split('Name', ' ')).alias('word_index', 'Name'), 'Age')

df2.show()
 ---------- ------ --- 
|word_index|  Name|Age|
 ---------- ------ --- 
|         0|  John| 23|
|         0|  Paul| 45|
|         1|   Din| 45|
|         2|    O.| 45|
|         0|Kelvin| 12|
|         1|  Tino| 12|
 ---------- ------ --- 
 

Для вашего отредактированного вопроса и комментария,

 def countc(inp='', search_chars='', modifier=None):
    """
    Counts the number of characters that appear or do not appear in a list of characters.
    """
    # Modifier(s)
    if modifier is not None: 
        modifier = modifier.lower()
        if modifier == 'i':
            # Ignore case
            inp = inp.lower()    
    count = 0
    for c in search_chars:
        count  = inp.count(c)
    return list(range(count 1))

udf_countc = F.udf(lambda x, y: countc(x, y), 'array<int>')
df2 = df.withColumn('word_count', F.explode(udf_countc(F.col('Name'), F.lit(' '))))

df2.show()
 ----------- --- ---------- 
|       Name|Age|word_count|
 ----------- --- ---------- 
|       John| 23|         0|
|Paul Din O.| 45|         0|
|Paul Din O.| 45|         1|
|Paul Din O.| 45|         2|
|Kelvin Tino| 12|         0|
|Kelvin Tino| 12|         1|
 ----------- --- ---------- 
 

Комментарии:

1. @JaiK Смотрите мой отредактированный ответ. Вы можете поместить range его внутрь UDF и изменить тип возвращаемого значения UDF на array<int> вместо int.