#pyspark #apache-spark-sql
#pyspark #apache-spark-sql
Вопрос:
У меня есть такие данные, как:
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
dd = spark.createDataFrame([
('0', [Row(f1=0),Row(f1=1),Row(f1=None)]),
('1', [Row(f1=None), Row(f1=2)]),
('2', [])
], ['id', 'arr'])
И требуется новый столбец, содержащий первый ненулевой элемент в массиве arr или null. В этом случае:
id | target_elt
0 | 1
1 | 2
2 | Null
Обратите внимание, что элементы массива имеют тип Struct с полем IntegerType «f1»
Моя попытка:
positiveNonNull = F.udf(
lambda array: [
x.f1 for x in array
if (x.f1 is not None) amp; (x.f1 > 0)
], ArrayType(LongType())
)
dd.withColumn('newcol', positiveNonNull(F.col('arr')).getItem(0)).show()
Я получаю TypeError: ‘>=’ не поддерживается между экземплярами ‘NoneType’ и ‘int’
Ответ №1:
Понял это, обернув лямбда-код во вспомогательный:
def val_if_pos(f1_value):
if f1_value > 0:
return f1_value
posNonNull = F.udf(lambda array: [val_if_pos(x.f1) for x in array if x.f1 is not None], ArrayType(LongType()))
(dd.withColumn('_temp', posNonNull(F.col('arr'))
).withColumn('firstPosNonNull', F.expr("FILTER(_temp, x -> x is not null)").getItem(0)
).drop('_temp')
).show()