Данные записи кластера Flink в mysql потеряны

#java #mysql #hive #apache-flink #flink-streaming

#java #mysql #улей #apache-flink #flink-потоковая передача

Вопрос:

  1. Проблема
    Прочитайте данные в hive и запишите их в mysql. Данные отсутствуют. При использовании JdbcUpsertTableSink для передачи данных в hive имеется 90 фрагментов данных, а в mysql — 70 фрагментов данных, но в локальном тестировании данные не теряются. У Flink нет какого-либо ненормального информационного запроса.
  2. Информация о версии программного обеспечения:
  • Информация о версии Flink: 1.11.1
  • информация о версии hive: 1.2.1
  • информация о версии mysql: 5.6
  1. Ниже приведена моя программа
 public class MarketMonitorWriteMySQLJob {
    public static void main(String[] args) throws IOException {

        ParameterToolFactory parameterToolFactory = new ParameterToolFactory();
        ParameterTool parameterTool = parameterToolFactory.createParameterTool();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv =
                TableEnvironment.create(settings);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);


        HiveCatalog hiveCatalog = new HiveCatalog(CataLogEnum.NEW_BI.getCataLogName(),
                CataLogEnum.NEW_BI.getDbName(),
                parameterTool.get(FlinkProperEnum.FLINK_HIVE_CONF_DIR.key));

        tableEnv.registerCatalog(CataLogEnum.NEW_BI.getCataLogName(), hiveCatalog);

        tableEnv.useCatalog(CataLogEnum.NEW_BI.getCataLogName());



        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String currentDay = DateConversionUtils.getCurrentDateToString();
        String currentDayNoBar = DateConversionUtils.getCurrentDateNoBarsToString();

        SQLBase marketMonitorSQL = new MarketMonitorSQL();
        String currentDateHour = String.valueOf(DateConversionUtils.getLastHour());
        String sql = "select * from new_bi.market_monitor_job where dt = '2020-11-25' and hs = 09";
        if (fromArgs.getNumberOfParameters() > 1) {
            String currentday = fromArgs.get("currentday");
            String currenthour = fromArgs.get("currenthour");

            String[] externalParams = new String[]{currentday, currenthour};
//            sql = String.format(sql, externalParams);
        } else {
            String[] dateArgs = {currentDay, currentDateHour};
//            sql = String.format(sql, dateArgs);
        }

        System.out.println(sql);

        JdbcOptions options = JdbcOptionsGenUtils.genJdbcOptions(
                parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_URL.key),
                parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_DRIVER_CLASS_NAME.key),
                parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_USERNAME.key),
                parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_PASSWORD.key),
                "market_monitor_job");


        TableSchema schema = TableSchema.builder()
                .fields(marketMonitorSQL.getSinkTableFields(), marketMonitorSQL.getSinkTableFieldTypes())
                .build();


        JdbcUpsertTableSink tableSink = JdbcUpsertTableSink.builder()
                .setOptions(options)
                .setTableSchema(schema)
//                .setFlushIntervalMills(1000)
                .setFlushMaxSize(100)
                .build();

        tableSink.setKeyFields(marketMonitorSQL.getSinkTableKeys());
        tableSink.setIsAppendOnly(false);

        tableEnv.registerTableSink("market_monitor_job_mysql_sink", tableSink);



        Table table = tableEnv.sqlQuery(sql);
        table.printSchema();

        StatementSet statement = tableEnv.createStatementSet();
        statement.addInsert("market_monitor_job_mysql_sink", table);
        statement.execute();
    }
}

 

Кластер не предоставляет никакой ненормальной информации, и я понятия не имею, как справиться с проблемой! Спасибо всем за вашу помощь 🙂

Комментарии:

1. Вы отладили программу?

2. @JustanotherJavaprogrammer Мне нужно только распечатать информацию о результате запроса. Результаты онлайн-запроса кластера Flink являются нормальными, но данные теряются при записи в mysql. Локальный тест может быть записан. Теперь debug не знает, как с этим справиться?