#python #apache-spark #pyspark #rdd
#python #apache-spark #pyspark #rdd
Вопрос:
Благодарен за некоторую помощь здесь. Использование Pyspark (пожалуйста, не используйте SQL). Итак, у меня есть список кортежей, хранящихся в виде пар RDD:
[((‘City1’, ‘2020-03-27’, ‘X1’), 44),
((‘City1’, ‘2020-03-28’, ‘X1’), 44),
((‘City3’, ‘2020-03-28’, ‘X3’), 15),
((‘City4’, ‘2020-03-27’, ‘X4’), 5),
((‘City4’, ‘2020-03-26’, ‘X4’), 4),
((‘City2’, ‘2020-03-26’, ‘X2’), 14),
((‘City2’, ‘2020-03-25’, ‘X2’), 4),
((‘City4’, ‘2020-03-25’, ‘X4’), 1),
((‘City1’, ‘2020-03-29’, ‘X1’), 1),
((‘City5’, ‘2020-03-25’, ‘X5’), 15)]
Например, (‘City5’, ‘2020-03-25’, ‘X5’) в качестве ключа и 15 в качестве значения последней пары.
Я хотел бы получить следующий результат:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X3, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
Пожалуйста, обратите внимание, что результат отображается:
- Я предполагаю, что ключ (ы) с максимальным значением для каждого города (это самая сложная часть, для отображения одного и того же города дважды, если они имеют одинаковые максимальные значения (значения) в разные даты, не может использовать reduceByKey(), поскольку ключ не уникален, может быть, GroupBy() или Filter() ?
- В следующей последовательности порядка / сортировки:
- По убыванию наибольшего значения
- Дата по возрастанию
- Название города по убыванию (например: City1)
Итак, я попробовал следующий код:
res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
Хотя он дает мне города с максимальным значением, он не возвращает один и тот же город дважды (потому что уменьшается на Key: City), если 2 города имеют одинаковое максимальное значение в 2 разные даты.
Я надеюсь, что это достаточно ясно, любая точность, пожалуйста, спросите! Большое спасибо!
Ответ №1:
Чтобы сохранить все города со значением, равным максимальному значению, вы все равно можете использовать reduceByKey
, но над массивами вместо значений:
- вы преобразуете свои строки в ключ / значение, причем значение представляет собой массив кортежей, а не кортеж
- вы уменьшаете по ключу, объединяя массивы, если они содержат одно и то же значение, в противном случае сохраняя массив с максимальным значением, с
reduceByKey
- вы сглаживаете свои массивы значений, объединяя с ними ключ, с
flatMap
- наконец, вы выполняете свою сортировку
Полный код будет выглядеть следующим образом:
def merge(array1, array2):
if array1[0][2] > array2[0][2]:
return array1
elif array1[0][2] == array2[0][2]:
return array1 array2
else:
return array2
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))
А затем вы можете распечатать свой RDD:
[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]
Это, с вашим вводом, дает вам следующий результат:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
Комментарии:
1. Это здорово @Vincent Doba! 2 последние вещи: результаты отображаются как «City4, 2020-03-27, x4, 5» вместо «City4, X4, 2020-03-27, 5». Порядок в порядке вплоть до reduceByKey. Играл с порядком плоской карты (x [0] -> x [1] и т. Д.), Но результат не меняется, Поэтому я подозреваю, что функция слияния — это то, где порядок неправильный?
2. Кроме того, на выходе выводятся скобки (кортеж) в виде: (City4, X4, 2020-03-27, 5), как убрать скобки? Я пытался распараллелить, но не работает.
3. @JohnDoe34 Я переупорядочил строки в результате. Вы были правы, вам нужно играть с порядком плоской карты. Для проблемы с кортежами мне нужны некоторые уточнения: что вы ожидаете в качестве вывода? Строка со всеми объединенными полями? Потому что в rdd может быть только один тип: строка, значение, кортеж, объект или массив.
Ответ №2:
Можете ли вы работать / выводить с фреймами данных?
List = [(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
rdd = sc.parallelize(List)
import pyspark.sql.functions as F
df = rdd
.toDF()
.select('_1.*', F.col('_2').alias('value'))
.orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))
df.show(truncate=False)
----- ---------- --- -----
|_1 |_2 |_3 |value|
----- ---------- --- -----
|City1|2020-03-27|X1 |44 |
|City1|2020-03-28|X1 |44 |
|City5|2020-03-25|X5 |15 |
|City3|2020-03-28|X3 |15 |
|City2|2020-03-26|X2 |14 |
|City4|2020-03-27|X4 |5 |
|City2|2020-03-25|X2 |4 |
|City4|2020-03-26|X4 |4 |
|City4|2020-03-25|X4 |1 |
|City1|2020-03-29|X1 |1 |
----- ---------- --- -----
Комментарии:
1. Большое спасибо, Луис, но мне действительно нужно найти решение, не использующее dataframes sql, вместо этого используя python с преобразованиями и действиями RDD.
Ответ №3:
Вы можете преобразовать свой rdd в фрейм данных, а затем использовать окно Spark, чтобы получить максимальное значение для каждого города, отфильтровать строки с использованием этого значения и, наконец, упорядочить свой фрейм данных по своему усмотрению:
from pyspark.sql import functions as F
from pyspark.sql import Window
window = Window.partitionBy('City').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = rdd.toDF().select(
F.col('_1._1').alias('city'),
F.col('_1._2').alias('date'),
F.col('_1._3').alias('key'),
F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))
.filter(F.col('value') == F.col('max_value'))
.drop('max_value')
.orderBy(F.desc('value'), F.asc('date'), F.asc('city'))
И вы получаете следующий фрейм данных с вашим входным rdd:
----- ---------- --- -----
|city |date |key|value|
----- ---------- --- -----
|City1|2020-03-27|X1 |44 |
|City1|2020-03-28|X1 |44 |
|City5|2020-03-25|X5 |15 |
|City3|2020-03-28|X3 |15 |
|City2|2020-03-26|X2 |14 |
|City4|2020-03-27|X4 |5 |
----- ---------- --- -----
Если вам нужен RDD в конце процесса, вы можете получить его с помощью .rdd
метода:
df.rdd