Как проверить инкрементные контрольные точки, работающие в Apache flink?

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

Я внедряю инкрементные контрольные точки, используя RocksDB в качестве statebackend в моем коде flink, но я хочу знать, происходят ли инкрементные контрольные точки, что я имел в виду, есть ли способ проверить журналы или панель мониторинга flink, выполняет ли она инкрементные контрольные точки или полные контрольные точки

  1. Я использую flink версии 1.10.0 в соответствии с документацией flink, я увидел, что механизм ведения журнала отключен в версии Flink 1.10.0. я перешел по этой ссылке Ververica, чтобы включить ведение журнала RocksDB. ниже приведен код для включения ведения журнала, который я использовал

     import static org.apache.flink.configuration.ConfigOptions.key;
    
    import java.util.Collection;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
    import org.rocksdb.DBOptions;
    import org.rocksdb.InfoLogLevel;
    
    public class DefaultConfigurableOptionsFactoryWithLog extends DefaultConfigurableOptionsFactory {
        private static final long serialVersionUID = 1L;
    
        private String dbLogDir = "";
    
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions,
                                         Collection<AutoCloseable> handlesToClose) {
            currentOptions = super.createDBOptions(currentOptions, handlesToClose);
    
            currentOptions.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
            currentOptions.setStatsDumpPeriodSec(60);
            currentOptions.setDbLogDir(dbLogDir);
    
            return currentOptions;
        }
    
        @Override
        public String toString() {
            return this.getClass().toString()   "{"   super.toString()   '}';
        }
    
        /**
         * Set directory where RocksDB writes its info LOG file (empty = data dir, otherwise the
         * data directory's absolute path will be used as the log file prefix).
         */
        public void setDbLogDir(String dbLogDir) {
            this.dbLogDir = dbLogDir;
        }
    
        public static final ConfigOption<String> LOG_DIR =
                key("state.backend.rocksdb.log.dir")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Location of RocksDB's info LOG file (empty = data dir, otherwise the "  
                                "data directory's absolute path will be used as the log file prefix)");
    
        @Override
        public DefaultConfigurableOptionsFactory configure(Configuration configuration) {
            DefaultConfigurableOptionsFactory optionsFactory =
                    super.configure(configuration);
    
            this.dbLogDir = configuration.getOptional(LOG_DIR).orElse(this.dbLogDir);
    
            return optionsFactory;
        }
 

Я сделал следующие настройки в своем коде, чтобы включить ведение журнала

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(interval);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath,true);
    
    DefaultConfigurableOptionsFactoryWithLog options = new DefaultConfigurableOptionsFactoryWithLog();
    options.setDbLogDir("file:///mnt/flink/storage/rocksdb/logging/");
    
    env.setStateBackend(stateBackend);
    
    stateBackend.setRocksDBOptions(options);
 

Я добавил ниже 2 параметра в свой конфигурационный файл flink, чтобы включить ведение журнала RocksDB

     state.backend.rocksdb.log.dir: "file:///mnt/flink/storage/rocksdb/logging/"
    state.backend.rocksdb.options-factory: com.myflinkcode.common.config.DefaultConfigurableOptionsFactoryWithLog
 

Я прошел полную панель инструментов flink, но я не получил ни малейшего представления о том, как проверить, происходит ли инкрементная контрольная точка или происходит полная контрольная точка. Пожалуйста, помогите мне, как я могу настроить ведение журнала для RocksDB, чтобы узнать, происходят ли инкрементные контрольные точки или нет. Я видел в документации ведение журнала RocksDB приведет к огромным затратам на производительность, а также на хранение, это для целей тестирования, после этого я отключу это

Ответ №1:

Я не уверен, что эта информация регистрируется или отображается где-либо, но в вашем коде вы могли бы использовать

 stateBackend.isIncrementalCheckpointsEnabled()
 

чтобы определить, включены ли в вашем бэкэнде состояния RocksDB контрольные точки, а затем самостоятельно зарегистрируйте эту информацию.

Обратите внимание, что для включения инкрементных контрольных точек (которые по умолчанию отключены) вам необходимо настроить

 state.backend.incremental: true