#java #mysql #hive #apache-flink #flink-streaming
#java #mysql #улей #apache-flink #flink-потоковая передача
Вопрос:
- Проблема
Прочитайте данные в hive и запишите их в mysql. Данные отсутствуют. При использовании JdbcUpsertTableSink для передачи данных в hive имеется 90 фрагментов данных, а в mysql — 70 фрагментов данных, но в локальном тестировании данные не теряются. У Flink нет какого-либо ненормального информационного запроса. - Информация о версии программного обеспечения:
- Информация о версии Flink: 1.11.1
- информация о версии hive: 1.2.1
- информация о версии mysql: 5.6
- Ниже приведена моя программа
public class MarketMonitorWriteMySQLJob {
public static void main(String[] args) throws IOException {
ParameterToolFactory parameterToolFactory = new ParameterToolFactory();
ParameterTool parameterTool = parameterToolFactory.createParameterTool();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment tableEnv =
TableEnvironment.create(settings);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
HiveCatalog hiveCatalog = new HiveCatalog(CataLogEnum.NEW_BI.getCataLogName(),
CataLogEnum.NEW_BI.getDbName(),
parameterTool.get(FlinkProperEnum.FLINK_HIVE_CONF_DIR.key));
tableEnv.registerCatalog(CataLogEnum.NEW_BI.getCataLogName(), hiveCatalog);
tableEnv.useCatalog(CataLogEnum.NEW_BI.getCataLogName());
ParameterTool fromArgs = ParameterTool.fromArgs(args);
String currentDay = DateConversionUtils.getCurrentDateToString();
String currentDayNoBar = DateConversionUtils.getCurrentDateNoBarsToString();
SQLBase marketMonitorSQL = new MarketMonitorSQL();
String currentDateHour = String.valueOf(DateConversionUtils.getLastHour());
String sql = "select * from new_bi.market_monitor_job where dt = '2020-11-25' and hs = 09";
if (fromArgs.getNumberOfParameters() > 1) {
String currentday = fromArgs.get("currentday");
String currenthour = fromArgs.get("currenthour");
String[] externalParams = new String[]{currentday, currenthour};
// sql = String.format(sql, externalParams);
} else {
String[] dateArgs = {currentDay, currentDateHour};
// sql = String.format(sql, dateArgs);
}
System.out.println(sql);
JdbcOptions options = JdbcOptionsGenUtils.genJdbcOptions(
parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_URL.key),
parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_DRIVER_CLASS_NAME.key),
parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_USERNAME.key),
parameterTool.get(FlinkProperEnum.FLINK_MYSQL_CUSTOM_DATASOURCE_NEWBI_PASSWORD.key),
"market_monitor_job");
TableSchema schema = TableSchema.builder()
.fields(marketMonitorSQL.getSinkTableFields(), marketMonitorSQL.getSinkTableFieldTypes())
.build();
JdbcUpsertTableSink tableSink = JdbcUpsertTableSink.builder()
.setOptions(options)
.setTableSchema(schema)
// .setFlushIntervalMills(1000)
.setFlushMaxSize(100)
.build();
tableSink.setKeyFields(marketMonitorSQL.getSinkTableKeys());
tableSink.setIsAppendOnly(false);
tableEnv.registerTableSink("market_monitor_job_mysql_sink", tableSink);
Table table = tableEnv.sqlQuery(sql);
table.printSchema();
StatementSet statement = tableEnv.createStatementSet();
statement.addInsert("market_monitor_job_mysql_sink", table);
statement.execute();
}
}
Кластер не предоставляет никакой ненормальной информации, и я понятия не имею, как справиться с проблемой! Спасибо всем за вашу помощь 🙂
Комментарии:
1. Вы отладили программу?
2. @JustanotherJavaprogrammer Мне нужно только распечатать информацию о результате запроса. Результаты онлайн-запроса кластера Flink являются нормальными, но данные теряются при записи в mysql. Локальный тест может быть записан. Теперь debug не знает, как с этим справиться?