#python #apache-spark #pyspark #apache-spark-sql #user-defined-functions
Вопрос:
Фрейм данных (Pyspark)
----------- --- -----------
| Name|Age|word_count |
----------- --- -----------
| John | 23| 1 |
|Paul Din O.| 45| 3 |
|Kelvin Tino| 12| 2 |
----------- --- -----------
Ожидаемый Результат:
----------- --- -----------
| Name|Age|word_index |
----------- --- -----------
| John | 23| 0 |
| Paul | 45| 0 |
| Din | 45| 1 |
| O. | 45| 2 |
| Kelvin | 12| 0 |
| Tino | 12| 1 |
----------- --- -----------
Цели:
- Создайте повторяющиеся строки, разделив
name
поле. - Переиндексируйте каждый разделенный блок
name
. (т. е. индекс будет сброшен для каждогоname
)
Рассчитал количество по приведенному ниже коду,
def countc(inp='', search_chars='', modifier=None):
"""
Counts the number of characters that appear or do not appear in a list of characters.
"""
# Modifier(s)
if modifier is not None:
modifier = modifier.lower()
if modifier == 'i':
# Ignore case
inp = inp.lower()
count = 0
for c in search_chars:
count = inp.count(c)
return count
udf_countc = F.udf(lambda x, y: countc(x, y), IntegerType())
# spark.udf.register('udf_countc', udf_countc)
df = df.withColumn('word_count', udf_countc(F.col('Name'), F.lit(' ')))
Сгенерировал повторяющиеся строки с помощью приведенного ниже кода.
df.withColumn('DuplicatedRow', F.expr('explode(array_repeat(name, F.col('word_count')))')).show()
Как мы можем поместить разделенные слова в каждую ячейку и заполнить index
каждый блок разделенного name
поля?
Ответ №1:
posexplode
делает именно то, что вы хотите:
import pyspark.sql.functions as F
df2 = df.select(F.posexplode(F.split('Name', ' ')).alias('word_index', 'Name'), 'Age')
df2.show()
---------- ------ ---
|word_index| Name|Age|
---------- ------ ---
| 0| John| 23|
| 0| Paul| 45|
| 1| Din| 45|
| 2| O.| 45|
| 0|Kelvin| 12|
| 1| Tino| 12|
---------- ------ ---
Для вашего отредактированного вопроса и комментария,
def countc(inp='', search_chars='', modifier=None):
"""
Counts the number of characters that appear or do not appear in a list of characters.
"""
# Modifier(s)
if modifier is not None:
modifier = modifier.lower()
if modifier == 'i':
# Ignore case
inp = inp.lower()
count = 0
for c in search_chars:
count = inp.count(c)
return list(range(count 1))
udf_countc = F.udf(lambda x, y: countc(x, y), 'array<int>')
df2 = df.withColumn('word_count', F.explode(udf_countc(F.col('Name'), F.lit(' '))))
df2.show()
----------- --- ----------
| Name|Age|word_count|
----------- --- ----------
| John| 23| 0|
|Paul Din O.| 45| 0|
|Paul Din O.| 45| 1|
|Paul Din O.| 45| 2|
|Kelvin Tino| 12| 0|
|Kelvin Tino| 12| 1|
----------- --- ----------
Комментарии:
1. @JaiK Смотрите мой отредактированный ответ. Вы можете поместить
range
его внутрь UDF и изменить тип возвращаемого значения UDF наarray<int>
вместо int.