Режим перезаписи в spark вызывает проблемы

#amazon-web-services #apache-spark #pyspark #aws-glue #aws-glue-spark

Вопрос:

Я запускаю AWS Pyspark Glue Job приложение, в котором я читаю S3 raw путь, по которому были загружены данные Redshift , и делаю некоторые transformations из них поверх него. Ниже приведен мой код:

 data = spark.read.parquet(rawPath) # complete dataset. Incase of incremental it will be directly loaded from different dataframe so it would be like data = incrLoad(which has incremental records)  .   . #some transformations  .  app8 = app7.withColumn("edl_created_at", current_timestamp()) #Final line of transformation     if (incrementalLoad == str(0)):  app8.write.mode("overwrite").parquet(transformedPath)#loc1  print(":::::Transformed data written for fullload::::::")    elif (incrementalLoad == str(1)):  app8.write.mode("append").parquet(transformedPath)#loc1  print(":::Incremental Transformed data has been written::::::::")  transformedData = spark.read.parquet(transformedPath)  print("::::::Transformed data has been written:::::")  finalDF = transformedData.groupBy(col("mobilenumber")).agg(  sum(col("times_app_uninstalled")).alias("times_app_uninstalled"),  sum(col("times_uninstall_l30d")).alias("times_uninstall_l30d"),  sum(col("times_uninstall_l60d")).alias("times_uninstall_l60d"),  sum(col("times_uninstall_l90d")).alias("times_uninstall_l90d"),  sum(col("times_uninstall_l180d")).alias("times_uninstall_l180d"),  sum(col("times_uninstall_l270d")).alias("times_uninstall_l270d"),  sum(col("times_uninstall_l365d")).alias("times_uninstall_l365d"),  max(col("latest_uninstall_date")).alias("latest_uninstall_date"),  min(col("first_uninstall_date")).alias("first_uninstall_date"))  finalDF.write.mode("overwrite").parquet(transformedPath)#loc1  

Где incrementalLoad==0 указывает на а full Load и 1 указывает на а incremental transformed data load . Итак, здесь для полной загрузки я читаю полный набор данных и app8 являюсь последним преобразованным фреймом данных, в который записывается запись S3 . Теперь, в случае инкрементного, я выполняю преобразования только для incremental загруженного необработанного набора данных. Как видно из elif цикла, я добавляю преобразованный набор данных в existing transformed path . Позже, читая тот же путь, выполните некоторые агрегации и попытайтесь записать его в тот же путь, что приводит меня к следующей ошибке:

 No such file or directory   

Это из spark's lazy evaluation -за того . Потому что, когда он сталкивается с « write with overwrite режимом, он сначала удаляет каталог, а затем пытается прочитать его и так далее.

Чтобы избежать этого, я подумал о двух solutions :

  1. Хранение raw данных (complete incremental ) в одном месте, а затем выполняет преобразования, которые будут работать должным образом, но, поскольку размер данных превышает 1,5 миллиона, и каждый день размер будет увеличиваться, это не лучший способ считывания данных из S3.
  2. Создание temp каталога. Я тоже не могу этого сделать. Предположим, у меня есть два местоположения, и я читаю из одного каталога, скажем loc1 , и записываю преобразованные данные в другой каталог, скажем loc2 , но когда моя работа снова запускается на следующий день, она должна считываться, из loc2 которой я не вижу, что происходит в моем случае.

Мы очень ценим любую помощь. Что я могу сделать лучше всего в своем случае?