#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть фрейм данных Spark на Python, и он находится в определенном порядке, в котором строки могут быть разделены на правильные группы в соответствии со столбцом «start_of_section», который имеет значения 1 или 0. Для каждой коллекции строк, которые необходимо сгруппировать вместе, все столбцы, кроме «value» и «start_of_section», равны. Я хочу сгруппировать каждую такую коллекцию в одну строку, которая имеет одинаковые значения для каждого другого столбца, и столбец «list_values», который содержит массив всех значений в каждой строке.
Таким образом, некоторые строки могут выглядеть следующим образом:
Row(category=fruit, object=apple, value=60, start_of_section=1)
Row(category=fruit, object=apple, value=160, start_of_section=0)
Row(category=fruit, object=apple, value=30, start_of_section=0)
и в новом фрейме данных это было бы
Row(category=fruit, object=apple, list_values=[60, 160, 30])
(Редактировать: обратите внимание, что столбец «start_of_section» не должен был быть включен в окончательный фрейм данных.)
Проблема, с которой я столкнулся при попытке найти ответ, заключается в том, что я нашел только способы группировки по значению столбца без учета порядка, так что это привело бы к ошибочному получению двух строк, одна из которых группирует все строки с помощью «start_of_section» = 1, а другая группирует все строки с помощью «start_of_section» = 0..
Какой код может этого достичь?
Комментарии:
1. как вы определяете порядок? вам нужен столбец «порядок»
2. Достаточно ли для этого использовать порядок, который я вижу при использовании «показать»? В противном случае могу ли я создать новый столбец с увеличивающимся счетчиком на основе его текущей позиции во фрейме данных?
3. Укажите столбец вашего порядка. Нам это нужно, чтобы определить, когда начинается раздел.
4. И Lamanus, и я даем один и тот же ответ. просто убедитесь, что у вас есть столбец «порядок», и он должен работать. В противном случае вы обречены.
Ответ №1:
Предполагая, что ваш столбец order равен order_col
df.show()
-------- ------ --------- ---------------- -----
|category|object|order_col|start_of_section|value|
-------- ------ --------- ---------------- -----
| fruit| apple| 1| 1| 60|
| fruit| apple| 2| 0| 160|
| fruit| apple| 3| 0| 30|
| fruit| apple| 4| 1| 50|
-------- ------ --------- ---------------- -----
вам нужно сгенерировать идентификатор, чтобы сгруппировать строки в одном разделе вместе, затем сгруппировать по этому идентификатору и нужному размеру. Вот как вы это делаете.
from pyspark.sql import functions as F, Window as W
df.withColumn(
"id",
F.sum("start_of_section").over(
W.partitionBy("category", "object").orderBy("order_col")
),
).groupBy("category", "object", "id").agg(F.collect_list("value").alias("values")).drop(
"id"
).show()
-------- ------ -------------
|category|object| values|
-------- ------ -------------
| fruit| apple|[60, 160, 30]|
| fruit| apple| [50]|
-------- ------ -------------
РЕДАКТИРОВАТЬ: Если у вас их нет order_col
, это невыполнимая задача. Смотрите на свои строки во фрейме данных как на мрамор в пакете. У них нет никакого порядка. Вы можете упорядочивать их по мере извлечения из пакета в соответствии с некоторыми критериями, но в противном случае вы не можете предполагать какой-либо порядок. show
вы просто вытаскиваете 10 шариков (строк) из сумки. Порядок может быть одинаковым каждый раз, когда вы это делаете, но внезапно меняться, и вы не можете его контролировать
Комментарии:
1. Это выдает ошибку «Входная строка не содержит ожидаемого количества значений, требуемых схемой. требуется ввести 6 полей, при этом должно быть предоставлено 5 значений.»
2. @A.Morris откуда у вас эта ошибка? Ничто в моем коде не должно вызывать это исключение.
Ответ №2:
Ну, теперь я понял. Вы можете выполнить группировку с помощью столбца, суммирующего start_of_section
.
Чтобы убедиться в результате, вы должны включить столбец упорядочения.
from pyspark.sql.types import Row
from pyspark.sql.functions import *
from pyspark.sql import Window
data = [Row(category='fruit', object='apple', value=60, start_of_section=1),
Row(category='fruit', object='apple', value=160, start_of_section=0),
Row(category='fruit', object='apple', value=30, start_of_section=0),
Row(category='fruit', object='apple', value=50, start_of_section=1),
Row(category='fruit', object='apple', value=30, start_of_section=0),
Row(category='fruit', object='apple', value=60, start_of_section=1),
Row(category='fruit', object='apple', value=110, start_of_section=0)]
df = spark.createDataFrame(data)
w = Window.partitionBy('category', 'object').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('group', sum('start_of_section').over(w))
.groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value'))
.drop('group').show()
-------- ------ -------------
|category|object| list_value|
-------- ------ -------------
| fruit| apple|[60, 160, 30]|
| fruit| apple| [50, 30]|
| fruit| apple| [60, 110]|
-------- ------ -------------
СБОЙ: monotonically_increasing_id
сбой при наличии большого количества разделов.
df.repartition(7)
.withColumn('id', monotonically_increasing_id())
.withColumn('group', sum('start_of_section').over(w))
.groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value'))
.drop('group').show()
-------- ------ --------------------
|category|object| list_value|
-------- ------ --------------------
| fruit| apple| [60]|
| fruit| apple|[60, 160, 30, 30,...|
| fruit| apple| [50]|
-------- ------ --------------------
Это совершенно нежелательно.
Комментарии:
1. Нет, вы ничего не упускаете — это идеально. Но обратите внимание, что я допустил ошибку — я не имел в виду, что start_of_section должен быть включен в конечный фрейм данных (значение list_value должно быть [60, 160, 30]), поэтому мы не должны включать это в groupBy .
2. для чего
start_of_section
используется тогда?3. @Lamanus кажется, мне снова не удалось сообщить о своем намерении. Дело в том, что если мы добавим еще одну строку, Row(category=’fruit’, object=’apple’, value = 50, start_of_section = 1), то это должно быть объединено во вторую строку в новом фрейме данных. Столбец start_of_section должен указывать, должна ли быть запущена новая строка. Я полагал, что это то, что сделал ваш aggregate, но после дальнейшего тестирования, я думаю, он просто объединяет строки по тому, имеют ли они одинаковое значение в start_of_section — это то, что я указал в конце вопроса, которого я хотел избежать.
4. Я понимаю. Вам нужен еще один шаг.
5. настолько неправильное использование
monotonically_increasing_id
> Функция недетерминирована, потому что ее результат зависит от идентификаторов разделов. Это вообще не гарантирует никакого порядка.