#apache-flink
#apache-flink
Вопрос:
Недавно я перенес наш Flink из 1.9.0
1.11.1
кластера заданий без HA. Я сталкиваюсь со следующей ошибкой, которая приводит JobManager
к сбою через каждые 5 минут, и задания Flink застревают в этом цикле перезапуска через AWS ECS.
Раньше он работал в Flink 1.9.0 после обновления 1.11.1
, но не работает. Поскольку у меня нет JM HA, я генерирую fixed --job-id
для каждого задания flink вместо идентификатора по умолчанию 00000000000. Я новичок в Flink.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint
5.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 's3://data/flink/checkpoints/<unique_job_id>/chk-5/_metadata'
already exists
Комментарии:
1. Не могли бы вы уточнить
<unique_job_id>
, действительно ли это две папки? Странно, что нет идентификатора попытки.2. @ArvidHeise Нет уникального идентификатора задания — поддерживается Flink UUID без тире
3. Примечание: здесь было слишком много форматирования кода. Названия частей программного обеспечения следует рассматривать как имена собственные, и поэтому они пишутся с большой буквы. Вот и все — нет выделения жирным шрифтом, форматирования кода и т. Д.
Ответ №1:
Проблема, похоже, в том, что вы повторно используете один и тот же идентификатор задания при нескольких запусках, и это приводит к коллизиям. Если вы не используете HA, вы всегда должны генерировать уникальный идентификатор задания для каждого запуска / отправки задания. Самый простой способ сделать это — сгенерировать случайный идентификатор. Вам нужно только исправить идентификатор задания, если вы хотите восстановить выполнение задания из состояния, сохраненного в хранилищах HA.
Комментарии:
1. Если я использую случайный идентификатор задания, то в случае сбоя и перезапуска я теряю контрольные точки, как он восстанавливает и восстанавливает смещения кафки? Вот почему я использую фиксированный идентификатор задания, чтобы в случае сбоя он мог восстанавливать / считывать контрольные точки из той же папки с идентификатором задания. Я мог видеть это в пользовательском интерфейсе Flink. Если я использую случайный идентификатор задания, он никогда не восстанавливает смещения из контрольных точек. Пожалуйста, руководство.
2. Вы хотите сказать, что мы не можем восстановить смещения с контрольных точек без HA?
3. Да, если вы хотите терпеть сбои JobManager, тогда вам нужно включить HA. Без HA вы можете восстанавливаться только после сбоев задачи или сбоев TaskManager. Но если JobManager перезапущен, вы не сможете восстановить задание.
4. OK без JM HA с фиксированным идентификатором задания при нескольких запусках, которые раньше отлично работали для меня в Flink 1.9.0, включая перезапуск и восстановление JM. Он перестал работать после обновления Flink 1.11.1. Будет ли это работать в Flink 1.11.2? Это потому, что изменения, связанные с уведомлением о завершении контрольных точек, внесены в 1.11.1 github.com/apache/flink/commit /… ?
5. Не уверен, почему он работал с Flink 1.9.0, но это никогда официально не было функцией. Следовательно, я не думаю, что он изменится между 1.11.1 и 1.11.2.