EMR многих мертвых исполнителей

#amazon-web-services #scala #apache-spark #cpu #amazon-emr

#amazon-веб-сервисы #scala #apache-spark #процессор #amazon-emr

Вопрос:

Я пытаюсь выполнить свое приложение spark scala в кластере AWS EMR, создав пошаговое приложение Spark.

Мой кластер содержит 4 м3.xlarge

Я запускаю свое приложение с помощью этой команды :

spark-submit --deploy-mode cluster --class Main s3://mybucket/myjar_2.11-0.1.jar s3n://oc-mybucket/folder arg1 arg2

Мое приложение принимает 3 параметра, первый из которых — это папка.

К сожалению, после запуска приложения я вижу, что активен только один Исполнитель ( мастер), а у меня 3 исполнителя мертвы, поэтому все задачи работают только над первой. смотрите изображение

введите описание изображения здесь

Я перепробовал много способов активировать эти средства, но без какого-либо результата («spark.default.parallelism», «spark.executor.instances» и «spark.executor.cores»). Что я должен сделать , чтобы весь исполнитель был активен и обрабатывал данные ?

Кроме того, при взгляде на ганглии у меня процессор всегда ниже 35%, есть ли способ, чтобы процессор работал более чем на 75%?

Спасибо

UPDTAE

это содержимое stderr для мертвых исполнителей

 SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/14/__spark_libs__3671437061469038073.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/08/15 23:28:56 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14765@ip-172-31-39-255
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for TERM
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for HUP
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for INT
20/08/15 23:28:57 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing view acls groups to: 
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls groups to: 
20/08/15 23:28:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 186 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing view acls groups to: 
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls groups to: 
20/08/15 23:28:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 2 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-d0d258ba-4345-45d1-9279-f6a97b63f81c
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-e7ae1e29-85fa-4df9-acf1-f9923f0664bc
20/08/15 23:28:58 INFO MemoryStore: MemoryStore started with capacity 2.6 GB
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ip-172-31-36-83.eu-west-1.compute.internal:37115
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/08/15 23:28:59 INFO Executor: Starting executor ID 3 on host ip-172-31-39-255.eu-west-1.compute.internal
20/08/15 23:28:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40501.
20/08/15 23:28:59 INFO NettyBlockTransferService: Server created on ip-172-31-39-255.eu-west-1.compute.internal:40501
20/08/15 23:28:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/15 23:29:00 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManager: external shuffle service port = 7337
20/08/15 23:29:00 INFO BlockManager: Registering executor with local external shuffle service.
20/08/15 23:29:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-39-255.eu-west-1.compute.internal/172.31.39.255:7337 after 20 ms (0 ms spent in bootstraps)
20/08/15 23:29:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:03 INFO CoarseGrainedExecutorBackend: eagerFSInit: Eagerly initialized FileSystem at s3://does/not/exist in 3363 ms
20/08/15 23:30:02 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/08/15 23:30:02 INFO DiskBlockManager: Shutdown hook called
20/08/15 23:30:02 INFO ShutdownHookManager: Shutdown hook called
  

эта проблема должна быть с памятью?

Комментарии:

1. Вы должны посмотреть на stderr мертвых исполнителей и посмотреть, есть ли какие-либо ошибки. Тогда, возможно, разместите здесь stacktrace .

2. @mazaneicha после проверки всех этих мертвых исполнителей я обнаружил, что они содержат ту же ошибку, содержимое обновлено

3. Не уверен, что Eagerly initialized FileSystem at s3://does/not/exist означает, но очевидно, что вашим исполнителям было приказано завершить работу через 1 минуту, время ожидания по умолчанию.

Ответ №1:

Вы не используете всех исполнителей по умолчанию с помощью spark-submit , вы можете указать количество исполнителей --num-executors , executor-core и executor-memory .

Например, чтобы увеличить количество исполнителей (которых по умолчанию 2)

 spark-submit --num-executors N   #where N is desired number of executors like 5,10,50
  

Смотрите пример в документах здесь

Если это не помогает или заменяет spark-submit, вы можете переопределить его spark.executor.instances в conf/spark-defaults.conf файле или подобном, чтобы вам не нужно было указывать это явно в командной строке

Для загрузки процессора вам следует изучить executor-core и executor-core и либо изменить их в spark-submit, либо conf. Надеюсь, увеличение количества ядер процессора увеличит использование.

Обновить:

Как указал @Lamanus, и я дважды проверил, что значение emr больше 4.4 spark.dynamicAllocation.enabled установлено на true , я предлагаю вам дважды проверить разделы ваших данных, поскольку при включенном динамическом распределении количество экземпляров executor зависит от количества разделов, которые варьируются в зависимости от стадии выполнения DAG. Кроме того, с помощью динамического распределения вы можете попробовать spark.dynamicAllocation.initialExecutors , spark.dynamicAllocation.maxExecutors spark.dynamicAllocation.maxExecutors управлять исполнителями.

Комментарии:

1. По умолчанию включена опция EMR динамического распределения и используется весь ресурс без какой-либо конкретной опции исполнителя.

2. @Lamanus, да, действительно, это true (emr-4.4.0 or greater) , спасибо, что указали, я обновил ответ

3. spark.dynamicAllocation.initialExecutors не работает, по-видимому, это просто указывает мастеру, сколько исполнителей должно быть там, но по-прежнему активен только один, а остальные мертвы

Ответ №2:

Возможно, это немного запоздало, но я нашел этот блог AWS Big Data полезным для обеспечения того, чтобы большая часть моего кластера использовалась и чтобы я мог достичь максимально возможного параллелизма.

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

Более конкретно:

Количество исполнителей на экземпляр = (общее количество виртуальных ядер на экземпляр — 1)/ spark.executors.cores

Общая память исполнителя = общая оперативная память на экземпляр / количество исполнителей на экземпляр

Затем вы можете контролировать количество параллельных задач на этапах с помощью spark.default.parallelism или repartitioning .