Вычисляются ли .withColumn и .agg параллельно в pyspark?

#python #apache-spark #pyspark #apache-spark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

Рассмотрим , например,

 df.withColumn("customr_num", col("customr_num").cast("integer")).
withColumn("customr_type", col("customr_type").cast("integer")).
agg(myMax(sCollect_list("customr_num")).alias("myMaxCustomr_num"), 
    myMean(sCollect_list("customr_type")).alias("myMeanCustomr_type"), 
    myMean(sCollect_list("customr_num")).alias("myMeancustomr_num"),
    sMin("customr_num").alias("min_customr_num")).show()
  

Вычисляются ли .withColumn и список функций внутри agg (sMin, myMax, myMean и т.д.) Параллельно с помощью Spark или последовательно?

Если последовательно, как мы их распараллеливаем?

Ответ №1:

По сути, пока у вас более одного раздела, операции всегда распараллеливаются в spark. Если вы имеете в виду, будут ли withColumn операции вычисляться за один проход над набором данных, то ответ также положительный. В общем, вы можете использовать пользовательский интерфейс Spark, чтобы узнать больше о том, как вычисляются данные.

Давайте рассмотрим пример, который очень похож на ваш.

 spark.range(1000)
    .withColumn("test", 'id cast "double")
    .withColumn("test2", 'id   10)
    .agg(sum('id), mean('test2), count('*))
    .show
  

И давайте взглянем на пользовательский интерфейс.

введите описание изображения здесь

Range соответствует созданию данных, затем у вас есть project (две withColumn операции), а затем агрегирование ( agg ) внутри каждого раздела (здесь у нас их 2). В данном разделе эти действия выполняются последовательно, но для всех разделов одновременно. Кроме того, они находятся на одном этапе (в синем поле), что означает, что все они вычисляются за один проход по данным.

Затем происходит перетасовка ( exchange ), которая означает, что данные обмениваются по сети (результат агрегирования для каждого раздела) и выполняется окончательное агрегирование ( HashAggregate ), а затем отправляются драйверу ( collect )

Комментарии:

1. Что я имел в виду: рассмотрим ваш пример, есть пара операций с столбцами: с COLUMN_TEST, с column_test2, agg_sum, agg_mean, agg_count. Если я правильно понимаю ваш ответ, у нас есть (с column_test, Сcolumn_test2) параллельно, затем, после того, как это будет сделано, у нас есть 3 операции (agg_sum, agg_mean, agg_count) параллельно. Чего я боюсь, так это (с column_test) -> (С column_tes2) -> (agg_sum, agg_mean, agg_count) или, что еще хуже, все последовательно. У меня есть некоторые операции со всеми столбцами, и я боюсь, что они не вычисляются параллельно.

2. Параллельно — это не то слово, которое я бы использовал, но в вашем примере, а также в моем, я могу заверить вас, что spark не передает данные несколько раз. Все вычисления (с помощью столбца и разделенных по разделам agg) вычисляются одновременно (т. Е. во время одного и того же прохода данных).

3. В принципе, пока вы используете SparkSQL, если вы можете придумать разумную оптимизацию, которую вы могли бы использовать для ускорения вашего запроса, Spark, вероятно, выполняет ее. Чтобы убедиться, просто проверьте пользовательский интерфейс. Не стесняйтесь, если это все еще неясно.

4. Итак, что он хочет знать, так это то, выполняются ли значения withColumns параллельно внутри раздела или одно за другим. Вы излагаете последовательно, насколько я понимаю. Нюанс в том, что может работать меньше исполнителей, чем ожидалось.

5. Возможно, distributed — это то слово, которое вы ищете.