Перезаписать файл Parquet с помощью Pyspark

#apache-spark #hadoop #pyspark #parquet

#apache-spark #hadoop #pyspark #parquet

Вопрос:

Я пытаюсь перезаписать файл Parquet в S3 с помощью Pyspark. Для корзины включено управление версиями.

Я использую следующий код:

Написать версию v1:

 df_v1.repartition(1).write.parquet(path='s3a://bucket/file1.parquet')
  

Обновление v2:

 df_v1 = spark.read.parquet("s3a://bucket/file1.parquet")
df_v2 = df_v1.... <- transform
df_v2.repartition(1).write.mode("overwrite").parquet('s3a://bucket/file1.parquet')
  

Но когда я читаю df_v2, он содержит данные из обеих записей. Кроме того, когда записывается df_v1, я вижу одну часть — файл xxx.snappy.parquet, после записи df_v2 я вижу две. Он ведет себя как добавление, а не перезапись.

Чего мне не хватает? Спасибо

Spark = 2.4.4 Hadoop = 2.7.3

Ответ №1:

Проблема, вероятно, связана с тем фактом, что вы используете S3. в S3 файловая система основана на ключе / значении, что означает, что нет физической папки с именем file1.parquet , есть только файлы, ключи которых являются чем-то вроде s3a://bucket/file1.parquet/part-XXXXX-b1e8fd43-ff42-46b4-a74c-9186713c26c6-c000.parquet (это просто пример).

Итак, когда вы «перезаписываете», вы должны перезаписать папку, которая не может быть обнаружена. Таким образом, spark создает новые ключи: это похоже на режим «добавления».

Вероятно, вам нужно написать свою собственную функцию, которая перезаписывает «папку» — удаляет все ключи, которые содержат папку в их названии.