Flink — невозможно восстановить после завершения узла yarn

#apache-flink #hadoop-yarn #amazon-emr #flink-streaming

#apache-flink #hadoop-yarn #amazon-emr #flink-потоковая передача

Вопрос:

Мы запускаем flink на yarn. Мы проводили тестирование аварийного восстановления, и в рамках этого мы вручную завершили работу одного из узлов, на котором было запущено приложение flink. Как только экземпляр был восстановлен, приложение предприняло несколько попыток, и каждая попытка выдавала следующую ошибку :

 AM Container for appattempt_1602902099413_0006_000027 exited with exitCode: -1000
Failing this attempt.Diagnostics: Could not obtain block: BP-986419965-xx.xx.xx.xx-1602902058651:blk_1073743332_2508 
file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
org.apache.hadoop.hdfs.BlockMissingException:
 Could not obtain block: BP-986419965-10.61.71.85-1602902058651:blk_1073743332_2508 file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
 at org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1053)at
 org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1036)at 
 org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1015)at 
 org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:647)at 
 org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:926)at
 org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:982)at 
 java.io.DataInputStream.read(DataInputStream.java:100)at 
 org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:90)at 
 org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:64)at 
 org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:125)at
 org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:369)at 
 org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:267)at 
 org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)at 
 org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)at
 org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)at 
 java.security.AccessController.doPrivileged(Native Method)at 
 javax.security.auth.Subject.doAs(Subject.java:422)at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)at
 org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)at 
 org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)at 
 java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at
 java.util.concurrent.FutureTask.run(FutureTask.java:266)at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at 
 java.lang.Thread.run(Thread.java:748)For
 more detailed output, check the application tracking page: http://<>.compute.internal:8088/cluster/app/application_1602902099413_0006 Then click on links to logs of each attempt.
  

Может ли кто-нибудь сообщить нам, какой контент хранится в HDFS и может ли это быть перенаправлено на S3?

Добавление параметров, связанных с контрольной точкой :

 StateBackend rocksDbStateBackend = new RocksDBStateBackend("s3://Path", true);
streamExecutionEnvironment.setStateBackend(rocksDbStateBackend)
streamExecutionEnvironment.enableCheckpointing(10000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(60000);
streamExecutionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamExecutionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
  

Комментарии:

1. Не могли бы вы указать свои настройки контрольных точек?

2. @MikalaiLushchytski, я добавил настройки контрольных точек.

Ответ №1:

У меня была такая же проблема,

Я исправил настройку для hdfs-сайта

 {
    "Classification": "hdfs-site",
    "Properties": {
        "dfs.client.use.datanode.hostname": "true",
        "dfs.replication": "2",
        "dfs.namenode.replication.min": "2",
        "dfs.namenode.maintenance.replication.min": "2"
    }
}
  

Я думаю, что hdfs потерял данные на завершенном узле, поэтому я реплицирую данные на нескольких узлах.