Функция окна Spark и получение первого и последнего значений для каждого столбца в разделе (агрегирование по окну)

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

Представьте, что у меня есть огромный набор данных, который я partitionBy('id') . Предположим, что идентификатор уникален для человека, поэтому на идентификатор может быть n строк, и цель состоит в том, чтобы уменьшить его до одного. По сути, агрегирование для разделения идентификаторов.

 w = Window().partitionBy(id).rowsBetween(-sys.maxsize, sys.maxsize)

test1 = {
    key: F.first(key, True).over(w).alias(key)
    for key in some_dict.keys()
    if (some_dict[key] == 'test1')
}
test2 = {
    key: F.last(key, True).over(w).alias(k)
    for k in some_dict.keys()
    if (some_dict[k] == 'test2')
}
  

Предположим, что у меня есть some_dict значения либо как test1, либо как test2, и на основе значения я либо беру первое, либо последнее, как показано выше.

Как мне на самом деле вызвать aggregate и уменьшить это?

  cols = {**test1, **test2}
 cols = list(cols.value())
 df.select(*cols).groupBy('id').agg(*cols) # Doesnt work
  

Вышеуказанное явно не работает. Есть идеи?
Цель здесь: у меня есть 5 уникальных идентификаторов и 25 строк, каждый идентификатор имеет 5 строк. Я хочу уменьшить его до 5 строк с 25.

Комментарии:

1. каковы ваши примерные данные? и каков порядок по столбцам?

2. Порядок по дате, но здесь дело не в этом. В основном это касается того, как его агрегировать.

3. Функция окна не предназначена для агрегирования, вы должны использовать group by для своих целей. Почему вы их смешиваете?

4. Я согласен, используя функцию окна для разделения. Представьте, что данные очень большие. Приведенный выше пример понятен, верно?

Ответ №1:

Предположим, что у вас имя фрейма данных df, которое содержит дубликат, используйте метод ниже

 from pyspark.sql.functions import row_number 
from pyspark.sql.window import Window 
window = Window.partitionBy(df['id']).orderBy(df['id'])

final = df.withColumn("row_id", row_number.over(window)).filter("row_id = 1")
final.show(10,False) 

  

измените порядок по условию, если есть определенные критерии, чтобы конкретная запись была поверх раздела

Комментарии:

1. опечатка: row_number().over …