Как сократить время, затрачиваемое на запись файлов parquet в s3 с помощью AWS Glue

#apache-spark #amazon-s3 #pyspark #aws-glue

#apache-spark #amazon-s3 #pyspark #aws-glue

Вопрос:

Я создаю задание на склеивание, которому необходимо обрабатывать ежедневный объем данных объемом 4 ТБ из s3 path — s3://<path>/<year>/<month>/<day>/<hour>/ . Поэтому я создал цикл, который считывает данные в spark df по часовым папкам (по 155 Гб каждая), фильтрует по определенным категориям и записывает обратно в s3 в виде файлов parquet, разделенных по отфильтрованным категориям ( s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/ ). Я использую 60 рабочих узлов G2.X, каждый из которых (8 vCPU, 32 ГБ памяти, 128 ГБ диска). Процесс записи в S3 происходит чрезвычайно медленно, для завершения работы требуется более 10 часов. Есть ли способ ускорить / оптимизировать запись в s3, помимо увеличения количества узлов?

 
def s3_load_job(input_list):

    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")
    
    # get data from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")

    #user defined library function that return spark df
    df = get_df_from_s3(glueContext, s3_path)

    df = df.withColumn('category', F.lower(F.col('category')))

    df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))

    #write to s3
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
    
    glueContext.write_dynamic_frame.from_options(frame = datasink4,
                                                             connection_type = "s3",
                                                             connection_options = 
                                                             {"path":"s3://<path>/"
                                           ,"partitionKeys"["category","year","month","day","hour"]}
                                                             ,format = "glueparquet" )



def main():
    
    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]

    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")

    for i in input_list:
        s3_load_job(i)
    
    job.commit()



if __name__ == "__main__":
    main()            
       

  

Комментарии:

1. Каков средний размер файла в destination? Это блоки размером более 256 МБ??

2. После фильтрации размер каждой категории для записи меняется (искажается): ‘A’ составляет около 600 ГБ, ‘B’ — 400 ГБ, а остальные — по 250 г каждая. Размер записываемых файлов назначения рассчитывается внутри glue (оптимизирован для использования всех ядер процессора), который, насколько я видел, составляет от 1 до 15 МБ каждый. Я не использую никакого объединения / перераспределения при записи динамического фрейма в s3, чтобы избежать ошибок памяти.

Ответ №1:

Похоже, вы, должно быть, нашли способ справиться с этим. Хотел бы поделиться тем, что сработало у меня. Я запускал задание glue каждый час, включал закладки для заданий, чтобы не обрабатывать повторно старые файлы. Убедитесь, что вы не создаете слишком много разделов, что не только увеличивает время выполнения, но и в случае, если вы захотите выполнить запрос через Athena, в долгосрочной перспективе ваши запросы могут затянуться. Сведите количество разделов к минимуму. При перераспределении ваша работа может потратить слишком много времени на перетасовку данных, что может увеличить время выполнения работы. Однако частые ежечасные запуски могли бы помочь. Поделитесь, пожалуйста, тем, что у вас сработало.

Ответ №2:

Если вы используете S3 (хранилище объектов), попробуйте установить следующие конфигурации:

 spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
  

Комментарии:

1. Спасибо, Не могли бы вы, пожалуйста, объяснить, как приведенный выше код влияет на оптимизацию записи в s3?

2. Для хранилищ объектов, модель согласованности которых означает, что коммиты на основе переименования безопасны (например, S3), используйте алгоритм v.2 для повышения производительности. Это приводит к меньшему переименованию в конце задания, чем алгоритм v.1. Поскольку он по-прежнему использует rename() для фиксации файлов, его небезопасно использовать, когда хранилище объектов не имеет согласованных метаданных / списков. Коммиттер также может быть настроен на игнорирование сбоев при очистке временных файлов; это снижает риск того, что временная проблема с сетью перерастет в сбой задания. Поскольку хранение временных файлов может привести к большим расходам.

Ответ №3:

Вы можете попробовать следующее

  1. Не конвертируйте pyspark df в dynamicFrame, поскольку вы можете напрямую сохранить фрейм данных pyspark в s3.
  2. Поскольку вы получаете файлы размером от 1 МБ до 15 МБ КАЖДЫЙ, вам необходимо выполнить оптимизацию. Поэтому попробуйте перераспределить фрейм данных перед записью его в s3.

ЕСЛИ размер вашего раздела составляет 250 ГБ, то вам следует создать выходной файл размером не менее 256 МБ или, в случае G2.x, вы также можете создать файл размером 512 МБ каждый.

Для достижения этой цели вы можете сделать

Вы можете сгенерировать 500 файлов в каждом разделе следующим образом 500*512 = 250 GB

 df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)
  

Комментарии:

1. спасибо, попробовал этот подход, но время выполнения задания фактически увеличилось. Я полагаю, что увеличение времени вызвано тем, что работники пытаются повторно перетасовать данные между узлами во время повторного разделения.

Ответ №4:

Попробуйте использовать коммиттеры Hadoop s3a: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/committer_architecture.html

Настройка в Glue непростая, я написал процедуру здесь, если вы хотите: https://www.romainardiet.com/2022-06-21/aws-glue-and-s3a-committers /

Это будет иметь большое значение, поскольку запись данных больше не зависит от временного каталога s3 переименования, которые выполняются медленно на s3, особенно если вы пишете большие файлы.

Комментарии:

1. Работает ли это также только для Glue 3.0 или Glue 2.0?

2. Это может работать только на Glue 3, поскольку для доступа к этим коммиттерам требуется как минимум hadoop 3