Проверьте начало, середину и конец групп в Spark

#apache-spark #apache-spark-sql #window-functions

#apache-spark #apache-spark-sql #окно-функции

Вопрос:

У меня есть фрейм данных Spark, который выглядит следующим образом:

  --- ----------- ------------------------- --------------- 
| id| Phase     | Switch                  | InputFileName |
 --- ----------- ------------------------- --------------- 
|  1|          2|                        1|          fileA|
|  2|          2|                        1|          fileA|
|  3|          2|                        1|          fileA|
|  4|          2|                        0|          fileA|
|  5|          2|                        0|          fileA|
|  6|          2|                        1|          fileA|
| 11|          2|                        1|          fileB|
| 12|          2|                        1|          fileB|
| 13|          2|                        0|          fileB|
| 14|          2|                        0|          fileB|
| 15|          2|                        1|          fileB|
| 16|          2|                        1|          fileB|
| 21|          4|                        1|          fileB|
| 22|          4|                        1|          fileB|
| 23|          4|                        1|          fileB|
| 24|          4|                        1|          fileB|
| 25|          4|                        1|          fileB|
| 26|          4|                        0|          fileB|
| 31|          1|                        0|          fileC|
| 32|          1|                        0|          fileC|
| 33|          1|                        0|          fileC|
| 34|          1|                        0|          fileC|
| 35|          1|                        0|          fileC|
| 36|          1|                        0|          fileC|
 --- ----------- ------------------------- --------------- 
  

Для каждой группы (комбинация InputFileName и Phase ) мне нужно запустить функцию проверки, которая проверяет, что Switch значение равно 1 в самом начале и в конце группы, и переходит в 0 в любой промежуточный момент. Функция должна добавить результат проверки в виде нового столбца. Ожидаемый результат приведен ниже: (пробелы предназначены только для выделения разных групп)

  --- ----------- ------------------------- --------------- -------- 
| id| Phase     | Switch                  | InputFileName | Valid  |
 --- ----------- ------------------------- --------------- -------- 
|  1|          2|                        1|          fileA|   true |
|  2|          2|                        1|          fileA|   true |
|  3|          2|                        1|          fileA|   true |
|  4|          2|                        0|          fileA|   true |
|  5|          2|                        0|          fileA|   true |
|  6|          2|                        1|          fileA|   true |

| 11|          2|                        1|          fileB|   true |
| 12|          2|                        1|          fileB|   true |
| 13|          2|                        0|          fileB|   true |
| 14|          2|                        0|          fileB|   true |
| 15|          2|                        1|          fileB|   true |
| 16|          2|                        1|          fileB|   true |

| 21|          4|                        1|          fileB|   false|
| 22|          4|                        1|          fileB|   false|
| 23|          4|                        1|          fileB|   false|
| 24|          4|                        1|          fileB|   false|
| 25|          4|                        1|          fileB|   false|
| 26|          4|                        0|          fileB|   false|

| 31|          1|                        0|          fileC|   false|
| 32|          1|                        0|          fileC|   false|
| 33|          1|                        0|          fileC|   false|
| 34|          1|                        0|          fileC|   false|
| 35|          1|                        0|          fileC|   false|
| 36|          1|                        0|          fileC|   false|
 --- ----------- ------------------------- --------------- -------- 

  

Ранее я решал эту проблему с помощью Pyspark и Pandas UDF:

 df = df.groupBy("InputFileName", "Phase").apply(validate_profile)

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def validate_profile(df: pd.DataFrame):
    first_valid = True if df["Switch"].iloc[0] == 1 else False
    during_valid = (df["Switch"].iloc[1:-1] == 0).any()
    last_valid = True if df["Switch"].iloc[-1] == 1 else False
    df["Valid"] = first_valid amp; during_valid amp; last_valid
    return df
  

Однако теперь мне нужно переписать это в Scala. Я просто хочу знать, как лучше всего это сделать.

В настоящее время я пытаюсь использовать оконные функции для получения первого и последнего идентификаторов каждой группы:

 val minIdWindow = Window.partitionBy("InputFileName", "Phase").orderBy("id")
val maxIdWindow = Window.partitionBy("InputFileName", "Phase").orderBy(col("id").desc)
  

Затем я могу добавить идентификаторы min и max в виде отдельных столбцов и использовать when для получения начального и конечного значений Switch :

 df.withColumn("MinId", min("id").over(minIdWindow))
    .withColumn("MaxId", max("id").over(maxIdWindow))
    .withColumn("Valid", when(
        col("id") === col("MinId"), col("Switch")
    ).when(
        col("id") === col("MaxId"), col("Switch")
    ))
  

Это дает мне начальное и конечное значения, но я не уверен, как проверить, Switch равно ли значение 0 между ними. Я на правильном пути, используя оконные функции? Или вы бы порекомендовали альтернативное решение?

Ответ №1:

Попробуйте это,

 val wind = Window.partitionBy("InputFileName", "Phase").orderBy("id")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val df1 = df.withColumn("Valid", 
  when(first("Switch").over(wind) === 1 
    amp;amp; last("Switch").over(wind) === 1 
    amp;amp; min("Switch").over(wind) === 0, true)
    .otherwise(false))
df1.orderBy("id").show() //Ordering for display purpose
  

Вывод:

  --- ----- ------ ------------- ----- 
| id|Phase|Switch|InputFileName|Valid|
 --- ----- ------ ------------- ----- 
|  1|    2|     1|        fileA| true|
|  2|    2|     1|        fileA| true|
|  3|    2|     1|        fileA| true|
|  4|    2|     0|        fileA| true|
|  5|    2|     0|        fileA| true|
|  6|    2|     1|        fileA| true|
| 11|    2|     1|        fileB| true|
| 12|    2|     1|        fileB| true|
| 13|    2|     0|        fileB| true|
| 14|    2|     0|        fileB| true|
| 15|    2|     1|        fileB| true|
| 16|    2|     1|        fileB| true|
| 21|    4|     1|        fileB|false|
| 22|    4|     1|        fileB|false|
| 23|    4|     1|        fileB|false|
| 24|    4|     1|        fileB|false|
| 25|    4|     1|        fileB|false|
| 26|    4|     0|        fileB|false|
| 31|    1|     0|        fileC|false|
| 32|    1|     0|        fileC|false|
 --- ----- ------ ------------- ----- 
  

Комментарии:

1. Спасибо @Sathiyan S, это сделало это. Это подтвердило мой подход к использованию функций Windows. Неограниченное окно также было ключевой информацией. Прочитав об этом, я теперь вижу, что он учитывает все окно, а не только текущую строку.