#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
df = spark.createDataFrame([("A", "X-X-------------------------------X--X---XX-X--X-------")],["id", "value"])
Из приведенного выше фрейма данных найдите все вхождения X
значения столбца for
Ожидаемый результат:
ID | ценность |
---|---|
A | [1, 3, 35, 38, 42, 43, 45, 48] |
Комментарии:
1. Получение этой ошибки Из UDF было выдано исключение: «Ошибка типа: объект «Нетип» не может быть повторен»
2. в наборе данных могут быть нулевые значения . Вы можете проверить значение параметра, принятое UDF,
None
прежде чем выполнять итерацию, напримерif str_val is None: return []
3. спасибо, это сработало
Ответ №1:
Вы можете использовать пользовательский udf для достижения этой цели, например
from pyspark.sql import functions as F
from pyspark.sql import types as T
@F.udf(T.ArrayType(T.IntegerType()))
def udf_val_indexes(str_val):
indexes = []
for index,val in enumerate(str_val):
if val=="X":
indexes.append(index 1)
return indexes
df.withColumn("value",udf_val_indexes(F.col("value"))).show(truncate=False)
--- ------------------------------
|id |value |
--- ------------------------------
|A |[1, 3, 35, 38, 42, 43, 45, 48]|
--- ------------------------------
или разделите по вашему персонажу и с помощью posexplode суммируйте, чтобы найти индексы, прежде чем группировать их в одну строку, как показано ниже.
nb. Предложения order by помогают поддерживать порядок индексов.
from pyspark.sql import functions as F
from pyspark.sql import Window
(
df.withColumn("val_split",F.split("value","X"))
.select(
F.col("id"),
F.posexplode("val_split")
)
.withColumn("row_pos_to_exclude",F.max("pos").over(Window.partitionBy("id")))
.filter(F.col("pos") != F.col("row_pos_to_exclude") )
.withColumn("val_split_len",F.length("col") 1)
.withColumn(
"val_split_len",
F.sum("val_split_len").over(
Window.partitionBy("id")
.orderBy("pos")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
)
.withColumn(
"value",
F.collect_list("val_split_len").over(
Window.partitionBy("id")
.orderBy("pos")
)
)
.groupBy("id")
.agg(
F.max("value").alias("value")
)
).show(truncate=False)
--- ------------------------------
|id |value |
--- ------------------------------
|A |[1, 3, 35, 38, 42, 43, 45, 48]|
--- ------------------------------
Дайте мне знать, если это сработает для вас.