#java #apache-flink
#java #apache-flink
Вопрос:
Я хочу развернуть настроенное задание flink в отдельный кластер с помощью Java-приложения. Я бы хотел сделать это с помощью RestClusterClient.
Насколько я понимаю, я могу сделать это следующим образом:
- Создайте jar с моим заданием (например, простой поток из одной темы kafka в другую с преобразованием данных);
- С помощью PackagedProgramUtils создайте JobGraph из файла jar;
- Инициализируйте RestClusterClient и отправьте JobGraph в кластер.
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setJarFile(new File("path/to/my/jar/file"))
.setArguments(arguments)
.build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConfiguration, 1, false);
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance())) {
client.submitJob(jobGraph).get();
}
Однако я хочу настроить свое задание и развернуть его в кластере в одном приложении Java. Я нашел следующий способ сделать это:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// add source, any data processing and sink
StreamGraph streamGraph = graphFlow.getConfiguredEnvironment().getStreamGraph("myGraphName", false);
JobGraph graph = streamGraph.getJobGraph();
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance())) {
client.submitJob(jobGraph).get();
}
Это решение не представляется приемлемым, поскольку методы StreamExecutionEnvironment аннотируются как @Internal . Какие другие способы можно использовать для создания объекта JobGraph, который можно загрузить с помощью RestClusterClient?
Поскольку мне нужно загрузить задание в кластер flink, я не могу использовать RemoteStreamEnvironment.
Версия Flink — 1.11.2