Pyspark отображает максимальное значение (значения) и множественную сортировку

#python #apache-spark #pyspark #rdd

#python #apache-spark #pyspark #rdd

Вопрос:

Благодарен за некоторую помощь здесь. Использование Pyspark (пожалуйста, не используйте SQL). Итак, у меня есть список кортежей, хранящихся в виде пар RDD:

[((‘City1’, ‘2020-03-27’, ‘X1’), 44),

((‘City1’, ‘2020-03-28’, ‘X1’), 44),

((‘City3’, ‘2020-03-28’, ‘X3’), 15),

((‘City4’, ‘2020-03-27’, ‘X4’), 5),

((‘City4’, ‘2020-03-26’, ‘X4’), 4),

((‘City2’, ‘2020-03-26’, ‘X2’), 14),

((‘City2’, ‘2020-03-25’, ‘X2’), 4),

((‘City4’, ‘2020-03-25’, ‘X4’), 1),

((‘City1’, ‘2020-03-29’, ‘X1’), 1),

((‘City5’, ‘2020-03-25’, ‘X5’), 15)]

Например, (‘City5’, ‘2020-03-25’, ‘X5’) в качестве ключа и 15 в качестве значения последней пары.

Я хотел бы получить следующий результат:

City1, X1, 2020-03-27, 44

City1, X1, 2020-03-28, 44

City5, X3, 2020-03-25, 15

City3, X3, 2020-03-28, 15

City2, X2, 2020-03-26, 14

City4, X4, 2020-03-27, 5

Пожалуйста, обратите внимание, что результат отображается:

  • Я предполагаю, что ключ (ы) с максимальным значением для каждого города (это самая сложная часть, для отображения одного и того же города дважды, если они имеют одинаковые максимальные значения (значения) в разные даты, не может использовать reduceByKey(), поскольку ключ не уникален, может быть, GroupBy() или Filter() ?
  • В следующей последовательности порядка / сортировки:
  1. По убыванию наибольшего значения
  2. Дата по возрастанию
  3. Название города по убыванию (например: City1)

Итак, я попробовал следующий код:

 res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
 

Хотя он дает мне города с максимальным значением, он не возвращает один и тот же город дважды (потому что уменьшается на Key: City), если 2 города имеют одинаковое максимальное значение в 2 разные даты.

Я надеюсь, что это достаточно ясно, любая точность, пожалуйста, спросите! Большое спасибо!

Ответ №1:

Чтобы сохранить все города со значением, равным максимальному значению, вы все равно можете использовать reduceByKey , но над массивами вместо значений:

  • вы преобразуете свои строки в ключ / значение, причем значение представляет собой массив кортежей, а не кортеж
  • вы уменьшаете по ключу, объединяя массивы, если они содержат одно и то же значение, в противном случае сохраняя массив с максимальным значением, с reduceByKey
  • вы сглаживаете свои массивы значений, объединяя с ними ключ, с flatMap
  • наконец, вы выполняете свою сортировку

Полный код будет выглядеть следующим образом:

 def merge(array1, array2):
    if array1[0][2] > array2[0][2]:
        return array1
    elif array1[0][2] == array2[0][2]:
        return array1   array2
    else:
        return array2


res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))
 

А затем вы можете распечатать свой RDD:

 [print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]
 

Это, с вашим вводом, дает вам следующий результат:

 City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
 

Комментарии:

1. Это здорово @Vincent Doba! 2 последние вещи: результаты отображаются как «City4, 2020-03-27, x4, 5» вместо «City4, X4, 2020-03-27, 5». Порядок в порядке вплоть до reduceByKey. Играл с порядком плоской карты (x [0] -> x [1] и т. Д.), Но результат не меняется, Поэтому я подозреваю, что функция слияния — это то, где порядок неправильный?

2. Кроме того, на выходе выводятся скобки (кортеж) в виде: (City4, X4, 2020-03-27, 5), как убрать скобки? Я пытался распараллелить, но не работает.

3. @JohnDoe34 Я переупорядочил строки в результате. Вы были правы, вам нужно играть с порядком плоской карты. Для проблемы с кортежами мне нужны некоторые уточнения: что вы ожидаете в качестве вывода? Строка со всеми объединенными полями? Потому что в rdd может быть только один тип: строка, значение, кортеж, объект или массив.

Ответ №2:

Можете ли вы работать / выводить с фреймами данных?

 List = [(('City1', '2020-03-27', 'X1'), 44),
        (('City1', '2020-03-28', 'X1'), 44),
        (('City3', '2020-03-28', 'X3'), 15),
        (('City4', '2020-03-27', 'X4'), 5),
        (('City4', '2020-03-26', 'X4'), 4),
        (('City2', '2020-03-26', 'X2'), 14),
        (('City2', '2020-03-25', 'X2'), 4),
        (('City4', '2020-03-25', 'X4'), 1),
        (('City1', '2020-03-29', 'X1'), 1),
        (('City5', '2020-03-25', 'X5'), 15)]

rdd = sc.parallelize(List)

import pyspark.sql.functions as F

df = rdd
        .toDF()
        .select('_1.*', F.col('_2').alias('value'))
        .orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))

df.show(truncate=False)

 ----- ---------- --- ----- 
|_1   |_2        |_3 |value|
 ----- ---------- --- ----- 
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
|City2|2020-03-25|X2 |4    |
|City4|2020-03-26|X4 |4    |
|City4|2020-03-25|X4 |1    |
|City1|2020-03-29|X1 |1    |
 ----- ---------- --- ----- 

 

Комментарии:

1. Большое спасибо, Луис, но мне действительно нужно найти решение, не использующее dataframes sql, вместо этого используя python с преобразованиями и действиями RDD.

Ответ №3:

Вы можете преобразовать свой rdd в фрейм данных, а затем использовать окно Spark, чтобы получить максимальное значение для каждого города, отфильтровать строки с использованием этого значения и, наконец, упорядочить свой фрейм данных по своему усмотрению:

 from pyspark.sql import functions as F
from pyspark.sql import Window

window = Window.partitionBy('City').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = rdd.toDF().select(
 F.col('_1._1').alias('city'),
 F.col('_1._2').alias('date'),
 F.col('_1._3').alias('key'),
 F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))
 .filter(F.col('value') == F.col('max_value'))
 .drop('max_value')
 .orderBy(F.desc('value'), F.asc('date'), F.asc('city'))
 

И вы получаете следующий фрейм данных с вашим входным rdd:

  ----- ---------- --- ----- 
|city |date      |key|value|
 ----- ---------- --- ----- 
|City1|2020-03-27|X1 |44   |
|City1|2020-03-28|X1 |44   |
|City5|2020-03-25|X5 |15   |
|City3|2020-03-28|X3 |15   |
|City2|2020-03-26|X2 |14   |
|City4|2020-03-27|X4 |5    |
 ----- ---------- --- ----- 
 

Если вам нужен RDD в конце процесса, вы можете получить его с помощью .rdd метода:

 df.rdd