#python #mapreduce #pyspark
#python #mapreduce #pyspark
Вопрос:
Я хочу иметь логику в моем combineByKey
/ reduceByKey
/ foldByKey
, которая зависит от ключа, с которым в данный момент выполняется операция. Из того, что я могу сказать по сигнатурам методов, единственными параметрами, передаваемыми этим методам, являются значения, которые объединяются / уменьшаются / складываются.
Используя простой пример, где у меня просто есть RDD, который является (int, int)
кортежами, результат, который я хочу, — это rdd с ключом tuple[0]
, значение которого int
ближе всего к ключу.
Например:
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
Должен сводиться к:
(1, 3)
(2, 2)
(3, 2)
Обратите внимание на сравнение (1, 3)
, и (1, -1)
мне все равно, какой из них выбран, поскольку они находятся на одинаковом расстоянии. То же самое для ключа «3».
То, как я мог бы представить, что это будет что-то вроде:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
Но reduce
функция принимает только 2 аргумента: два значения для объединения. Кажется, что самым простым методом было бы ссылаться на ключ в моем редукторе для достижения моей цели; возможно ли это?
Если я попробую это, я получу сообщение об ошибке:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError: () принимает ровно 3 аргумента (2 задано)
На самом деле я не ищу решения этой проблемы с примером. Мне интересно, есть ли причина, по которой ключ не передается reduceByKey
функции? Я предполагаю, что мне не хватает какого-то основного принципа философии map-reduce.
Обратите внимание, что я могу решить свой пример, вставив шаг сопоставления, который сопоставляет каждое значение кортежу, состоящему из значения и расстояния от ключа:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
Ответ №1:
Я думаю, что нет веских причин не передавать ключи.
тем не менее, я чувствую reduceByKey
, что API был разработан для общего варианта использования — вычислить сумму значений для каждого ключа. До сих пор мне никогда не требовались ключи для вычисления значения. Но это только мое мнение.
Также проблема, которую вы решили, кажется простой проблемой агрегации. min()
и groupByKey
могу найти ответ. Я знаю, что вы не ищете решения, но вот как я бы написал.
from pyspark import SparkContext
sc = SparkContext()
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k))))
print(reduced.collectAsMap())
Результат
{1: 3, 2: 2, 3: 2}
Комментарии:
1. Хороший ответ. Очень возможно, что реальный ответ на мой вопрос просто «потому что это не API». Но мне все равно было интересно об этом.