#apache-spark #pyspark #etl #aws-glue
Вопрос:
Я пытаюсь написать код в AWS Glue ETL для записи фрейма данных в формате XML, сжатого в ZIP для загрузки в папку s3. Я смог написать код для JSON, parquet, orc, но не смог найти его для XML.
Главной ошибкой было:
DataFrameWriter не имеет XML-кода attr
d0.write.format("xml").option("compression", "gzip").save("/content/sample_data/compressed_xml")
d0.write.format("xml").option("codec", "gzip").save("/content/sample_data/compressed_xml")
Комментарии:
1. Spark не обеспечивает встроенную поддержку XML-кода источника данных. Вам нужно использовать spark-xml .
2. Можно ли его использовать в производстве? Плюс, скажем, даже для личного проекта pyspark в Google Colab, как бы вы это сделали?
3. Существует ли какой-либо другой подход, скажем, использование функции службы Lambda для извлечения XML-файла из s3 и его архивирования в другое местоположение s3?
4. Можно сделать это с помощью лямбды
Ответ №1:
Можно сделать это с помощью лямбды :
импорт json импорт lzma, импорт ОС boto3 импорт zip-файла
def lambda_handler(событие, контекст):
sourceSystem = str(event['gzip_args']['sourcesystem'])
input_filename = str(event['gzip_args']['filename'])
bucket_in_name = str(event['gzip_args']['bucket_in_name'])
bucket_out_name = str(event['gzip_args']['bucket_out_name'])
xml_file_read_location = str(event['gzip_args']['xml_file_read_location'])
zip_file_write_location = str(event['gzip_args']['zip_file_write_location'])
s3=boto3.client('s3')
key=xml_file_read_location sourceSystem '/' input_filename
localFilename = '/tmp/{}'.format(os.path.basename(key))
s3.download_file(Bucket=bucket_in_name, Key=key, Filename=localFilename)
os.chdir('/tmp/')
file_list = os.listdir()
#Zip and compress file
zf = zipfile.ZipFile(input_filename.replace("xml","zip"), mode='w', compression=zipfile.ZIP_DEFLATED)
#reading xml file name and zf object is writting in zip format with compression
zf.write(input_filename)
# zf.write(file_list[0])
zf.close()
##Uploading final zip file to respective target location
s3.upload_file(input_filename.replace("xml","zip"),bucket_out_name , zip_file_write_location sourceSystem '/' input_filename.replace("xml","zip"))
os.remove(localFilename)
return {
'statusCode': 200,
'body': json.dumps('HelloWorld!')
}