Для записи фрейма данных в формате xml, сжатого в zip в целевое хранилище

#apache-spark #pyspark #etl #aws-glue

Вопрос:

Я пытаюсь написать код в AWS Glue ETL для записи фрейма данных в формате XML, сжатого в ZIP для загрузки в папку s3. Я смог написать код для JSON, parquet, orc, но не смог найти его для XML.

Главной ошибкой было:

DataFrameWriter не имеет XML-кода attr

 d0.write.format("xml").option("compression", "gzip").save("/content/sample_data/compressed_xml")

d0.write.format("xml").option("codec", "gzip").save("/content/sample_data/compressed_xml")
 

Комментарии:

1. Spark не обеспечивает встроенную поддержку XML-кода источника данных. Вам нужно использовать spark-xml .

2. Можно ли его использовать в производстве? Плюс, скажем, даже для личного проекта pyspark в Google Colab, как бы вы это сделали?

3. Существует ли какой-либо другой подход, скажем, использование функции службы Lambda для извлечения XML-файла из s3 и его архивирования в другое местоположение s3?

4. Можно сделать это с помощью лямбды

Ответ №1:

Можно сделать это с помощью лямбды :

импорт json импорт lzma, импорт ОС boto3 импорт zip-файла

def lambda_handler(событие, контекст):

 sourceSystem            = str(event['gzip_args']['sourcesystem'])
input_filename          = str(event['gzip_args']['filename'])
bucket_in_name          = str(event['gzip_args']['bucket_in_name'])
bucket_out_name         = str(event['gzip_args']['bucket_out_name'])
xml_file_read_location  = str(event['gzip_args']['xml_file_read_location'])
zip_file_write_location = str(event['gzip_args']['zip_file_write_location'])

s3=boto3.client('s3')

key=xml_file_read_location sourceSystem '/' input_filename

localFilename = '/tmp/{}'.format(os.path.basename(key))

s3.download_file(Bucket=bucket_in_name, Key=key, Filename=localFilename)
os.chdir('/tmp/')

file_list = os.listdir()

#Zip and compress file
zf = zipfile.ZipFile(input_filename.replace("xml","zip"), mode='w', compression=zipfile.ZIP_DEFLATED)


#reading xml file name and zf object is writting in zip format with compression
zf.write(input_filename)

# zf.write(file_list[0])
zf.close()

##Uploading final zip file to respective target location
s3.upload_file(input_filename.replace("xml","zip"),bucket_out_name , zip_file_write_location sourceSystem '/' input_filename.replace("xml","zip"))

os.remove(localFilename)

return {
    'statusCode': 200,
    'body': json.dumps('HelloWorld!')
}