#apache-spark #pyspark
Вопрос:
Возможно ли в Pyspark дважды связать groupByKey()
вызов с pair_rdd?
У меня есть два уровня ключей, которые я хочу сгруппировать, прежде чем объединять, создав специальный список всех значений.
Вот мой код. Первый groupByKey()
вызов группируется по внешнему ключу, а затем передается функции отображения, в которой я надеюсь снова превратить результирующий объект в pair_rdd, чтобы я мог выполнить второй groupByKey()
и сопоставить свою функцию с ним.
(Поскольку я сокращаю, я думаю, что мог бы также использовать reduceByKey()
там?)
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("test")
.master("local")
.config('spark.sql.shuffle.partitions', '4')
.getOrCreate()
sc = spark.sparkContext
def group_by(ws):
L = ws[0]
E = ...ws[1]... <-- Do something here to turn this from resultIterable to Pair_RDD
rr = E.groupByKey().map(output_lists)
return (L, rr)
def output_lists(ws):
el = [e[0] for e in ws[1]]
res = [ws[0]] el
return (ws[0], res)
input_data = (('A', ('G', ('xyz',))),
('A', ('G', ('xys',))),
('A', ('H', ('asd',))),
('B', ('K', ('qwe',))),
('B', ('K', ('wer',))))
data = sc.parallelize(input_data)
data = data.groupByKey().map(group_by)
print(data.take(5))
Теперь, это вообще выполнимо, или мне нужен другой подход.
Я знаю два других способа обойти:
- Объедините оба ключа в один.
- Используйте фрейм данных SparkSQL.
Но мне любопытно, есть ли способ использовать вышеуказанный подход, поскольку я все еще изучаю Spark.
Ответ №1:
Я обнаружил, что могу использовать кортежи в качестве ключей в паре RDDs. Переназначение моих входных данных таким образом означает, что требуется только один groupByKey()
, и проблема может быть решена:
input_data = ((('A', 'G'), 'xyz'),
(('A', 'G'), 'xys'),
(('A', 'H'), 'asd'),
(('B', 'K'), 'qwe'),
(('B', 'K'), 'wer'))