#python-3.x #apache-spark #pyspark
#python-3.x #apache-spark #pyspark
Вопрос:
Я пытаюсь изучить Spark на Python и застрял с combineByKey
для усреднения значений в парах ключ-значение. На самом деле, мое замешательство связано не с combineByKey
синтаксисом, а с тем, что происходит после. Типичный пример (из книги O’Rielly 2015 Learning Spark Book) можно увидеть в Интернете во многих местах; вот один.
Проблема заключается в sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap()
инструкции. При использовании spark 2.0.1 и IPython 3.5.2 возникает исключение синтаксической ошибки. Упрощение до чего-то, что должно работать (и это то, что есть в книге О’Рейли): sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap()
заставляет Spark сходить с ума из-за исключений java, но я замечаю TypeError: <lambda>() missing 1 required positional argument: 'v'
ошибку.
Кто-нибудь может указать мне на пример этой функциональности, которая действительно работает с последней версией Spark amp; Python? Для полноты картины я включил свой собственный минимально рабочий (или, скорее, нерабочий) пример:
In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0] y,x[1] 1),lambda x,y:(x[0] y[0],x[1] y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors
Это достаточно просто вычислить [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()]
, но я бы предпочел, чтобы работал «Sparkic» способ.
Ответ №1:
Шаг за шагом:
lambda (key, (totalSum, count)): ...
это так называемая распаковка параметров кортежа, которая была удалена в Python.-
RDD.map
принимает функцию, ожидаемую в качестве единственного аргумента. Функция, которую вы пытаетесь использовать:lambda key, vals: ...
Это функция, которая ожидает два аргумента, а не один. Допустимый перевод из синтаксиса 2.x был бы
lambda key_vals: (key_vals[0], key_vals[1][0] / key_vals[1][1])
или:
def get_mean(key_vals): key, (total, cnt) = key_vals return key, total / cnt cbk.map(get_mean)
Вы также можете сделать это намного проще с помощью
mapValues
:cbk.mapValues(lambda x: x[0] / x[1])
-
Наконец, численно стабильное решение было бы:
from pyspark.statcounter import StatCounter (pRDD .combineByKey( lambda x: StatCounter([x]), StatCounter.merge, StatCounter.mergeStats) .mapValues(StatCounter.mean))
Ответ №2:
Усреднение по определенному значению столбца может быть выполнено с использованием концепции Window. Рассмотрим следующий код:
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)],
['a', 'i'])
win = Window.partitionBy('a')
df.withColumn('avg', F.avg('i').over(win)).show()
Дало бы:
--- --- ---
| a| i|avg|
--- --- ---
| b| 3|4.0|
| b| 5|4.0|
| a| 2|4.0|
| a| 6|4.0|
--- --- ---
Средняя агрегация выполняется для каждого рабочего по отдельности, не требует обратного перехода к хосту и, следовательно, эффективна.
Комментарии:
1. Спасибо, но, как ясно из всего, что уже опубликовано по этому вопросу, я специально задавал вопрос о том, почему в операции
combineByKey
->map
произошла ошибка.