#apache-spark #pyspark #shuffle
Вопрос:
Привет, сообщество Spark,
Я работаю с pyspark, Spark 3.0.
Я сталкиваюсь со следующей проблемой (псевдокод):
df1 # 90k rows, 10MB
df2 # 20k rows, 2MB
sql.shuffling.partitions=200
df3=df1.crossjoin(F.broadcast(df2)) # 2bn rows, few giga bytes
df3 some functions # use of UDF for ceja/jellyfish
df3 some filtering # results in a small file 400k rows, 200Mb
df3.count() # to force execution
df3=df3.cache() # to avoid recomputation of previous steps
df3.write.partitionby('date').parquet('some_file_name') # this partitioning creates 30-40 partitions
Во время записи файла parquet снова вызывается перетасовка, и она взрывается до небес, запись в случайном порядке 300 ГБ. Что я не понимаю, так как df3 на данный момент-это небольшие 400 тысяч строк, 200 метров. (запись в случайном порядке взрывается во время вычисления перекрестного соединения, но это ожидается, если я прав)
Я никогда не могу написать файл паркета и получаю различные ошибки, такие как
«Не осталось места на диске» или «Исключение сокета: сброс подключения».
Почему запись в случайном порядке происходит в небольшом файле, который ранее был кэширован? Ребята, видите ли вы что-нибудь, что вы сделали бы по-другому?
Заранее благодарю вас за вашу помощь!
Комментарии:
1. кэширование должно быть вызвано до «.count».
2. Я бы сказал, зависит от того, что делает ваш UDF.
3. в зависимости от количества разделов с двух сторон
crossjoin
, выходной фрейм данных может содержать миллионы разделов. попробуйте что-нибудь вроде этогоdf3=df1.repartition(100).crossjoin(F.broadcast(df2.repartition(10)))