AWS Glue назначил все задачи одному и тому же работнику

#aws-glue #aws-glue-data-catalog #aws-glue-spark #aws-glue-connection

Вопрос:

У меня есть работа с клеем AWS, работа которой очень проста: разбейте большие файлы CSV gzip на 1 ГБ.

В своем тесте я загрузил в корзину 4 файла, каждый из которых составляет около 5 ГБ. Тем не менее, задание всегда назначает все файлы одному работнику, а не распределяет их между всеми работниками.

Журнал активных работников:

 [Executor task launch worker for task 3] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-4.gz' for reading
[Executor task launch worker for task 0] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-1.gz' for reading
[Executor task launch worker for task 2] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-3.gz' for reading
[Executor task launch worker for task 1] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-2.gz' for reading
[Executor task launch worker for task 0] zlib.ZlibFactory (ZlibFactory.java:<clinit>(49)): Successfully loaded amp; initialized native-zlib librar
 

Один из отдыхающих рабочих заходит в журнал:

 storage.BlockManager (Logging.scala:logInfo(54)): Initialized BlockManager: BlockManagerId(3, 172.31.0.109, 35849, None)
 

Остальные работники застряли на этом шаге и ждут бесконечно, и все файлы объемом 20 ГБ назначены одной активной задаче

Его сценарий работы приведен ниже:

 import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawled-database", table_name = "input", transformation_ctx = "datasource0", additional_options = {"groupFiles": "inPartition", "compressionType": "gzip"})

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ ("tagids", "string", "internal_tagids", "string"), ("channel", "long", "internal_channel", "long")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://glue-report-staging", "groupFiles": "inPartition", "groupSize": "1073741824", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
 

Ответ №1:

Формат сжатия, используемый в ваших файлах, является причиной высокой производительности и перекоса в параллелизме. Сжатые файлы не подлежат разделению. Именно по этой причине вся работа поручается одному исполнителю. Если бы вы читали это в чистой spark, вы могли бы прочитать как rdd и разделить на более мелкие задачи/ разделы.

Динамический фрейм также имеет ту же опцию. Вы можете использовать datasource0.repartition(n) , n — желаемое количество разделов. Для G. 1X у вас может быть 8 параллельных задач на одного работника, поэтому вы можете определить их размер в соответствии с рабочими, которые должны максимизировать параллелизм. Вы можете найти больше об этом здесь