Присоединяйтесь ко мне дважды на pyspark, может быть, я не понимаю лени?

#apache-spark #pyspark #apache-spark-sql

Вопрос:

слишком давно я не использовал spark в прошлый раз, я снова подключился к нему с помощью Spark 3.1, и вот моя проблема: у меня осталось 20 млн строк, соединенных с 400 млн строк, исходный код:

 times= [50000,20000,10000,1000]
for time in times:
    join = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-time))
                                 ], 'left')
 

зная, что каждая итерация (переменная времени) содержит следующую, я подумал о том, чтобы облегчить фрейм данных перед сравнением с каждым значением, поэтому закодировал это:

 times= [50000,20000,10000,1000]

join = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-50000))
                                 ], 'left')

join.checkpoint() # Save current state, and cleaned dataframe

for time in times:
    step_join = join_df.where((join_df["task"]) = (join_df["task"]-time)))
    # Make calculations and Store result for the iteration...
 

При взгляде на визуальную диаграмму SQL на сервере истории Spark кажется, что мое улучшенное решение (?) для второго соединения не используется, оно заставляет все левое соединение снова соединяться на каждой итерации, не используя более чистый и легкий фрейм данных.

Моей последней идеей было использовать новый df для следующей итерации, чтобы каждый фильтр был легче. Верны ли мои мысли? я что-то упускаю?

Изображение того, как это выглядит, это все еще работающий код, соединение Sortmergejo в середине отсоединено для фильтрации, второй «Фильтр» фильтрует только немного больше, но слева и справа вы можете видеть, что он снова вычисляет соединение sortmergejo вместо повторного использования ранее вычисленного. введите описание изображения здесь

И вот как выглядит обработка: каждый раз одни и те же вычисления плюс фильтр введите описание изображения здесь

В прошлый раз пришлось удалить контрольную точку, потому что с 55B строками в соединении было трудно хранить данные (>100 ТБ).

Моя конфигурация кластера для 30 экземпляров 64vcore 488 ГБ ОЗУ драйвер

         "spark.executor.instances", "249").config("spark.executor.memoryOverhead", "10240").config(
        "spark.executor.memory", "87g").config("spark.executor.cores", "12").config("spark.driver.cores", "12").config(
        "spark.default.parallelism", "5976").config("spark.sql.adaptive.enabled", "true").config(
        "spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "3100").config(
        "spark.yarn.driver.memoryOverhead", "10240").config("spark.sql.autoBroadcastJoinThreshold", "2100").config(
        "spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()
 

Я использую калькулятор excel на этом сайте для настройки всего, кроме spark.sql.shuffle.разделов https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/ теперь используется 10 исполнителей на узел

Попробовал использовать .cache() для соединения, оно все еще медленнее, чем 4 параллельных соединения, первое соединение намного медленнее. Обратите внимание, что .cache() хорош для подмножества, но для результата объединения 100 ТБ это будет медленнее, потому что он будет кэшироваться на диск. Спасибо!

Комментарии:

1. Из вопроса довольно сложно понять, что вы пытаетесь сделать, но я бы предположил, что зацикливание 4 соединений в одних и тех же наборах данных, вероятно, не является правильным решением. Может быть, вы подробнее объясните проблему, которую пытаетесь решить, это модель атрибуции ?

2. Я создаю несколько групп, привязанных ко времени, а затем создаю несколько агрегаций, эти группы фильтруются с использованием времени в задаче «a» и задаче «b», поэтому мне нужно присоединиться к ним, затем для каждой группы я произвел некоторые вычисления, такие как сумма/среднее значение/и т. Д., И создал фрейм данных со всеми агрегациями для каждого временного окна

Ответ №1:

Обновленный ответ(5/9/2021):

Я думаю, что вы можете попытаться указать столбец раздела в своих данных с withColumn помощью метода, указав значения when(.. ,.. ).otherwise(..) после объединения (вы можете вложить несколько блоков when/в противном случае для 4 разных значений). Чем просто записывать свои данные partitionBy . В этом случае вам не нужно будет пересчитывать 4 раза. Одного расчета будет достаточно.

Старый Ответ:

Я думаю, что вы, возможно, захотите использовать df.cache() функцию для предотвращения тех же вычислений.

 join_df = (df_a.join(df_b,
                                 [
                                     df_a["a"] == df_b["a"],
                                     (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
                                     > 5,
                                     (df_a["task"]) = (df_b["task"]-50000))
                                 ], 'left').cache()
 

Spark вычислит и сохранит все результаты в памяти и на диске. Он будет повторно использовать предварительно рассчитанные join_df для новых фильтров.

Комментарии:

1. Я не думаю, что у меня есть 100 ТБ для кэширования, может быть, я смогу добавить немного диска (дешевле, чем оперативная память на AWS). Не слишком ли много для кэширования? Спасибо!

2. Да, диск был бы намного дешевле, но память намного быстрее. К сожалению, в spark, чтобы предотвратить вычисления, нам нужно кэшировать его, иначе он будет пытаться пересчитать каждое действие. Если один из ваших столов маленький, вы также можете попытаться транслировать его, как df_a.join(broadcast(df_b), ... это предотвратит перетасовку.

3. Гб данных с обеих сторон, трансляция замедляет работу, нужно посмотреть, меньше ли времени для сохранения на диске, чем сделать это соединение в 4 раза

4. С помощью кэша памяти это было медленнее, чем выполнение 4 соединений…не знаю почему, основное соединение заняло больше времени, чем 4 отдельных соединения в цикле for

5. Хм, в этом случае ваш общий объем базовых данных для чтения намного ниже, чем объединенное значение, из-за этого кэширование в вашем случае происходит медленнее. Я думаю, что вы можете попытаться указать столбец раздела в своих данных с withColumn помощью when(.. ,.. ).otherwise) mathob, указав значения после объединения. чем просто записывать свои данные partitionBy . В этом случае вам не нужно будет пересчитывать 4 раза. одного расчета будет достаточно.