Удаленное выполнение задания Flink с запросом к улью в кластере Flink

#java #hive #apache-flink #flink-table-api

#java #улей #apache-flink #flink-table-api

Вопрос:

Я использую Flink 1.11.2, Hive 2.1.1, Java 8. Попытка выполнить удаленный запрос к улью, упаковала его в jar и запустила с помощью RestClient Flink:

 private static String jar = "/path/Job.jar";
Configuration config = RemoteConfiguration.getConfiguration(host, port);
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
                                                     .setJarFile(new File(jar))
                                                     .setArguments(arguments)
                                                     .build();
    RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<StandaloneClusterId>(config, StandaloneClusterId.getInstance());
    JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, config, 1, false);
    client.submitJob(jobGraph).get();
 

где задание:

 StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = streamExecutionEnvironment.fromElements(
        tableName
    );
    source
        .map(new MapFunction<String, String>() {
          String hiveConfDir = "hive-conf";
          String hiveCatalogName = "myhive";
          String databaseName = "default";
          String location = "'hdfs:///tmp/location'";

          @Override
          public String map(String tableName) {
            HiveCatalog hive = new HiveCatalog(hiveCatalogName, databaseName, hiveConfDir, "2.1.1");
            EnvironmentSettings batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
            TableEnvironment tableEnv = TableEnvironment.create(batchSettings);
            tableEnv.registerCatalog(hiveCatalogName, hive);
            tableEnv.useCatalog(hiveCatalogName);
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

            hive.getHiveConf().set("hive.vectorized.execution.enabled", "false");
            hive.getHiveConf().set("hive.vectorized.execution.reduce.enabled", "false");
            hive.getHiveConf().set("hive.vectorized.execution.reduce.groupby.enabled", "false");
            tableEnv.executeSql("CREATE TABLE "   tableName   "(n"
                                      "  test INT,n"
                                      "  age INTn"
                                      ") STORED AS ORC LOCATION "   location   " TBLPROPERTIES ('orc'n"
                                      "'.compress'='NONE')");
            
            return tableName;
          }
        })
        .print();
    streamExecutionEnvironment.execute();
 

В flink-conf.yaml только один дополнительный параметр:

 env.java.home: /path/to/JAVA_HOME
 

И когда я его запускаю, эти ошибки возникают через раз:

 java.lang.OutOfMemoryError: Java heap space
 

или:

 MetaException(message:Got exception: java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in module java.base of loader 'bootstrap'))
 

Можете ли вы это объяснить?