#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
Я внедряю инкрементные контрольные точки, используя RocksDB в качестве statebackend в моем коде flink, но я хочу знать, происходят ли инкрементные контрольные точки, что я имел в виду, есть ли способ проверить журналы или панель мониторинга flink, выполняет ли она инкрементные контрольные точки или полные контрольные точки
- Я использую flink версии 1.10.0 в соответствии с документацией flink, я увидел, что механизм ведения журнала отключен в версии Flink 1.10.0. я перешел по этой ссылке Ververica, чтобы включить ведение журнала RocksDB. ниже приведен код для включения ведения журнала, который я использовал
import static org.apache.flink.configuration.ConfigOptions.key;
import java.util.Collection;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
private static final long serialVersionUID = 1L;
private String dbLogDir = "";
@Override
public DBOptions createDBOptions(DBOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
currentOptions = super.createDBOptions(currentOptions, handlesToClose);
currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
currentOptions.setStatsDumpPeriodSec(60);
currentOptions.setDbLogDir(dbLogDir);
return currentOptions;
}
@Override
public String toString() {
return this.getClass().toString() "{" super.toString() '}';
}
/**
* Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
* data directory's absolute path will be used as the log file prefix).
*/
public void setDbLogDir(String dbLogDir) {
this.dbLogDir = dbLogDir;
}
public static final ConfigOption<String> LOG_DIR =
key("state.backend.rocksdb.log.dir")
.stringType()
.noDefaultValue()
.withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the "
"data directory's absolute path will be used as the log file prefix)");
@Override
public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
DefaultConfigurableOptionsFactory optionsFactory =
super.configure(configuration);
this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
return optionsFactory;
}
Я сделал следующие настройки в своем коде, чтобы включить ведение журнала
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(interval);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
env.setStateBackend(stateBackend);
stateBackend.setRocksDBOptions(options);
Я добавил ниже 2 параметра в свой конфигурационный файл flink, чтобы включить ведение журнала RocksDB
state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog
Я прошел полную панель инструментов flink, но я не получил ни малейшего представления о том, как проверить, происходит ли инкрементная контрольная точка или происходит полная контрольная точка. Пожалуйста, помогите мне, как я могу настроить ведение журнала для RocksDB, чтобы узнать, происходят ли инкрементные контрольные точки или нет. Я видел в документации ведение журнала RocksDB приведет к огромным затратам на производительность, а также на хранение, это для целей тестирования, после этого я отключу это
Ответ №1:
Я не уверен, что эта информация регистрируется или отображается где-либо, но в вашем коде вы могли бы использовать
stateBackend.isIncrementalCheckpointsEnabled()
чтобы определить, включены ли в вашем бэкэнде состояния RocksDB контрольные точки, а затем самостоятельно зарегистрируйте эту информацию.
Обратите внимание, что для включения инкрементных контрольных точек (которые по умолчанию отключены) вам необходимо настроить
state.backend.incremental: true