#apache-spark #pyspark
#apache-spark #pyspark
Вопрос:
Я пытаюсь распараллелить существующий алгоритм в spark (таким образом, чтобы он масштабировался). Я упростил это для целей вопроса, но это выглядит примерно так:
for p in all_p:
all_q = calculate1(p)
results_for_p = []
for q in all_q:
results_for_p.append(calculate2(q))
save_results(results_for_p)
В принципе, у меня есть вложенный цикл for с двумя долго выполняющимися функциями, которые я хотел бы запускать параллельно. Однако параметры вложенной функции calculate2
имеют переменный размер в зависимости от каждого из них p
.
Моей попыткой было сгладить входные данные, чтобы calculate2 выполнялся на all_q и all_p вместе:
rdd = sc.parallelize(all_p)
all_q_each_p = rdd.map(calculate1).collect()
# flatten output to something we can parallelize:
all_q_all_p = []
for all_q in all_q_each_p:
all_q_all_p.append(all_q)
rdd = sc.parallelize(all_q_all_p)
res = rdd.map(calculate2).collect()
# How to do this??
collect_and_save_all_results(res)
Как написать это таким образом, который будет хорошо масштабироваться?
Комментарии:
1. Я не уверен, что мы можем говорить о вложенном распараллеливании.
2. @thebluephantom Я обновил заголовок до «распараллеливание вложенных циклов»
Ответ №1:
Это именно тот тип проблемы, который flatMap
решается. flatMap
изменяет размер rdd по умолчанию.
Код становится намного проще:
rdd = sc.parallelize(all_p)
rdd.flatMap(calculate1).map(
lambda args: calculate2(*args)
).collect()