#apache-flink #apache-beam
#apache-flink #apache-beam
Вопрос:
У меня есть конвейер Apache Beam, который я пытаюсь развернуть в кластере Flink Docker, развернутом локально.
Сбой конвейера с
The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.<init>(FlinkExecutionEnvironments.java:319)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:177)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:139)
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:98)
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
ApacheBeamPocJava.main(ApacheBeamPocJava.java:262)
Вот как я настраиваю конвейер
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setRunner(FlinkRunner.class);
options.setFlinkMaster(“localhost:6123”);
options.setFilesToStage(Arrays.asList("path to the beam jar"));
FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
Pipeline p= Pipeline.create(options);
И после определения шагов конвейера. Я запускаю его следующим образом
flinkRunner.run(p);
Вот как я отправляю задание
flink run -c ClassName PATH_TO_JAR
Может кто-нибудь посоветовать, что здесь происходит не так?
Также, если у кого-то есть Beam <-> Flink examples, удобные для Java. Я бы определенно оценил это тоже.
Ответ №1:
Похоже, вы определили среду выполнения внутри самого конвейера. Вы пробовали запускать свой конвейер, как описано в документации Flink runner? (Удалите части вашего кода, в которых вы определяете runner или настраиваете его.)
Поскольку Beam — это фреймворк, который отделяет ваш код от исполняющего его раннера, нет необходимости иметь конфигурацию Flink runner в самом коде вашего конвейера. Если вы можете выполнить свой конвейер локально с помощью direct runner, он также будет работать на Flink runner (или любом другом, который поддерживается) при компиляции с правильным профилем.
bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters-for-your-pipeline-or-the-runner
Пожалуйста, имейте в виду, что в настоящее время в Beam 2.25.0 для Flink Runner есть ошибка, поэтому попробуйте ее с версией 2.24.0 или более поздней версией, когда она будет выпущена.