Распараллеливание вложенного for-цикла в spark

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

Я пытаюсь распараллелить существующий алгоритм в spark (таким образом, чтобы он масштабировался). Я упростил это для целей вопроса, но это выглядит примерно так:

 for p in all_p:
    all_q = calculate1(p)

    results_for_p = []
    for q in all_q:
        results_for_p.append(calculate2(q))

    save_results(results_for_p)
  

В принципе, у меня есть вложенный цикл for с двумя долго выполняющимися функциями, которые я хотел бы запускать параллельно. Однако параметры вложенной функции calculate2 имеют переменный размер в зависимости от каждого из них p .

Моей попыткой было сгладить входные данные, чтобы calculate2 выполнялся на all_q и all_p вместе:

 rdd = sc.parallelize(all_p)
all_q_each_p = rdd.map(calculate1).collect()

# flatten output to something we can parallelize:
all_q_all_p = []
for all_q in all_q_each_p:
    all_q_all_p.append(all_q)

rdd = sc.parallelize(all_q_all_p)
res = rdd.map(calculate2).collect()

# How to do this?? 
collect_and_save_all_results(res)
  

Как написать это таким образом, который будет хорошо масштабироваться?

Комментарии:

1. Я не уверен, что мы можем говорить о вложенном распараллеливании.

2. @thebluephantom Я обновил заголовок до «распараллеливание вложенных циклов»

Ответ №1:

Это именно тот тип проблемы, который flatMap решается. flatMap изменяет размер rdd по умолчанию.

Код становится намного проще:

 rdd = sc.parallelize(all_p)

rdd.flatMap(calculate1).map(
    lambda args: calculate2(*args)
).collect()