Как получить уникальные значения в каждом окне в pyspark dataframe

#python #group-by #pyspark #apache-spark-sql #window

#python #группировать по #pyspark #apache-spark-sql #окно

Вопрос:

У меня есть следующий spark dataframe:

 from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('').getOrCreate()
df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])
  

который возвращает мне:

  --- ------- ------ 
| nr|column2|quant |
 --- ------- ------ 
|  1|      a|     2|
|  2|      b|     2|
|  3|      c|     2|
|  4|      d|     2|
|  5|      b|     3|
|  6|      b|     3|
|  7|      c|     2|
 --- ------- ------ 
  

Я хотел бы получить строки, в которых для каждых 3 сгруппированных строк (из каждого окна, где размер окна равен 3) столбец quant имеет уникальные значения. как на следующем рисунке:

введите описание изображения здесь

Здесь красным обозначен размер окна, и в каждом окне я сохраняю только зеленые строки, где quant уникален:

Вывод, который я хотел бы получить, выглядит следующим образом:

  --- ------- ------ 
| nr|column2|values|
 --- ------- ------ 
|  1|      a|     2|
|  4|      d|     2|
|  5|      b|     3|
|  7|      c|     2|
 --- ------- ------ 
  

Я новичок в spark, поэтому был бы признателен за любую помощь. Спасибо

Комментарии:

1. как вы создаете свою красную группу? каково ваше условие группировки?

2. только первые 3 строки — это одна группа, а следующие 3 строки — следующая группа

3. первый заказанный чем?

4. сверху. набор данных уже отсортирован. только первые 3 верхние строки — это одна группа, затем следующие 3 строки — следующая группа. это должно сработать

5. в распределенной файловой системе или в базе данных еще ничего не отсортировано. это как мешок с шариками, вы можете сортировать их вне мешка, но не внутри.

Ответ №1:

Этот подход должен сработать для вас, предполагая, что группировка 3 записей основана на столбце ‘nr’.

Использование udf , которое решает, следует ли выбирать запись или нет, и lag используется для получения данных предыдущих строк.

 def tag_selected(index, current_quant, prev_quant1, prev_quant2):                                                                                                    
    if index % 3 == 1:  # first record in each group is always selected                                                                                              
        return True                                                                                                                                                  
    if index % 3 == 2 and current_quant != prev_quant1: # second record will be selected if prev quant is not same as current                                        
        return True                                                                                                                                                  
    if index % 3 == 0 and current_quant != prev_quant1 and current_quant != prev_quant2: # third record will be selected if prev quant are not same as current       
        return True                                                                                                                                                  
    return False                                                                                                                                                     

tag_selected_udf = udf(tag_selected, BooleanType())                                                                                                                  

  
 df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])

window = Window.orderBy("nr")

df = df.withColumn("prev_quant1", lag(col("quant"),1, None).over(window))
       .withColumn("prev_quant2", lag(col("quant"),2, None).over(window)) 
       .withColumn("selected", 
                   tag_selected_udf(col('nr'),col('quant'),col('prev_quant1'),col('prev_quant2')))
       .filter(col('selected') == True).drop("prev_quant1","prev_quant2","selected")
df.show()
  

какие результаты

  --- ------- ----- 
| nr|column2|quant|
 --- ------- ----- 
|  1|      a|    2|
|  4|      d|    2|
|  5|      b|    3|
|  7|      c|    2|
 --- ------- ----- 
  

Комментарии:

1. @Sascha, если у вас это работает, не могли бы вы, пожалуйста, принять это как ответ, спасибо.

2. Прежде всего, спасибо за помощь, у меня есть вопрос относительно этого кода, поэтому df.withColumn("prev_quant1", lag(col("quant"),1, None).over(window)).... каждый раз, когда мы создаем новый столбец? это означает, что в этом случае потребуется слишком много памяти, если объем данных будет больше? и второй вопрос касается def tag_selected . Здесь это означает, что это работает как переходящее окно, но не принимает группу из 3 строк (1,2,3) и следующие 3 строки (4,5,6) в качестве другой группы? потому что, когда я запускаю это, я получаю результат с 1,4,5,6,7. и возможно ли определить размер окна как 3?

3. Преобразования в rdd / df / datasets создадут новые rdd как неизменяемые rdd. Таким образом, добавление нового столбца создает новые экземпляры rdd, но управление памятью spark решает, нужно ли сохранить старые rdd или очистить.tag_selected, принимает текущие, предыдущие значения, как переходящее окно, но логика внутри игнорирует другие элементы окна. Например, для 4-й строки вызывает udf с данными 2-й, 3-й строк, но udf игнорирует, поскольку на основе индекса знает, что это первая строка группы, и ее всегда нужно выбирать. Код не должен выбирать 6 в качестве условия в udf, если индекс % 3 == 0…… сбой и возвращает False.

4. Спасибо за пояснения. я хотел бы прояснить еще одну вещь. Что, если spark dataframe уже отсортирован по nr, но nr не начинается с 1. например, порядок nr может быть следующим: «15.3, 22.8, 37.1, 39, 56 и т.д.». в этом случае лучше добавить новый столбец индекса и следовать этой логике? или может быть другой вариант, касающийся nr?

5. Да, если nr-значения не являются номерами индексов, тогда вам нужно добавить новый col в качестве индекса с номерами индексов.