#apache-flink #flink-streaming
Вопрос:
У меня есть задание по потоку данных Flink, начатое
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(taskmanagernumber * x) // set env parallelism this line
env.addSource...map...addSink...
env.execute()
Я хочу контролировать параллелизм env, связанный с номером диспетчера задач, как в приведенном выше коде.
Есть ли способ сделать это? Или какой-либо обходной путь для установки параллелизма, связанного с номером диспетчера задач?
Ответ №1:
Вы можете использовать реактивный планировщик, который автоматически адаптирует параллелизм к тому, что предоставляет кластер.
Вам не нужно устанавливать параллелизм в самой работе. Вы можете установить его в командной строке при запуске задания:
flink run -p <parallelism> <jar-file> <arguments>
Если вы не знаете, сколько слотов доступно в кластере, вы можете получить информацию из REST API. /overview
возвращает что-то вроде этого:
{
taskmanagers: 2,
slots-total: 2,
slots-available: 2,
jobs-running: 0,
jobs-finished: 0,
jobs-cancelled: 0,
jobs-failed: 0,
flink-version: "1.13.1",
flink-commit: "a7f3192"
}
slots-available
это то, что вы ищете. Так что вы можете сделать что-то вроде
flink run -p `curl -s http://localhost:8081/overview | jq '.["slots-available"]'` ...
Комментарии:
1. Что делать, если я хочу установить его в коде? как будто у меня есть конфигурация загрузки кода и я установил конфигурацию на Flink, я думаю, что командная строка-не лучший способ справиться с этим сценарием.
2. В общем, я думаю, что лучше не включать операционную конфигурацию в код. Но вы можете передать значение в качестве параметра заданию или вызвать REST API из своей работы. Вы также можете установить параллелизм в flink-conf.yaml.