Python Spark combineByKey среднее значение

#python-3.x #apache-spark #pyspark

#python-3.x #apache-spark #pyspark

Вопрос:

Я пытаюсь изучить Spark на Python и застрял с combineByKey для усреднения значений в парах ключ-значение. На самом деле, мое замешательство связано не с combineByKey синтаксисом, а с тем, что происходит после. Типичный пример (из книги O’Rielly 2015 Learning Spark Book) можно увидеть в Интернете во многих местах; вот один.

Проблема заключается в sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap() инструкции. При использовании spark 2.0.1 и IPython 3.5.2 возникает исключение синтаксической ошибки. Упрощение до чего-то, что должно работать (и это то, что есть в книге О’Рейли): sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap() заставляет Spark сходить с ума из-за исключений java, но я замечаю TypeError: <lambda>() missing 1 required positional argument: 'v' ошибку.

Кто-нибудь может указать мне на пример этой функциональности, которая действительно работает с последней версией Spark amp; Python? Для полноты картины я включил свой собственный минимально рабочий (или, скорее, нерабочий) пример:

 In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0] y,x[1] 1),lambda x,y:(x[0] y[0],x[1] y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors
  

Это достаточно просто вычислить [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()] , но я бы предпочел, чтобы работал «Sparkic» способ.

Ответ №1:

Шаг за шагом:

  • lambda (key, (totalSum, count)): ... это так называемая распаковка параметров кортежа, которая была удалена в Python.
  • RDD.map принимает функцию, ожидаемую в качестве единственного аргумента. Функция, которую вы пытаетесь использовать:

     lambda key, vals: ...
      

    Это функция, которая ожидает два аргумента, а не один. Допустимый перевод из синтаксиса 2.x был бы

     lambda key_vals: (key_vals[0], key_vals[1][0] / key_vals[1][1])
      

    или:

     def get_mean(key_vals):
        key, (total, cnt) = key_vals
        return key, total / cnt
    
    cbk.map(get_mean)
      

    Вы также можете сделать это намного проще с помощью mapValues :

     cbk.mapValues(lambda x: x[0] / x[1])
      
  • Наконец, численно стабильное решение было бы:

     from pyspark.statcounter import StatCounter
    
    (pRDD
        .combineByKey(
            lambda x: StatCounter([x]),
            StatCounter.merge,
            StatCounter.mergeStats)
        .mapValues(StatCounter.mean))
      

Ответ №2:

Усреднение по определенному значению столбца может быть выполнено с использованием концепции Window. Рассмотрим следующий код:

 import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)],
                           ['a', 'i'])
win = Window.partitionBy('a')
df.withColumn('avg', F.avg('i').over(win)).show()
  

Дало бы:

  --- --- --- 
|  a|  i|avg|
 --- --- --- 
|  b|  3|4.0|
|  b|  5|4.0|
|  a|  2|4.0|
|  a|  6|4.0|
 --- --- --- 
  

Средняя агрегация выполняется для каждого рабочего по отдельности, не требует обратного перехода к хосту и, следовательно, эффективна.

Комментарии:

1. Спасибо, но, как ясно из всего, что уже опубликовано по этому вопросу, я специально задавал вопрос о том, почему в операции combineByKey -> map произошла ошибка.