#apache-spark #apache-spark-standalone
#apache-spark #apache-spark-standalone
Вопрос:
Сколько исполнителей будет запущено для каждого рабочего узла в Spark? Могу ли я узнать математику, стоящую за этим?
например, у меня есть 6 рабочих узлов и 1 главный, и если я отправлю задание через spark-submit, сколько максимальное количество исполнителей будет запущено для задания?
Ответ №1:
чтобы откорректировать ответ @LiMuBei…
Во-первых, это то, что вы говорите
--num-executors 4
Если используется динамическое распределение, то это то, как оно решает за вас
Согласно этому документу (http://jerryshao.me/architecture/2015/08/22/spark-dynamic-allocation-investigation /),
Как Spark вычисляет максимальное количество исполнителей, которое требуется для выполнения ожидающих и выполняемых задач:
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks listener.totalRunningTasks
(numRunningOrPendingTasks tasksPerExecutor - 1) / tasksPerExecutor
}
Если текущий номер исполнителя больше ожидаемого числа:
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
numExecutorsToAdd = 1
// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously "
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
}
numExecutorsTarget - oldNumExecutorsTarget
Если текущий номер исполнителя больше желаемого, Spark уведомит менеджера кластера об отмене ожидающих запросов, поскольку они не нужны. Для тех, кто уже выделил исполнителей, они будут сокращены до разумного числа позже с помощью механизма тайм-аута.
Если текущий номер исполнителя не может удовлетворить желаемому числу:
val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget = numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
numExecutorsToAdd = 1
return 0
}
val addRequestAcknowledged = testing ||
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
if (addRequestAcknowledged) {
val executorsString = "executor" { if (delta > 1) "s" else "" }
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged"
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
1
}
delta
} else {
logWarning(
s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
Ответ №2:
Два возможных ответа:
- Если вы укажете количество исполнителей при вызове
spark-submit
, вы должны получить запрашиваемую сумму--num-executors X
- Если вы не укажете, то по умолчанию Spark должен использовать динамическое распределение, которое при необходимости запустит больше исполнителей. В этом случае вы можете настроить поведение, например, максимальное количество исполнителей, см. http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
Количество исполнителей на рабочий узел будет зависеть от доступных ресурсов.