#apache-flink #flink-streaming #rocksdb #flink-cep #rocksdb-java
#apache-flink #flink-потоковая передача #rocksdb #flink-cep #rocksdb-java
Вопрос:
У меня есть задание, которое использует RabbitMQ, я использовал серверную часть состояния FS, но, похоже, размеры состояний стали больше, и тогда я решаю переместить свои состояния в RocksDB. Проблема в том, что в течение первых часов выполнения задания все в порядке, событие через некоторое время, если трафик замедляется, но затем, когда трафик снова становится высоким, у потребителя начинаются проблемы (события накапливаются как нераспакованные), а затем эти проблемы отражаются в остальной части приложения.
У меня есть: 4 ядра процессора,
локальный диск
, 16 ГБ ОЗУ,
среда Unix
, Flink 1.11,
Scala версии 2.11
, 1 одно задание, выполняемое с несколькими ключевыми потоками, и около 10 преобразований, и переход на Postgres
некоторые конфигурации
flink.buffer_timeout=50
flink.maxparallelism=4
flink.memory=16
flink.cpu.cores=4
#checkpoints
flink.checkpointing_compression=true
flink.checkpointing_min_pause=30000
flink.checkpointing_timeout=120000
flink.checkpointing_enabled=true
flink.checkpointing_time=60000
flink.max_current_checkpoint=1
#RocksDB configuration
state.backend.rocksdb.localdir=home/username/checkpoints (this is not working don't know why)
state.backend.rocksdb.thread.numfactory=4
state.backend.rocksdb.block.blocksize=16kb
state.backend.rocksdb.block.cache-size=512mb
#rocksdb or heap
state.backend.rocksdb.timer-service.factory=heap (I have test with rocksdb too and is the same)
state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED
Дайте мне знать, нужна ли дополнительная информация?
Ответ №1:
state.backend.rocksdb.localdir
должен быть абсолютный путь, а не относительный. И этот параметр не предназначен для указания того, куда идут контрольные точки (которых не должно быть на локальном диске), этот параметр предназначен для указания того, где сохраняется рабочее состояние (которое должно быть на локальном диске).
Ваша работа испытывает противодавление, а это означает, что какая-то часть конвейера не может поддерживать работу. Наиболее распространенными причинами противодавления являются (1) приемники, которые не могут поддерживать работу, и (2) недостаточные ресурсы (например, слишком низкий уровень параллелизма).
Вы можете проверить, является ли проблема postgres, запустив задание с удаленным приемником.
Просмотр различных показателей должен дать вам представление о том, какие ресурсы могут быть недостаточно предоставлены.
Комментарии:
1. Я проверял противодавление на панели инструментов Flink, и ни у одного из операторов нет противодавления, по крайней мере, так указано в показателях, но не уверен, что это на 100% верно. У меня всего 4 ядра процессора, как вы думаете, хорошая идея увеличить параллелизм? это
state.backend.rocksdb.localdir
отключено прямо сейчас. Я могу запустить тест с отбрасыванием приемника, но не уверен, что это будет решением, но я это сделаю.2. Локальный каталог rocksdb по умолчанию равен /tmp.
3. Как вы думаете, увеличить параллелизм — хорошая идея?
4. Монитор противодавления на панели инструментов Flink не является полностью надежным индикатором; он может сказать, что все в порядке, когда это не так. Но проверяли ли вы противодавление в каждой подзадаче? Если источники Flink не справляются с вводом из RabbitMQ, значит, что-то находится под давлением.
5. Увеличивая параллелизм, я получаю более высокую загрузку процессора и памяти, как и ожидалось, я думаю, контрольные точки немного больше, но их тоже следует ожидать, но в данный момент у меня стабильный потребитель и задержка.