Найдите положение всех вхождений значений столбца во фрейме данных

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

 df = spark.createDataFrame([("A", "X-X-------------------------------X--X---XX-X--X-------")],["id", "value"])
 

Из приведенного выше фрейма данных найдите все вхождения X значения столбца for

Ожидаемый результат:

ID ценность
A [1, 3, 35, 38, 42, 43, 45, 48]

Комментарии:

1. Получение этой ошибки Из UDF было выдано исключение: «Ошибка типа: объект «Нетип» не может быть повторен»

2. в наборе данных могут быть нулевые значения . Вы можете проверить значение параметра, принятое UDF, None прежде чем выполнять итерацию, например if str_val is None: return []

3. спасибо, это сработало

Ответ №1:

Вы можете использовать пользовательский udf для достижения этой цели, например

 from pyspark.sql import functions as F
from pyspark.sql import types as T

@F.udf(T.ArrayType(T.IntegerType()))
def udf_val_indexes(str_val):
    indexes = []
    for index,val in enumerate(str_val):
        if val=="X":
            indexes.append(index 1)
    return indexes

df.withColumn("value",udf_val_indexes(F.col("value"))).show(truncate=False)
 
  --- ------------------------------ 
|id |value                         |
 --- ------------------------------ 
|A  |[1, 3, 35, 38, 42, 43, 45, 48]|
 --- ------------------------------ 
 

или разделите по вашему персонажу и с помощью posexplode суммируйте, чтобы найти индексы, прежде чем группировать их в одну строку, как показано ниже.

nb. Предложения order by помогают поддерживать порядок индексов.

 from pyspark.sql import functions as F
from pyspark.sql import Window
(
    df.withColumn("val_split",F.split("value","X"))
      .select(
          F.col("id"),
          F.posexplode("val_split") 
      )
      .withColumn("row_pos_to_exclude",F.max("pos").over(Window.partitionBy("id")))
      .filter(F.col("pos") != F.col("row_pos_to_exclude") )
      .withColumn("val_split_len",F.length("col") 1)
      .withColumn(
          "val_split_len",
          F.sum("val_split_len").over(
              Window.partitionBy("id")
                    .orderBy("pos")
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
          )
      )
      .withColumn(
          "value",
          F.collect_list("val_split_len").over(
              Window.partitionBy("id")
                    .orderBy("pos")
          )
      )
      .groupBy("id")
      .agg(
          F.max("value").alias("value")
      )
).show(truncate=False)
 
  --- ------------------------------ 
|id |value                         |
 --- ------------------------------ 
|A  |[1, 3, 35, 38, 42, 43, 45, 48]|
 --- ------------------------------ 
 

Дайте мне знать, если это сработает для вас.