Диспетчер Задач Flink Внезапно Разбился

#apache-flink #flink-streaming

#апач-флинк #мерцание-потоковое

Вопрос:

Flink TM внезапно разбился после 3 месяцев работы с приведенной ниже трассировкой стека ошибок.

 2021-12-05 07:22:05,369 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'GlobalWindowAggregate(groupBy=[org, $f4], window=[HOP(slice_end=[$slice_end], size=[15 min], slide=[1 min])], select=[org, $f4, COUNT(distinct$0 count$0) AS $f2, COUNT(count1$1) AS window_start, start('w$) AS window_end]) -gt; Calc(select=[window_start, window_end, org, $f4, $f2 AS $f4_0]) (1/24)#6' did not react to cancelling signal for 30 seconds, but is stuck in method:  org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296) org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507) org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494) org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460) org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399) org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200) org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182) org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156) org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95) org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95) org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) org.apache.flink.streaming.runtime.tasks.StreamTask$Lambda$615/1465249724.runDefaultAction(Unknown Source) org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) org.apache.flink.streaming.runtime.tasks.StreamTask$Lambda$1480/994476387.run(Unknown Source) org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) java.lang.Thread.run(Thread.java:748)  2021-12-05 07:22:05,370 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:7, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=2.0000000000000000, taskHeapMemory=2.656gb (2852126690 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.875gb (2013265950 bytes), networkMemory=128.000mb (134217728 bytes)}, allocationId: 2b2d5beb481130d88a1eaaa0d3be2f7d, jobId: a5ed6a11efac85d315195eb9e7534316). 2021-12-05 07:22:05,370 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to fail task externally GlobalWindowAggregate(groupBy=[org, $f4], window=[HOP(slice_end=[$slice_end], size=[15 min], slide=[1 min])], select=[org, $f4, COUNT(distinct$0 count$0) AS $f2, COUNT(count1$1) AS window_start, start('w$) AS window_end]) -gt; Calc(select=[window_start, window_end, org, $f4, $f2 AS $f4_0]) (1/24)#6 (5e34a8de7bcff882f37c073f250c2594). 2021-12-05 07:22:05,370 INFO org.apache.flink.runtime.taskmanager.Task [] - Task GlobalWindowAggregate(groupBy=[org, $f4], window=[HOP(slice_end=[$slice_end], size=[15 min], slide=[1 min])], select=[org, $f4, COUNT(distinct$0 count$0) AS $f2, COUNT(count1$1) AS window_start, start('w$) AS window_end]) -gt; Calc(select=[window_start, window_end, org, $f4, $f2 AS $f4_0]) (1/24)#6 is already in state CANCELING 2021-12-05 07:22:05,372 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:7, state:RELEASING, resource profile: ResourceProfile{cpuCores=2.0000000000000000, taskHeapMemory=2.656gb (2852126690 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.875gb (2013265950 bytes), networkMemory=128.000mb (134217728 bytes)}, allocationId: 2b2d5beb481130d88a1eaaa0d3be2f7d, jobId: a5ed6a11efac85d315195eb9e7534316). 2021-12-05 07:22:15,362 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Terminating TaskManagerRunner with exit code 1. org.apache.flink.util.FlinkException: Unexpected failure during runtime of TaskManagerRunner.  at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413) [flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396) [flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354) [flink-dist_2.12-1.13.1.jar:1.13.1] Caused by: java.util.concurrent.TimeoutException  at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582) ~[flink-dist_2.12-1.13.1.jar:1.13.1]  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_232]  at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_232]  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_232]  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_232]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_232]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_232]  at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_232] 2021-12-05 07:22:15,365 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache 2021-12-05 07:22:15,365 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache 2021-12-05 07:22:15,365 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Shutting down TaskExecutorLocalStateStoresManager. 2021-12-05 07:22:15,365 INFO org.apache.flink.runtime.filecache.FileCache [] - removed file cache directory /tmp/flink-dist-cache-9fad861a-b657-4625-a184-db126c423c2f  

Во время отладки я обнаружил, что использование буфера ввода и вывода достигло 100% на панели мониторинга datadog.введите описание изображения здесь введите описание изображения здесь

Также выяснилось, что последние 2 контрольные точки потерпели неудачу Checkpoint expired before completing. , время ожидания контрольной точки сообщения составляет 2 минуты. Как я могу исправить эту проблему. введите описание изображения здесь

Комментарии:

1. Каковы ваши свойства контрольной env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1) точки ?

2. Максимальная одновременная контрольная точка равна 1, а допустимые контрольные точки также равны 1

3. Может быть, в этом твоя проблема ? Вы пишете о 2 контрольных точках, которые потерпели неудачу, когда сбой контрольных точек номер 1. Другой вопрос, почему они потерпели неудачу …

4. обе контрольные точки вышли из строя с ошибкой: контрольная точка не завершена в требуемое время

Ответ №1:

Тайм-ауты контрольных точек, как правило, вызваны либо

  1. противодавление, приводящее к слишком медленному перемещению барьеров контрольных точек по графику выполнения, или
  2. какое-то узкое место, мешающее Flink достаточно быстро записывать данные в хранилище контрольных точек (например, нехватка сети, недостаточная квота ввода-вывода)

Похоже, вы используете несогласованные контрольные точки. Это должно помочь с пунктом № 1 выше, но может привести к тому, что пункт № 2 станет проблемой, поскольку несогласованные контрольные точки увеличивают объем проверяемых данных (в вашем случае, похоже, примерно на 1 ГБ).

Возможно, вы просто захотите увеличить время ожидания контрольной точки. Наличие тайм-аута контрольных точек почти никогда не помогает.

Но также похоже, что у вас значительное противодавление. Выяснить, что является причиной этого, и что-то с этим сделать, должно помочь. (Если вы можете перейти на Flink 1.13 (или более позднюю версию), улучшенный мониторинг противодавления облегчит это.) Возможно, у вас перекос в данных, или, возможно, вам нужно увеличить масштаб кластера.

Комментарии:

1. Я уже проверил показатели противодавления, в тот момент никакого противодавления не было, только контрольные точки выходят из строя с ошибкой: checkpoint not completed in require time

2. Я думаю, что в период, когда выходные буферы использовались на 100%, было противодавление.

3. Так что же мне делать, увеличить буферную память или время ожидания контрольной точки?

4. Увеличение буферов ухудшит ситуацию; не делайте этого. Я предполагаю, что у вас было редкое событие, которое вызвало значительное временное противодавление и помешало своевременному завершению проверки. Я бы увеличил время ожидания. Если это произойдет снова, возможно, вам захочется выяснить причину противодавления и определить, можно ли это предотвратить.

5. Кроме того, Дэвид, я не понимаю, почему это убивает диспетчер задач, потому что после перезагрузки он работает плавно, и задержка также была восстановлена в кратчайшие сроки, не должен ли диспетчер задач перезапускаться вместо выхода