#apache-spark #hadoop #pyspark #parquet
#apache-spark #hadoop #pyspark #parquet
Вопрос:
Я пытаюсь перезаписать файл Parquet в S3 с помощью Pyspark. Для корзины включено управление версиями.
Я использую следующий код:
Написать версию v1:
df_v1.repartition(1).write.parquet(path='s3a://bucket/file1.parquet')
Обновление v2:
df_v1 = spark.read.parquet("s3a://bucket/file1.parquet")
df_v2 = df_v1.... <- transform
df_v2.repartition(1).write.mode("overwrite").parquet('s3a://bucket/file1.parquet')
Но когда я читаю df_v2, он содержит данные из обеих записей. Кроме того, когда записывается df_v1, я вижу одну часть — файл xxx.snappy.parquet, после записи df_v2 я вижу две. Он ведет себя как добавление, а не перезапись.
Чего мне не хватает? Спасибо
Spark = 2.4.4 Hadoop = 2.7.3
Ответ №1:
Проблема, вероятно, связана с тем фактом, что вы используете S3. в S3 файловая система основана на ключе / значении, что означает, что нет физической папки с именем file1.parquet
, есть только файлы, ключи которых являются чем-то вроде s3a://bucket/file1.parquet/part-XXXXX-b1e8fd43-ff42-46b4-a74c-9186713c26c6-c000.parquet
(это просто пример).
Итак, когда вы «перезаписываете», вы должны перезаписать папку, которая не может быть обнаружена. Таким образом, spark создает новые ключи: это похоже на режим «добавления».
Вероятно, вам нужно написать свою собственную функцию, которая перезаписывает «папку» — удаляет все ключи, которые содержат папку в их названии.