#apache-flink #hadoop-yarn #amazon-emr #flink-streaming
#apache-flink #hadoop-yarn #amazon-emr #flink-потоковая передача
Вопрос:
Мы запускаем flink на yarn. Мы проводили тестирование аварийного восстановления, и в рамках этого мы вручную завершили работу одного из узлов, на котором было запущено приложение flink. Как только экземпляр был восстановлен, приложение предприняло несколько попыток, и каждая попытка выдавала следующую ошибку :
AM Container for appattempt_1602902099413_0006_000027 exited with exitCode: -1000
Failing this attempt.Diagnostics: Could not obtain block: BP-986419965-xx.xx.xx.xx-1602902058651:blk_1073743332_2508
file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
org.apache.hadoop.hdfs.BlockMissingException:
Could not obtain block: BP-986419965-10.61.71.85-1602902058651:blk_1073743332_2508 file=/user/hadoop/.flink/application_1602902099413_0006/application_1602902099413_0006-flink-conf.yaml1528536851005494481.tmp
at org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1053)at
org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1036)at
org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:1015)at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:647)at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:926)at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:982)at
java.io.DataInputStream.read(DataInputStream.java:100)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:90)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:64)at
org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:125)at
org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:369)at
org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:267)at
org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)at
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)at
org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)at
java.security.AccessController.doPrivileged(Native Method)at
javax.security.auth.Subject.doAs(Subject.java:422)at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)at
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)at
org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)at
java.util.concurrent.FutureTask.run(FutureTask.java:266)at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at
java.util.concurrent.FutureTask.run(FutureTask.java:266)at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at
java.lang.Thread.run(Thread.java:748)For
more detailed output, check the application tracking page: http://<>.compute.internal:8088/cluster/app/application_1602902099413_0006 Then click on links to logs of each attempt.
Может ли кто-нибудь сообщить нам, какой контент хранится в HDFS и может ли это быть перенаправлено на S3?
Добавление параметров, связанных с контрольной точкой :
StateBackend rocksDbStateBackend = new RocksDBStateBackend("s3://Path", true);
streamExecutionEnvironment.setStateBackend(rocksDbStateBackend)
streamExecutionEnvironment.enableCheckpointing(10000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(60000);
streamExecutionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamExecutionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Комментарии:
1. Не могли бы вы указать свои настройки контрольных точек?
2. @MikalaiLushchytski, я добавил настройки контрольных точек.
Ответ №1:
У меня была такая же проблема,
Я исправил настройку для hdfs-сайта
{
"Classification": "hdfs-site",
"Properties": {
"dfs.client.use.datanode.hostname": "true",
"dfs.replication": "2",
"dfs.namenode.replication.min": "2",
"dfs.namenode.maintenance.replication.min": "2"
}
}
Я думаю, что hdfs потерял данные на завершенном узле, поэтому я реплицирую данные на нескольких узлах.