Сбой конвейера Apache Beam, работающего на кластере Flink

#apache-flink #apache-beam

#apache-flink #apache-beam

Вопрос:

У меня есть конвейер Apache Beam, который я пытаюсь развернуть в кластере Flink Docker, развернутом локально.

Сбой конвейера с

 The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.<init>(FlinkExecutionEnvironments.java:319)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:177)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:139)
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:98)
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
ApacheBeamPocJava.main(ApacheBeamPocJava.java:262)
  

Вот как я настраиваю конвейер

 Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setRunner(FlinkRunner.class);
options.setFlinkMaster(“localhost:6123”);
options.setFilesToStage(Arrays.asList("path to the beam jar"));
FlinkRunner flinkRunner = FlinkRunner.fromOptions(options); 
Pipeline p= Pipeline.create(options);
  

И после определения шагов конвейера. Я запускаю его следующим образом

 flinkRunner.run(p);
  

Вот как я отправляю задание

 flink run -c ClassName PATH_TO_JAR
  

Может кто-нибудь посоветовать, что здесь происходит не так?

Также, если у кого-то есть Beam <-> Flink examples, удобные для Java. Я бы определенно оценил это тоже.

Ответ №1:

Похоже, вы определили среду выполнения внутри самого конвейера. Вы пробовали запускать свой конвейер, как описано в документации Flink runner? (Удалите части вашего кода, в которых вы определяете runner или настраиваете его.)

Поскольку Beam — это фреймворк, который отделяет ваш код от исполняющего его раннера, нет необходимости иметь конфигурацию Flink runner в самом коде вашего конвейера. Если вы можете выполнить свой конвейер локально с помощью direct runner, он также будет работать на Flink runner (или любом другом, который поддерживается) при компиляции с правильным профилем.

bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters-for-your-pipeline-or-the-runner

Пожалуйста, имейте в виду, что в настоящее время в Beam 2.25.0 для Flink Runner есть ошибка, поэтому попробуйте ее с версией 2.24.0 или более поздней версией, когда она будет выпущена.