Как получить номер Flink taskmanager перед отправкой задания?

#apache-flink #flink-streaming

Вопрос:

У меня есть задание по потоку данных Flink, начатое

 val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(taskmanagernumber * x) // set env parallelism this line

env.addSource...map...addSink...
env.execute()
 

Я хочу контролировать параллелизм env, связанный с номером диспетчера задач, как в приведенном выше коде.

Есть ли способ сделать это? Или какой-либо обходной путь для установки параллелизма, связанного с номером диспетчера задач?

Ответ №1:

Вы можете использовать реактивный планировщик, который автоматически адаптирует параллелизм к тому, что предоставляет кластер.

Вам не нужно устанавливать параллелизм в самой работе. Вы можете установить его в командной строке при запуске задания:

 flink run -p <parallelism> <jar-file> <arguments>
 

Если вы не знаете, сколько слотов доступно в кластере, вы можете получить информацию из REST API. /overview возвращает что-то вроде этого:

 {
  taskmanagers: 2,
  slots-total: 2,
  slots-available: 2,
  jobs-running: 0,
  jobs-finished: 0,
  jobs-cancelled: 0,
  jobs-failed: 0,
  flink-version: "1.13.1",
  flink-commit: "a7f3192"
}
 

slots-available это то, что вы ищете. Так что вы можете сделать что-то вроде

 flink run -p `curl -s http://localhost:8081/overview | jq '.["slots-available"]'` ...
 

Комментарии:

1. Что делать, если я хочу установить его в коде? как будто у меня есть конфигурация загрузки кода и я установил конфигурацию на Flink, я думаю, что командная строка-не лучший способ справиться с этим сценарием.

2. В общем, я думаю, что лучше не включать операционную конфигурацию в код. Но вы можете передать значение в качестве параметра заданию или вызвать REST API из своей работы. Вы также можете установить параллелизм в flink-conf.yaml.