#apache-spark #amazon-s3 #pyspark #aws-glue
#apache-spark #amazon-s3 #pyspark #aws-glue
Вопрос:
Я создаю задание на склеивание, которому необходимо обрабатывать ежедневный объем данных объемом 4 ТБ из s3 path — s3://<path>/<year>/<month>/<day>/<hour>/
. Поэтому я создал цикл, который считывает данные в spark df по часовым папкам (по 155 Гб каждая), фильтрует по определенным категориям и записывает обратно в s3 в виде файлов parquet, разделенных по отфильтрованным категориям ( s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/
). Я использую 60 рабочих узлов G2.X, каждый из которых (8 vCPU, 32 ГБ памяти, 128 ГБ диска). Процесс записи в S3 происходит чрезвычайно медленно, для завершения работы требуется более 10 часов. Есть ли способ ускорить / оптимизировать запись в s3, помимо увеличения количества узлов?
def s3_load_job(input_list):
hour, year, month, day = input_list
logger.info(f"hour in s3 func {hour}")
# get data from s3
s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
logger.info(f"print s3 path {s3_path}")
#user defined library function that return spark df
df = get_df_from_s3(glueContext, s3_path)
df = df.withColumn('category', F.lower(F.col('category')))
df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))
#write to s3
datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
glueContext.write_dynamic_frame.from_options(frame = datasink4,
connection_type = "s3",
connection_options =
{"path":"s3://<path>/"
,"partitionKeys"["category","year","month","day","hour"]}
,format = "glueparquet" )
def main():
year = '2020'
month = '08'
day = '01'
hours = ["%.2d" % i for i in range(24)]
input_list = [[hour, year, month, day] for hour in hours]
logger.info(f"input_list {input_list}")
for i in input_list:
s3_load_job(i)
job.commit()
if __name__ == "__main__":
main()
Комментарии:
1. Каков средний размер файла в destination? Это блоки размером более 256 МБ??
2. После фильтрации размер каждой категории для записи меняется (искажается): ‘A’ составляет около 600 ГБ, ‘B’ — 400 ГБ, а остальные — по 250 г каждая. Размер записываемых файлов назначения рассчитывается внутри glue (оптимизирован для использования всех ядер процессора), который, насколько я видел, составляет от 1 до 15 МБ каждый. Я не использую никакого объединения / перераспределения при записи динамического фрейма в s3, чтобы избежать ошибок памяти.
Ответ №1:
Похоже, вы, должно быть, нашли способ справиться с этим. Хотел бы поделиться тем, что сработало у меня. Я запускал задание glue каждый час, включал закладки для заданий, чтобы не обрабатывать повторно старые файлы. Убедитесь, что вы не создаете слишком много разделов, что не только увеличивает время выполнения, но и в случае, если вы захотите выполнить запрос через Athena, в долгосрочной перспективе ваши запросы могут затянуться. Сведите количество разделов к минимуму. При перераспределении ваша работа может потратить слишком много времени на перетасовку данных, что может увеличить время выполнения работы. Однако частые ежечасные запуски могли бы помочь. Поделитесь, пожалуйста, тем, что у вас сработало.
Ответ №2:
Если вы используете S3 (хранилище объектов), попробуйте установить следующие конфигурации:
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
Комментарии:
1. Спасибо, Не могли бы вы, пожалуйста, объяснить, как приведенный выше код влияет на оптимизацию записи в s3?
2. Для хранилищ объектов, модель согласованности которых означает, что коммиты на основе переименования безопасны (например, S3), используйте алгоритм v.2 для повышения производительности. Это приводит к меньшему переименованию в конце задания, чем алгоритм v.1. Поскольку он по-прежнему использует rename() для фиксации файлов, его небезопасно использовать, когда хранилище объектов не имеет согласованных метаданных / списков. Коммиттер также может быть настроен на игнорирование сбоев при очистке временных файлов; это снижает риск того, что временная проблема с сетью перерастет в сбой задания. Поскольку хранение временных файлов может привести к большим расходам.
Ответ №3:
Вы можете попробовать следующее
- Не конвертируйте pyspark df в dynamicFrame, поскольку вы можете напрямую сохранить фрейм данных pyspark в s3.
- Поскольку вы получаете файлы размером от 1 МБ до 15 МБ КАЖДЫЙ, вам необходимо выполнить оптимизацию. Поэтому попробуйте перераспределить фрейм данных перед записью его в s3.
ЕСЛИ размер вашего раздела составляет 250 ГБ, то вам следует создать выходной файл размером не менее 256 МБ или, в случае G2.x, вы также можете создать файл размером 512 МБ каждый.
Для достижения этой цели вы можете сделать
Вы можете сгенерировать 500 файлов в каждом разделе следующим образом 500*512 = 250 GB
df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)
Комментарии:
1. спасибо, попробовал этот подход, но время выполнения задания фактически увеличилось. Я полагаю, что увеличение времени вызвано тем, что работники пытаются повторно перетасовать данные между узлами во время повторного разделения.
Ответ №4:
Попробуйте использовать коммиттеры Hadoop s3a: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/committer_architecture.html
Настройка в Glue непростая, я написал процедуру здесь, если вы хотите: https://www.romainardiet.com/2022-06-21/aws-glue-and-s3a-committers /
Это будет иметь большое значение, поскольку запись данных больше не зависит от временного каталога s3 переименования, которые выполняются медленно на s3, особенно если вы пишете большие файлы.
Комментарии:
1. Работает ли это также только для Glue 3.0 или Glue 2.0?
2. Это может работать только на Glue 3, поскольку для доступа к этим коммиттерам требуется как минимум hadoop 3