Получить максимальное и минимальное значение для каждого ключа в RDD

#python #apache-spark #pyspark #rdd #dstream

#python #apache-spark #pyspark #rdd #dstream

Вопрос:

 spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc , 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)
 

Вот небольшая часть результата:

 [['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']
 

И я хочу получить максимальное и минимальное значение для каждого ключа, как?

Ответ №1:

Как сказал @mck, вы можете использовать reduceByKey, но это может быть немного сложно понять, если вы никогда не использовали функциональное программирование.

То, что делает метод, — это применение функции к результирующему значению выполнения a groupByKey . Давайте проанализируем это шаг за шагом.

 >>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]
 

Делая это, мы получаем RDD с одной записью на ключ (первый столбец в парном RDD), и значение является итеративным. Мы можем представить это как список.

Мы получили из базового RDD

 [['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']]
 

В один сгруппированный

 [('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
 ('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>), 
 ('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
 

Затем мы должны применить нужную функцию к значениям. Мы можем сделать это, передав нужную функцию mapValues методу (в моем случае я передаю непосредственно лямбда-функцию)

 >>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]
 

Есть несколько соображений:

  1. reducebyKey более аккуратный и эффективный. Хотя это может сбивать с толку
  2. Если вам нужны как максимальное, так и минимальное значение, попробуйте сделать это одновременно, как я показал (вы также можете сделать это с помощью reduceByKey ). Таким образом, вместо того, чтобы выполнять два прохода по данным, вы просто делаете это один раз.
  3. Попробуйте использовать DataFrame (SQL) API. Он более современный и пытается оптимизировать вычисления для вас.
  4. reduceByKey функция должна быть немного другой, поскольку она получает два элемента вместо повторяющегося
 >>> rdd.reduceByKey(lambda a, b: (max(a,b), min(a, b))).collect()
[('0.02704600', '3.90800000'), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]
 

Комментарии:

1. какую версию Spark / Python вы используете? Потому что я только что попробовал это и вернул ошибку: TypeError: ‘>’ не поддерживается между экземплярами ‘float’ и ‘tuple’

Ответ №2:

Вы можете использовать reduceByKey :

 minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)