#apache-spark #amazon-s3 #pyspark #apache-spark-sql
#apache-искра #amazon-s3 #пыспарк #apache-spark-sql
Вопрос:
Допустим, три файла в папке s3, и будет ли чтение через spark.read.csv(s3:имя корзины/папка 1/*.csv) считывать файлы по порядку или нет ? Если нет, есть ли способ упорядочить файлы при чтении всей папки с несколькими файлами, полученными в разное время внутри.
Имя файла | загруженный файл s3/Время последнего изменения |
---|---|
s3:имя корзины/папка 1/файл1.csv | 01:00:00 |
s3:имя корзины/папка 1/файл2.csv | 01:10:00 |
s3:имя корзины/папка 1/файл3.csv | 01:20:00 |
Комментарии:
1. Как указывается полученное время? Это столбец в ваших данных или что-то вроде метки времени в файле?
2. это время загрузки файла s3 / время последнего изменения.
Ответ №1:
Вы можете достичь этого, используя следующее
- Повторите все файлы в корзине и загрузите этот csv-файл с добавлением нового столбца
last_modified
. Ведите список всех файлов dfs, которые будут загруженыdfs_list
. Поскольку pyspark выполняет ленивую оценку, он не будет загружать данные мгновенно.
import boto3 s3 = boto3.resource('s3') my_bucket = s3.Bucket('bucketname') dfs_list = [] for file_object in my_bucket.objects.filter(Prefix="folder1/"): df = spark.read.parquet('s3a://' file_object.name).withColumn("modified_date", file_object.last_modified) dfs_list.append(df)
- Теперь возьмите объединение всех dfs с помощью
unionAll
функции pyspark, а затем отсортируйте данные в соответствии сmodified_date
.
from functools import reduce from pyspark.sql import DataFrame df_combined = reduce(DataFrame.unionAll, dfs_list) df_combined = df_combined.orderBy('modified_date')