PySpark считывает csv из zip-файла в s3 с двумя различными типами файлов

#python #apache-spark #amazon-s3 #pyspark

Вопрос:

У меня есть zip — файл с CSV и файлом сопоставления json в нем. Я хотел бы прочитать csv в фрейм данных spark, а файл сопоставления json-в словарь. Я сделал последнюю часть, делая это:

 import boto3

obj = s3.get_object(Bucket='bucket', Key='key')

z = zipfile.ZipFile(io.BytesIO(obj["Body"].read()))

csvjson = json.loads(z.open(files[1]).read().decode('utf-8'))
 

В общем, я хотел бы сделать следующее, чтобы получить df из csv-файла:

 dfRaw = spark.read 
    .format("text") 
    .option("multiLine","true") 
    .option("inferSchema","false") 
    .option("header","true") 
    .option("ignoreLeadingWhiteSpace","true") 
    .option("ignoreTrailingWhiteSpace","true") 
    .load(z.open(files[0]).read().decode('utf-8'))
 

Однако это, очевидно, не работает, потому load() что ожидает путь к файлу, а не сами строки. Как я могу прочитать этот файл из zip-файла в фрейм данных spark?

Комментарии:

1. как насчет загрузки его sc.parallelize(...) с помощью, а затем использования to_csv ?

2. @pltc можете ли вы опубликовать пример? Я думаю, что та часть, на которой я здесь зациклился, — это доступ к нему из zip-архива

Ответ №1:

Поскольку вы вручную «распаковываете» CSV-файл и получаете вывод в виде строки, вы можете использовать parallelize следующее

 z = zipfile.ZipFile(io.BytesIO(obj["Body"].read()))
csv = [l.decode('utf-8').replace('n', '') for l in z.open(files[0]).readlines()]

(spark
    .sparkContext
    .parallelize(csv)
    .toDF(T.StringType())
    .withColumn('value', F.from_csv('value', 'ID int, Trxn_Date string')) # your schema goes here
    .select('value.*')
    .show(10, False)
)

# Output
 ---- ---------- 
|ID  |Trxn_Date |
 ---- ---------- 
|null|Trxn_Date |
|100 |2021-03-24|
|133 |2021-01-22|
 ---- ----------