#amazon-web-services #apache-spark #pyspark #aws-glue #aws-glue-spark
Вопрос:
Я запускаю AWS Pyspark Glue Job
приложение, в котором я читаю S3 raw
путь, по которому были загружены данные Redshift
, и делаю некоторые transformations
из них поверх него. Ниже приведен мой код:
data = spark.read.parquet(rawPath) # complete dataset. Incase of incremental it will be directly loaded from different dataframe so it would be like data = incrLoad(which has incremental records) . . #some transformations . app8 = app7.withColumn("edl_created_at", current_timestamp()) #Final line of transformation if (incrementalLoad == str(0)): app8.write.mode("overwrite").parquet(transformedPath)#loc1 print(":::::Transformed data written for fullload::::::") elif (incrementalLoad == str(1)): app8.write.mode("append").parquet(transformedPath)#loc1 print(":::Incremental Transformed data has been written::::::::") transformedData = spark.read.parquet(transformedPath) print("::::::Transformed data has been written:::::") finalDF = transformedData.groupBy(col("mobilenumber")).agg( sum(col("times_app_uninstalled")).alias("times_app_uninstalled"), sum(col("times_uninstall_l30d")).alias("times_uninstall_l30d"), sum(col("times_uninstall_l60d")).alias("times_uninstall_l60d"), sum(col("times_uninstall_l90d")).alias("times_uninstall_l90d"), sum(col("times_uninstall_l180d")).alias("times_uninstall_l180d"), sum(col("times_uninstall_l270d")).alias("times_uninstall_l270d"), sum(col("times_uninstall_l365d")).alias("times_uninstall_l365d"), max(col("latest_uninstall_date")).alias("latest_uninstall_date"), min(col("first_uninstall_date")).alias("first_uninstall_date")) finalDF.write.mode("overwrite").parquet(transformedPath)#loc1
Где incrementalLoad==0
указывает на а full Load
и 1
указывает на а incremental transformed data load
. Итак, здесь для полной загрузки я читаю полный набор данных и app8
являюсь последним преобразованным фреймом данных, в который записывается запись S3
. Теперь, в случае инкрементного, я выполняю преобразования только для incremental
загруженного необработанного набора данных. Как видно из elif
цикла, я добавляю преобразованный набор данных в existing transformed path
. Позже, читая тот же путь, выполните некоторые агрегации и попытайтесь записать его в тот же путь, что приводит меня к следующей ошибке:
No such file or directory
Это из spark's lazy evaluation
-за того . Потому что, когда он сталкивается с « write with overwrite
режимом, он сначала удаляет каталог, а затем пытается прочитать его и так далее.
Чтобы избежать этого, я подумал о двух solutions
:
- Хранение
raw
данных(complete incremental
) в одном месте, а затем выполняет преобразования, которые будут работать должным образом, но, поскольку размер данных превышает 1,5 миллиона, и каждый день размер будет увеличиваться, это не лучший способ считывания данных из S3. - Создание
temp
каталога. Я тоже не могу этого сделать. Предположим, у меня есть два местоположения, и я читаю из одного каталога, скажемloc1
, и записываю преобразованные данные в другой каталог, скажемloc2
, но когда моя работа снова запускается на следующий день, она должна считываться, изloc2
которой я не вижу, что происходит в моем случае.
Мы очень ценим любую помощь. Что я могу сделать лучше всего в своем случае?