Доступ к значению df.write.partitionBy в имени файла и выполнение преобразований при сохранении

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я делаю что-то вроде

 df.write.mode("overwrite").partitionBy("sourcefilename").format("orc").save("s3a://my/dir/path/output-data");
  

Приведенный выше код успешно генерирует имя файла orc с каталогом раздела, однако имя является чем-то вроде part-0000.

Мне нужно изменить раздел на значение (sourcefilename) при сохранении, например, если имя исходного файла равно ABC, то каталог раздела (который будет создан при выполнении записи) должен быть 123, если DEF, то 345 и так далее.

Как мы можем выполнить вышеуказанные требования? Я использую AWS S3 для чтения и записи файлов.

Я использую Spark 2.x и Scala 2.11.

Комментарии:

1. Ник, как ты относишься к ответам? Вы решили иначе?

2. Да, ответы есть, но не то, что я искал. Более того, почему два отрицательных отзыва, определенно неверно и не мотивирует!

3. Ну, ответ может быть не тем, что вы хотите услышать, конечно. Я не голосовал против и был подвергнут этому. Это природа некоторых здесь, тем не менее, это очень информативный сайт.

Ответ №1:

Учитывая, что в этом примере показан общий способ записи DF

 df.write.partitionBy("EVENT_NAME","dt","hour").save("/apps/hive/warehouse/db/sample")
  

формат, тогда ваш подход должен заключаться в создании дополнительного столбца xc, который задается с помощью UDF или некоторого def или val, который устанавливает xc в соответствии с именем, например ABC —> 123 и т.д. Затем вы разбиваете по этому каталогу xc и принимаете эту часть — xxxxx именно так это работает в Spark.

Впоследствии вы могли бы сами переименовать файлы с помощью скрипта.

Комментарии:

1. Спасибо, но я знаю, что мог бы переименовать их позже. Мне было интересно, есть ли способ внедрить имя файла во время выполнения операции .save

2. Нет, вы не можете обойти аспект части заранее. Не имеет смысла. Следовательно, ответ правильный.

Ответ №2:

Стиль part-1234 заключается в том, как разделяется работа: разные задачи получают свой собственный раздел разделенного источника данных и сохраняют его с нумерацией, чтобы гарантировать, что никакая другая задача не генерирует выходные данные с таким же именем.

Это фундаментально для получения производительности параллельного выполнения.