#apache-spark #kubernetes
#apache-spark #kubernetes
Вопрос:
Я запускаю задание Spark в kubernetes, и при больших объемах данных я часто получаю сообщение «Исполнитель потерян», а исполнители уничтожаются, и задания завершаются сбоем. Я уже сделал kubectl logs -f
для всех запущенных модулей-исполнителей, но я никогда не вижу, чтобы выдавалось какое-либо исключение (я бы ожидал чего-то вроде OutOfMemoryError
или подобного). Модули просто внезапно прекращают вычисления, а затем удаляются напрямую, поэтому они даже не остаются в Error
состоянии, позволяющем копаться в них и устранять неполадки. Они просто исчезают.
Как мне следует устранить это? Мне кажется, что Kubernetes сам уничтожает модули, потому что я думаю, что они превышают некоторые границы, но, насколько я понимаю, модули тогда должны быть в Evicted
состоянии (или не должны?)
Похоже, это связано с использованием памяти, потому что, когда я включаюсь spark.executor.memory
, моя работа завершается (но тогда с гораздо меньшим количеством исполнителей, что приводит к гораздо более низкой скорости).
При запуске задания с local[*]
в качестве ведущего оно выполняется до завершения даже при гораздо меньших настройках памяти.
Продолжение 1
Я запустил задание только с одним исполнителем и выполнил kubectl logs -f
над модулем-исполнителем и посмотрел выходные данные драйвера (запущенного в клиентском режиме). Сначала в драйвере появляется сообщение «Исполнитель потерян», затем модуль-исполнитель просто завершает работу без каких-либо исключений или сообщений об ошибках вообще.
Продолжение 2
Когда исполнитель умирает, журнал драйвера выглядит следующим образом:
20/08/18 10:36:40 INFO TaskSchedulerImpl: Removed TaskSet 15.0, whose tasks have all completed, from pool
20/08/18 10:36:40 INFO TaskSetManager: Starting task 3.0 in stage 18.0 (TID 1554, 10.244.1.64, executor 1, partition 3, NODE_LOCAL, 7717 bytes)
20/08/18 10:36:40 INFO DAGScheduler: ShuffleMapStage 15 (parquet at DataTasks.scala:208) finished in 5.913 s
20/08/18 10:36:40 INFO DAGScheduler: looking for newly runnable stages
20/08/18 10:36:40 INFO DAGScheduler: running: Set(ShuffleMapStage 18)
20/08/18 10:36:40 INFO DAGScheduler: waiting: Set(ShuffleMapStage 20, ShuffleMapStage 21, ResultStage 22)
20/08/18 10:36:40 INFO DAGScheduler: failed: Set()
20/08/18 10:36:40 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on 10.244.1.64:43809 (size: 159.0 KiB, free: 2.2 GiB)
20/08/18 10:36:40 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 10.93.111.35:20221
20/08/18 10:36:41 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to 10.93.111.35:20221
20/08/18 10:36:49 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 1.
20/08/18 10:36:49 INFO DAGScheduler: Executor lost: 1 (epoch 12)
20/08/18 10:36:49 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
20/08/18 10:36:49 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, 10.244.1.64, 43809, None)
20/08/18 10:36:49 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
20/08/18 10:36:49 INFO DAGScheduler: Shuffle files lost for executor: 1 (epoch 12)
На исполнителе это выглядит следующим образом:
20/08/18 10:36:40 INFO Executor: Running task 3.0 in stage 18.0 (TID 1554)
20/08/18 10:36:40 INFO TorrentBroadcast: Started reading broadcast variable 11 with 1 pieces (estimated total size 4.0 MiB)
20/08/18 10:36:40 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 159.0 KiB, free 2.2 GiB)
20/08/18 10:36:40 INFO TorrentBroadcast: Reading broadcast variable 11 took 7 ms
20/08/18 10:36:40 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 457.3 KiB, free 2.2 GiB)
20/08/18 10:36:40 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
20/08/18 10:36:40 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@node01.maas:34271)
20/08/18 10:36:40 INFO MapOutputTrackerWorker: Got the output locations
20/08/18 10:36:40 INFO ShuffleBlockFetcherIterator: Getting 30 (142.3 MiB) non-empty blocks including 30 (142.3 MiB) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
20/08/18 10:36:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 3.082897 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 5.132359 ms
20/08/18 10:36:41 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 3, fetching them
20/08/18 10:36:41 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@node01.maas:34271)
20/08/18 10:36:41 INFO MapOutputTrackerWorker: Got the output locations
20/08/18 10:36:41 INFO ShuffleBlockFetcherIterator: Getting 0 (0.0 B) non-empty blocks including 0 (0.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) remote blocks
20/08/18 10:36:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 6.770762 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 3.150645 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 2.81799 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 2.989827 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 3.024777 ms
20/08/18 10:36:41 INFO CodeGenerator: Code generated in 4.32011 ms
Затем исполнитель завершает работу.
Странная вещь: этап 18.0 начинается с задачи 3.0, а не с 1.0
Продолжение 3
Теперь я изменил уровень журнала исполнителя на DEBUG
и заметил кое-что интересное, за секунду до завершения работы исполнителя:
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4ef2dc4a
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 64.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@4ef2dc4a
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 128.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 64.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 256.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 128.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 512.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 256.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 1024.0 KiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 512.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 2.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 1024.0 KiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 acquired 4.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:26 DEBUG TaskMemoryManager: Task 1155 release 2.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 acquired 8.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 release 4.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 acquired 16.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 release 8.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 acquired 32.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:27 DEBUG TaskMemoryManager: Task 1155 release 16.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:29 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:30 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:30 DEBUG TaskMemoryManager: Task 1155 release 32.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:34 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:34 DEBUG TaskMemoryManager: Task 1155 acquired 128.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:34 DEBUG TaskMemoryManager: Task 1155 release 64.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:36 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:36 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:37 DEBUG TaskMemoryManager: Task 1155 acquired 256.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:37 DEBUG TaskMemoryManager: Task 1155 release 128.0 MiB from org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d 20/08/18 14:19:37 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:38 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:38 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:39 DEBUG TaskMemoryManager: Task 1155 acquired 64.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
20/08/18 14:19:39 DEBUG TaskMemoryManager: Task 1155 acquired 512.0 MiB for org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@5050038d
Я выделил исполнителю 4 ГБ памяти через spark.executor.memory
, и эти выделения в сумме составляют 1344 МБ. При 4 ГБ памяти и настройках разделения памяти по умолчанию 40% составляют 1400 МБ.
Могу ли я как-то ограничить объем памяти, который UnsafeExternalSorter
занимает?
Продолжение 4
Я столкнулся с редким случаем, когда по какой-то причине Spark не уничтожил «готовых» исполнителей, и я увидел, что модули были OOMKilled
. Похоже, что spark.executor.memory
задает как запрошенную память модуля, так И конфигурацию памяти в Spark executor.
Комментарии:
1. Я рекомендую вам запустить модуль описания kubectl в модуле-исполнителе, вы можете увидеть в описании причину, по которой он умер
2. Я не могу, потому что он удаляется, как только он умирает.
3. Вам удалось решить эту проблему?
Ответ №1:
Ответом было продолжение 4. Я снова запустил задание с помощью kubectl get pod -w
и увидел, что модули-исполнители получают OOMKilled
. Сейчас я запускаю с spark.kubernetes.memoryOverheadFactor=0.5
и spark.memory.fraction=0.2
, настраивая spark.executor.memory
так высоко, что едва запускается один исполнитель на узел, и я устанавливаю spark.executor.cores
количество ядер на узел минус 1. Таким образом, он запускается.
Я также изменил свой алгоритм, потому что у него был большой перекос раздела и приходилось выполнять некоторые вычисления, которые нелегко распараллелить, что вызывало МНОГО перетасовки.
Ответ №2:
Я рекомендую добавить -verbose:gc -XX: PrintGCDetails -XX: PrintGCTimeStamps
к параметрам Java. Чтобы вы могли видеть, как память используется в исполнителе. Вы можете передать этот параметр выше, добавив их в «spark.executor.defaultJavaOptions». Просмотрите журнал, и вы точно поймете, почему вы получаете OOM