запись spark shuffle взрывается на небольших, кэшированных и обработанных df

#apache-spark #pyspark #shuffle

Вопрос:

Привет, сообщество Spark,

Я работаю с pyspark, Spark 3.0.

Я сталкиваюсь со следующей проблемой (псевдокод):


 df1 #  90k rows, 10MB

df2  # 20k rows, 2MB

sql.shuffling.partitions=200

df3=df1.crossjoin(F.broadcast(df2)) # 2bn rows, few giga bytes

df3 some functions # use of UDF for ceja/jellyfish

df3 some filtering # results in a small file 400k rows, 200Mb

df3.count() #  to force execution

df3=df3.cache()  # to avoid recomputation of previous steps


df3.write.partitionby('date').parquet('some_file_name') #  this partitioning  creates 30-40 partitions
 

Во время записи файла parquet снова вызывается перетасовка, и она взрывается до небес, запись в случайном порядке 300 ГБ. Что я не понимаю, так как df3 на данный момент-это небольшие 400 тысяч строк, 200 метров. (запись в случайном порядке взрывается во время вычисления перекрестного соединения, но это ожидается, если я прав)

Я никогда не могу написать файл паркета и получаю различные ошибки, такие как

«Не осталось места на диске» или «Исключение сокета: сброс подключения».

Почему запись в случайном порядке происходит в небольшом файле, который ранее был кэширован? Ребята, видите ли вы что-нибудь, что вы сделали бы по-другому?

Заранее благодарю вас за вашу помощь!

Комментарии:

1. кэширование должно быть вызвано до «.count».

2. Я бы сказал, зависит от того, что делает ваш UDF.

3. в зависимости от количества разделов с двух сторон crossjoin , выходной фрейм данных может содержать миллионы разделов. попробуйте что-нибудь вроде этого df3=df1.repartition(100).crossjoin(F.broadcast(df2.repartition(10)))