Pyspark: изменение значений в столбце массива на основе другого столбца массива

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть следующий фрейм данных pyspark:

 root
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- posTags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- dependencies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- labelledDependencies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 

с примером следующих данных

  ------------------------------ --------------------------- ----------------------------------- -------------------------------------------- 
|tokens                        |posTags                    |dependencies                       |labelledDependencies                        |
 ------------------------------ --------------------------- ----------------------------------- -------------------------------------------- 
|[i, try, to, get, my, balance]|[NNP, VB, TO, VB, PRP$, NN]|[try, ROOT, get, try, balance, get]|[nsubj, root, mark, parataxis, appos, nsubj]|
 ------------------------------ --------------------------- ----------------------------------- -------------------------------------------- 
 

Я хочу изменить помеченную зависимость баланса токена с nsubj на dobj.

Моя логика такова: если вы обнаружите помеченную зависимость nsubj , а токен имеет POS тег NN , а токен зависит от токена, у которого есть POS тег VB (get), тогда измените nsubj на dobj .

Я могу сделать это с помощью следующей функции:

 def change_things(tokens,posTags,dependencies,labelledDependencies):
    for i in range(0,len(labelledDependencies)):
        if labelledDependencies[i] == 'nsubj':
            if posTags[i] == 'NN':
                if posTags[tokens.index(dependencies[i])] == 'VB':
                    labelledDependencies[i] = 'dobj'
    return tokens,posTags,dependencies,labelledDependencies
 

и, возможно, даже зарегистрировать его как udf.

Однако мой вопрос заключается в том, как я могу сделать это без использования udf и вместо этого использовать только встроенные методы pyspark.

Ответ №1:

Вы можете использовать встроенную transform функцию Spark :

 import pyspark.sql.functions as F

df2 = df.withColumn(
    "labelledDependencies",
    F.expr("""transform(
            labelledDependencies, 
            (x, i) -> CASE WHEN x = 'nsubj' 
                                AND posTags[i] = 'NN' 
                                AND posTags[array_position(tokens, dependencies[i]) - 1] = 'VB' 
                           THEN 'dobj'
                           ELSE x
                      END
        )
    """)
)



df2.show(1, False)
# ------------------------------ --------------------------- ----------------------------------- ------------------------------------------- 
#|tokens                        |posTags                    |dependencies                       |labelledDependencies                       |
# ------------------------------ --------------------------- ----------------------------------- ------------------------------------------- 
#|[i, try, to, get, my, balance]|[NNP, VB, TO, VB, PRP$, NN]|[try, ROOT, get, try, balance, get]|[nsubj, root, mark, parataxis, appos, dobj]|
# ------------------------------ --------------------------- ----------------------------------- ------------------------------------------- 
 

Комментарии:

1. Спасибо! Это сработало. Что AND posTags[array_position(tokens, dependencies[i]) - 1] именно это делает?

2. @romborimba это эквивалентно условию if posTags[tokens.index(dependencies[i])] == 'VB' в вашем коде. i является индексом текущего элемента в labelledDependencies массиве.

3. Спасибо, все понятно. Последний вопрос, как бы я поступил, если бы захотел создать новый столбец, а не преобразовывать существующий?

4. @romborimba просто переименуйте его 😉 df.withColumn("labelledDependencies_v2", ...)