#java #hive #apache-flink #flink-table-api
#java #улей #apache-flink #flink-table-api
Вопрос:
Я использую Flink 1.11.2, Hive 2.1.1, Java 8. Попытка выполнить удаленный запрос к улью, упаковала его в jar и запустила с помощью RestClient Flink:
private static String jar = "/path/Job.jar";
Configuration config = RemoteConfiguration.getConfiguration(host, port);
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setJarFile(new File(jar))
.setArguments(arguments)
.build();
RestClusterClient<StandaloneClusterId> client =
new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, config, 1, false);
client.submitJob(jobGraph).get();
где задание:
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = streamExecutionEnvironment.fromElements(
tableName
);
source
.map(new MapFunction<String, String>() {
String hiveConfDir = "hive-conf";
String hiveCatalogName = "myhive";
String databaseName = "default";
String location = "'hdfs:///tmp/location'";
@Override
public String map(String tableName) {
HiveCatalog hive = new HiveCatalog(hiveCatalogName, databaseName, hiveConfDir, "2.1.1");
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(batchSettings);
tableEnv.registerCatalog(hiveCatalogName, hive);
tableEnv.useCatalog(hiveCatalogName);
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
hive.getHiveConf().set("hive.vectorized.execution.enabled", "false");
hive.getHiveConf().set("hive.vectorized.execution.reduce.enabled", "false");
hive.getHiveConf().set("hive.vectorized.execution.reduce.groupby.enabled", "false");
tableEnv.executeSql("CREATE TABLE " tableName "(n"
" test INT,n"
" age INTn"
") STORED AS ORC LOCATION " location " TBLPROPERTIES ('orc'n"
"'.compress'='NONE')");
return tableName;
}
})
.print();
streamExecutionEnvironment.execute();
В flink-conf.yaml только один дополнительный параметр:
env.java.home: /path/to/JAVA_HOME
И когда я его запускаю, эти ошибки возникают через раз:
java.lang.OutOfMemoryError: Java heap space
или:
MetaException(message:Got exception: java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in module java.base of loader 'bootstrap'))
Можете ли вы это объяснить?