Pyspark: группировка непрерывных строк по логическому столбцу

#apache-spark #pyspark #apache-spark-sql

#apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных Spark на Python, и он находится в определенном порядке, в котором строки могут быть разделены на правильные группы в соответствии со столбцом «start_of_section», который имеет значения 1 или 0. Для каждой коллекции строк, которые необходимо сгруппировать вместе, все столбцы, кроме «value» и «start_of_section», равны. Я хочу сгруппировать каждую такую коллекцию в одну строку, которая имеет одинаковые значения для каждого другого столбца, и столбец «list_values», который содержит массив всех значений в каждой строке.

Таким образом, некоторые строки могут выглядеть следующим образом:

 Row(category=fruit, object=apple, value=60, start_of_section=1)
Row(category=fruit, object=apple, value=160, start_of_section=0)
Row(category=fruit, object=apple, value=30, start_of_section=0)
  

и в новом фрейме данных это было бы

 Row(category=fruit, object=apple, list_values=[60, 160, 30])
  

(Редактировать: обратите внимание, что столбец «start_of_section» не должен был быть включен в окончательный фрейм данных.)

Проблема, с которой я столкнулся при попытке найти ответ, заключается в том, что я нашел только способы группировки по значению столбца без учета порядка, так что это привело бы к ошибочному получению двух строк, одна из которых группирует все строки с помощью «start_of_section» = 1, а другая группирует все строки с помощью «start_of_section» = 0..

Какой код может этого достичь?

Комментарии:

1. как вы определяете порядок? вам нужен столбец «порядок»

2. Достаточно ли для этого использовать порядок, который я вижу при использовании «показать»? В противном случае могу ли я создать новый столбец с увеличивающимся счетчиком на основе его текущей позиции во фрейме данных?

3. Укажите столбец вашего порядка. Нам это нужно, чтобы определить, когда начинается раздел.

4. И Lamanus, и я даем один и тот же ответ. просто убедитесь, что у вас есть столбец «порядок», и он должен работать. В противном случае вы обречены.

Ответ №1:

Предполагая, что ваш столбец order равен order_col

 df.show()
 -------- ------ --------- ---------------- ----- 
|category|object|order_col|start_of_section|value|
 -------- ------ --------- ---------------- ----- 
|   fruit| apple|        1|               1|   60|
|   fruit| apple|        2|               0|  160|
|   fruit| apple|        3|               0|   30|
|   fruit| apple|        4|               1|   50|
 -------- ------ --------- ---------------- ----- 
  

вам нужно сгенерировать идентификатор, чтобы сгруппировать строки в одном разделе вместе, затем сгруппировать по этому идентификатору и нужному размеру. Вот как вы это делаете.

 from pyspark.sql import functions as F, Window as W

df.withColumn(
    "id",
    F.sum("start_of_section").over(
        W.partitionBy("category", "object").orderBy("order_col")
    ),
).groupBy("category", "object", "id").agg(F.collect_list("value").alias("values")).drop(
    "id"
).show()

 -------- ------ ------------- 
|category|object|       values|
 -------- ------ ------------- 
|   fruit| apple|[60, 160, 30]|
|   fruit| apple|         [50]|
 -------- ------ ------------- 
  

РЕДАКТИРОВАТЬ: Если у вас их нет order_col , это невыполнимая задача. Смотрите на свои строки во фрейме данных как на мрамор в пакете. У них нет никакого порядка. Вы можете упорядочивать их по мере извлечения из пакета в соответствии с некоторыми критериями, но в противном случае вы не можете предполагать какой-либо порядок. show вы просто вытаскиваете 10 шариков (строк) из сумки. Порядок может быть одинаковым каждый раз, когда вы это делаете, но внезапно меняться, и вы не можете его контролировать

Комментарии:

1. Это выдает ошибку «Входная строка не содержит ожидаемого количества значений, требуемых схемой. требуется ввести 6 полей, при этом должно быть предоставлено 5 значений.»

2. @A.Morris откуда у вас эта ошибка? Ничто в моем коде не должно вызывать это исключение.

Ответ №2:

Ну, теперь я понял. Вы можете выполнить группировку с помощью столбца, суммирующего start_of_section .

Чтобы убедиться в результате, вы должны включить столбец упорядочения.

 from pyspark.sql.types import Row
from pyspark.sql.functions import *
from pyspark.sql import Window

data = [Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=160, start_of_section=0),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=50, start_of_section=1),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=110, start_of_section=0)]

df = spark.createDataFrame(data)

w = Window.partitionBy('category', 'object').rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('group', sum('start_of_section').over(w)) 
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) 
  .drop('group').show()

 -------- ------ ------------- 
|category|object|   list_value|
 -------- ------ ------------- 
|   fruit| apple|[60, 160, 30]|
|   fruit| apple|     [50, 30]|
|   fruit| apple|    [60, 110]|
 -------- ------ ------------- 
  

СБОЙ: monotonically_increasing_id сбой при наличии большого количества разделов.

 df.repartition(7) 
  .withColumn('id', monotonically_increasing_id()) 
  .withColumn('group', sum('start_of_section').over(w)) 
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) 
  .drop('group').show()

 -------- ------ -------------------- 
|category|object|          list_value|
 -------- ------ -------------------- 
|   fruit| apple|                [60]|
|   fruit| apple|[60, 160, 30, 30,...|
|   fruit| apple|                [50]|
 -------- ------ -------------------- 
  

Это совершенно нежелательно.

Комментарии:

1. Нет, вы ничего не упускаете — это идеально. Но обратите внимание, что я допустил ошибку — я не имел в виду, что start_of_section должен быть включен в конечный фрейм данных (значение list_value должно быть [60, 160, 30]), поэтому мы не должны включать это в groupBy .

2. для чего start_of_section используется тогда?

3. @Lamanus кажется, мне снова не удалось сообщить о своем намерении. Дело в том, что если мы добавим еще одну строку, Row(category=’fruit’, object=’apple’, value = 50, start_of_section = 1), то это должно быть объединено во вторую строку в новом фрейме данных. Столбец start_of_section должен указывать, должна ли быть запущена новая строка. Я полагал, что это то, что сделал ваш aggregate, но после дальнейшего тестирования, я думаю, он просто объединяет строки по тому, имеют ли они одинаковое значение в start_of_section — это то, что я указал в конце вопроса, которого я хотел избежать.

4. Я понимаю. Вам нужен еще один шаг.

5. настолько неправильное использование monotonically_increasing_id > Функция недетерминирована, потому что ее результат зависит от идентификаторов разделов. Это вообще не гарантирует никакого порядка.