#apache-spark #apache-spark-sql #window-functions
#apache-spark #apache-spark-sql #окно-функции
Вопрос:
У меня есть фрейм данных Spark, который выглядит следующим образом:
--- ----------- ------------------------- ---------------
| id| Phase | Switch | InputFileName |
--- ----------- ------------------------- ---------------
| 1| 2| 1| fileA|
| 2| 2| 1| fileA|
| 3| 2| 1| fileA|
| 4| 2| 0| fileA|
| 5| 2| 0| fileA|
| 6| 2| 1| fileA|
| 11| 2| 1| fileB|
| 12| 2| 1| fileB|
| 13| 2| 0| fileB|
| 14| 2| 0| fileB|
| 15| 2| 1| fileB|
| 16| 2| 1| fileB|
| 21| 4| 1| fileB|
| 22| 4| 1| fileB|
| 23| 4| 1| fileB|
| 24| 4| 1| fileB|
| 25| 4| 1| fileB|
| 26| 4| 0| fileB|
| 31| 1| 0| fileC|
| 32| 1| 0| fileC|
| 33| 1| 0| fileC|
| 34| 1| 0| fileC|
| 35| 1| 0| fileC|
| 36| 1| 0| fileC|
--- ----------- ------------------------- ---------------
Для каждой группы (комбинация InputFileName
и Phase
) мне нужно запустить функцию проверки, которая проверяет, что Switch
значение равно 1 в самом начале и в конце группы, и переходит в 0 в любой промежуточный момент. Функция должна добавить результат проверки в виде нового столбца. Ожидаемый результат приведен ниже: (пробелы предназначены только для выделения разных групп)
--- ----------- ------------------------- --------------- --------
| id| Phase | Switch | InputFileName | Valid |
--- ----------- ------------------------- --------------- --------
| 1| 2| 1| fileA| true |
| 2| 2| 1| fileA| true |
| 3| 2| 1| fileA| true |
| 4| 2| 0| fileA| true |
| 5| 2| 0| fileA| true |
| 6| 2| 1| fileA| true |
| 11| 2| 1| fileB| true |
| 12| 2| 1| fileB| true |
| 13| 2| 0| fileB| true |
| 14| 2| 0| fileB| true |
| 15| 2| 1| fileB| true |
| 16| 2| 1| fileB| true |
| 21| 4| 1| fileB| false|
| 22| 4| 1| fileB| false|
| 23| 4| 1| fileB| false|
| 24| 4| 1| fileB| false|
| 25| 4| 1| fileB| false|
| 26| 4| 0| fileB| false|
| 31| 1| 0| fileC| false|
| 32| 1| 0| fileC| false|
| 33| 1| 0| fileC| false|
| 34| 1| 0| fileC| false|
| 35| 1| 0| fileC| false|
| 36| 1| 0| fileC| false|
--- ----------- ------------------------- --------------- --------
Ранее я решал эту проблему с помощью Pyspark и Pandas UDF:
df = df.groupBy("InputFileName", "Phase").apply(validate_profile)
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def validate_profile(df: pd.DataFrame):
first_valid = True if df["Switch"].iloc[0] == 1 else False
during_valid = (df["Switch"].iloc[1:-1] == 0).any()
last_valid = True if df["Switch"].iloc[-1] == 1 else False
df["Valid"] = first_valid amp; during_valid amp; last_valid
return df
Однако теперь мне нужно переписать это в Scala. Я просто хочу знать, как лучше всего это сделать.
В настоящее время я пытаюсь использовать оконные функции для получения первого и последнего идентификаторов каждой группы:
val minIdWindow = Window.partitionBy("InputFileName", "Phase").orderBy("id")
val maxIdWindow = Window.partitionBy("InputFileName", "Phase").orderBy(col("id").desc)
Затем я могу добавить идентификаторы min и max в виде отдельных столбцов и использовать when
для получения начального и конечного значений Switch
:
df.withColumn("MinId", min("id").over(minIdWindow))
.withColumn("MaxId", max("id").over(maxIdWindow))
.withColumn("Valid", when(
col("id") === col("MinId"), col("Switch")
).when(
col("id") === col("MaxId"), col("Switch")
))
Это дает мне начальное и конечное значения, но я не уверен, как проверить, Switch
равно ли значение 0 между ними. Я на правильном пути, используя оконные функции? Или вы бы порекомендовали альтернативное решение?
Ответ №1:
Попробуйте это,
val wind = Window.partitionBy("InputFileName", "Phase").orderBy("id")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val df1 = df.withColumn("Valid",
when(first("Switch").over(wind) === 1
amp;amp; last("Switch").over(wind) === 1
amp;amp; min("Switch").over(wind) === 0, true)
.otherwise(false))
df1.orderBy("id").show() //Ordering for display purpose
Вывод:
--- ----- ------ ------------- -----
| id|Phase|Switch|InputFileName|Valid|
--- ----- ------ ------------- -----
| 1| 2| 1| fileA| true|
| 2| 2| 1| fileA| true|
| 3| 2| 1| fileA| true|
| 4| 2| 0| fileA| true|
| 5| 2| 0| fileA| true|
| 6| 2| 1| fileA| true|
| 11| 2| 1| fileB| true|
| 12| 2| 1| fileB| true|
| 13| 2| 0| fileB| true|
| 14| 2| 0| fileB| true|
| 15| 2| 1| fileB| true|
| 16| 2| 1| fileB| true|
| 21| 4| 1| fileB|false|
| 22| 4| 1| fileB|false|
| 23| 4| 1| fileB|false|
| 24| 4| 1| fileB|false|
| 25| 4| 1| fileB|false|
| 26| 4| 0| fileB|false|
| 31| 1| 0| fileC|false|
| 32| 1| 0| fileC|false|
--- ----- ------ ------------- -----
Комментарии:
1. Спасибо @Sathiyan S, это сделало это. Это подтвердило мой подход к использованию функций Windows. Неограниченное окно также было ключевой информацией. Прочитав об этом, я теперь вижу, что он учитывает все окно, а не только текущую строку.