#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Я хочу проверить, есть ли последние два значения массива в PySpark Dataframe [1, 0]
, и обновить его до [1, 1]
Входной фрейм данных
Column1 Array_column
abc [0,1,1,0]
def [1,1,0,0]
adf [0,0,1,0]
Выходной фрейм данных
Column1 Array_column
abc [0,1,1,1]
def [1,1,0,0]
adf [0,0,1,1]
Ответ №1:
>>> def udf1(i):
if (i[2]==1) amp; (i[3]==0):
i[3]=1
else:
i[3]=i[3]
return i
>>> udf2=udf(udf1)
df1.withColumn("Array_Column",udf2(col("Array_Column"))).show()
------- ------------
|Column1|Array_Column|
------- ------------
| abc|[0, 1, 1, 1]|
| def|[1, 1, 0, 0]|
| adf|[0, 0, 1, 1]|
------- ------------
Ответ №2:
Вы можете комбинировать функции массива с выражением when :
from pyspark.sql import functions as F
df1 = df.withColumn(
"Array_column",
F.when(
F.slice("Array_column", -2, 2) == F.array(F.lit(1), F.lit(0)),
F.flatten(F.array(F.expr("slice(Array_column, 1, size(Array_column) - 2)"), F.array(F.lit(1), F.lit(1))))
).otherwise(F.col("Array_column"))
)
df1.show()
# ------- ------------
#|Column1|Array_column|
# ------- ------------
#| abc|[0, 1, 1, 1]|
#| def|[1, 1, 0, 0]|
#| adf|[0, 0, 1, 1]|
# ------- ------------
Ответ №3:
Вы можете нарезать массив, выполнить a case when
для последних двух элементов и объединить два фрагмента с помощью concat
.
import pyspark.sql.functions as F
df2 = df.withColumn(
'Array_column',
F.expr("""
concat(
slice(Array_column, 1, size(Array_column) - 2),
case when slice(Array_column, size(Array_column) - 1, 2) = array(1,0)
then array(1,1)
else slice(Array_column, size(Array_column) - 1, 2)
end
)
""")
)
df2.show()
------- ------------
|Column1|Array_column|
------- ------------
| abc|[0, 1, 1, 1]|
| def|[1, 1, 0, 0]|
| adf|[0, 0, 1, 1]|
------- ------------