#python #apache-spark #pyspark #rdd #dstream
#python #apache-spark #pyspark #rdd #dstream
Вопрос:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc , 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)
Вот небольшая часть результата:
[['0.02703300', '1.30900000'],
['0.02703300', '0.61800000'],
['0.02704600', '3.90800000'],
['0.02704700', '4.00000000'],
['0.02704700', '7.44600000']
И я хочу получить максимальное и минимальное значение для каждого ключа, как?
Ответ №1:
Как сказал @mck, вы можете использовать reduceByKey, но это может быть немного сложно понять, если вы никогда не использовали функциональное программирование.
То, что делает метод, — это применение функции к результирующему значению выполнения a groupByKey
. Давайте проанализируем это шаг за шагом.
>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]
Делая это, мы получаем RDD с одной записью на ключ (первый столбец в парном RDD), и значение является итеративным. Мы можем представить это как список.
Мы получили из базового RDD
[['0.02703300', '1.30900000'],
['0.02703300', '0.61800000'],
['0.02704600', '3.90800000'],
['0.02704700', '4.00000000'],
['0.02704700', '7.44600000']]
В один сгруппированный
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>),
('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
Затем мы должны применить нужную функцию к значениям. Мы можем сделать это, передав нужную функцию mapValues
методу (в моем случае я передаю непосредственно лямбда-функцию)
>>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
('0.02704700', ('7.44600000', '4.00000000')),
('0.02703300', ('1.30900000', '0.61800000'))]
Есть несколько соображений:
reducebyKey
более аккуратный и эффективный. Хотя это может сбивать с толку- Если вам нужны как максимальное, так и минимальное значение, попробуйте сделать это одновременно, как я показал (вы также можете сделать это с помощью reduceByKey ). Таким образом, вместо того, чтобы выполнять два прохода по данным, вы просто делаете это один раз.
- Попробуйте использовать DataFrame (SQL) API. Он более современный и пытается оптимизировать вычисления для вас.
reduceByKey
функция должна быть немного другой, поскольку она получает два элемента вместо повторяющегося
>>> rdd.reduceByKey(lambda a, b: (max(a,b), min(a, b))).collect()
[('0.02704600', '3.90800000'),
('0.02704700', ('7.44600000', '4.00000000')),
('0.02703300', ('1.30900000', '0.61800000'))]
Комментарии:
1. какую версию Spark / Python вы используете? Потому что я только что попробовал это и вернул ошибку: TypeError: ‘>’ не поддерживается между экземплярами ‘float’ и ‘tuple’
Ответ №2:
Вы можете использовать reduceByKey
:
minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)