#amazon-web-services #scala #apache-spark #cpu #amazon-emr
#amazon-веб-сервисы #scala #apache-spark #процессор #amazon-emr
Вопрос:
Я пытаюсь выполнить свое приложение spark scala в кластере AWS EMR, создав пошаговое приложение Spark.
Мой кластер содержит 4 м3.xlarge
Я запускаю свое приложение с помощью этой команды :
spark-submit --deploy-mode cluster --class Main s3://mybucket/myjar_2.11-0.1.jar s3n://oc-mybucket/folder arg1 arg2
Мое приложение принимает 3 параметра, первый из которых — это папка.
К сожалению, после запуска приложения я вижу, что активен только один Исполнитель ( мастер), а у меня 3 исполнителя мертвы, поэтому все задачи работают только над первой. смотрите изображение
Я перепробовал много способов активировать эти средства, но без какого-либо результата («spark.default.parallelism», «spark.executor.instances» и «spark.executor.cores»). Что я должен сделать , чтобы весь исполнитель был активен и обрабатывал данные ?
Кроме того, при взгляде на ганглии у меня процессор всегда ниже 35%, есть ли способ, чтобы процессор работал более чем на 75%?
Спасибо
UPDTAE
это содержимое stderr для мертвых исполнителей
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/14/__spark_libs__3671437061469038073.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/08/15 23:28:56 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14765@ip-172-31-39-255
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for TERM
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for HUP
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for INT
20/08/15 23:28:57 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing view acls groups to:
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls groups to:
20/08/15 23:28:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 186 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing view acls groups to:
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls groups to:
20/08/15 23:28:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 2 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-d0d258ba-4345-45d1-9279-f6a97b63f81c
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-e7ae1e29-85fa-4df9-acf1-f9923f0664bc
20/08/15 23:28:58 INFO MemoryStore: MemoryStore started with capacity 2.6 GB
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ip-172-31-36-83.eu-west-1.compute.internal:37115
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/08/15 23:28:59 INFO Executor: Starting executor ID 3 on host ip-172-31-39-255.eu-west-1.compute.internal
20/08/15 23:28:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40501.
20/08/15 23:28:59 INFO NettyBlockTransferService: Server created on ip-172-31-39-255.eu-west-1.compute.internal:40501
20/08/15 23:28:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/15 23:29:00 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManager: external shuffle service port = 7337
20/08/15 23:29:00 INFO BlockManager: Registering executor with local external shuffle service.
20/08/15 23:29:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-39-255.eu-west-1.compute.internal/172.31.39.255:7337 after 20 ms (0 ms spent in bootstraps)
20/08/15 23:29:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:03 INFO CoarseGrainedExecutorBackend: eagerFSInit: Eagerly initialized FileSystem at s3://does/not/exist in 3363 ms
20/08/15 23:30:02 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/08/15 23:30:02 INFO DiskBlockManager: Shutdown hook called
20/08/15 23:30:02 INFO ShutdownHookManager: Shutdown hook called
эта проблема должна быть с памятью?
Комментарии:
1. Вы должны посмотреть на stderr мертвых исполнителей и посмотреть, есть ли какие-либо ошибки. Тогда, возможно, разместите здесь stacktrace .
2. @mazaneicha после проверки всех этих мертвых исполнителей я обнаружил, что они содержат ту же ошибку, содержимое обновлено
3. Не уверен, что
Eagerly initialized FileSystem at s3://does/not/exist
означает, но очевидно, что вашим исполнителям было приказано завершить работу через 1 минуту, время ожидания по умолчанию.
Ответ №1:
Вы не используете всех исполнителей по умолчанию с помощью spark-submit
, вы можете указать количество исполнителей --num-executors
, executor-core
и executor-memory
.
Например, чтобы увеличить количество исполнителей (которых по умолчанию 2)
spark-submit --num-executors N #where N is desired number of executors like 5,10,50
Смотрите пример в документах здесь
Если это не помогает или заменяет spark-submit, вы можете переопределить его spark.executor.instances
в conf/spark-defaults.conf
файле или подобном, чтобы вам не нужно было указывать это явно в командной строке
Для загрузки процессора вам следует изучить executor-core
и executor-core
и либо изменить их в spark-submit, либо conf. Надеюсь, увеличение количества ядер процессора увеличит использование.
Обновить:
Как указал @Lamanus, и я дважды проверил, что значение emr больше 4.4 spark.dynamicAllocation.enabled
установлено на true
, я предлагаю вам дважды проверить разделы ваших данных, поскольку при включенном динамическом распределении количество экземпляров executor зависит от количества разделов, которые варьируются в зависимости от стадии выполнения DAG. Кроме того, с помощью динамического распределения вы можете попробовать spark.dynamicAllocation.initialExecutors
, spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.maxExecutors
управлять исполнителями.
Комментарии:
1. По умолчанию включена опция EMR динамического распределения и используется весь ресурс без какой-либо конкретной опции исполнителя.
2. @Lamanus, да, действительно, это
true (emr-4.4.0 or greater)
, спасибо, что указали, я обновил ответ3.
spark.dynamicAllocation.initialExecutors
не работает, по-видимому, это просто указывает мастеру, сколько исполнителей должно быть там, но по-прежнему активен только один, а остальные мертвы
Ответ №2:
Возможно, это немного запоздало, но я нашел этот блог AWS Big Data полезным для обеспечения того, чтобы большая часть моего кластера использовалась и чтобы я мог достичь максимально возможного параллелизма.
Более конкретно:
Количество исполнителей на экземпляр = (общее количество виртуальных ядер на экземпляр — 1)/ spark.executors.cores
Общая память исполнителя = общая оперативная память на экземпляр / количество исполнителей на экземпляр
Затем вы можете контролировать количество параллельных задач на этапах с помощью spark.default.parallelism
или repartitioning
.