#apache-flink #flink-streaming #rocksdb #flink-cep
#apache-flink #flink-потоковая передача #роксдб #флинк-кэп
Вопрос:
У меня есть простое задание Apache Flink:
** Источник данных (Apache Kafka) — Шаблон Filter — keyBy — CEP (с таймером) — Функция PatternProcessFunction — KeyedProcessFunction (* здесь у меня ValueState (логическое значение) и таймер регистрации на 5 минут. Если значение valueState не равно null, я обновлю значение valueState (нечего отправлять в сборщик) и обновлю таймер. Если значение valueState равно null, я сохраню в состоянии TRUE, затем отправлю событие ввода в сборщик и установлю таймер. Когда метод OnTimer будет готов, я очищу свой ValueState *) — Sink (Apache Kafka) **.
Настройки задания:
** Интервал контрольных точек: 5000 мс **
** Инкрементная контрольная точка: true **
** Семантический: ровно один раз **
** Серверная часть состояния: RocksDB **
** Параллелизм: 4 **
Логически моя работа работает отлично, но у меня есть некоторые проблемы.
У меня было два теста на моем кластере (2 менеджера заданий и 3 диспетчера задач):
** Первый тест: **
Я начал свою работу и подключился к пустой теме Apache Kafka, затем я увидел в ВЕБ-интерфейсе Flink ** Статистику контрольных точек: **
1) Время последнего подтверждения — запуска = 5000 мс (как и мой интервал контрольных точек)
2) Размер состояния = 340 КБ на каждом
интервале 5 секунд 3) Все статус завершен (синий).
** Второй тест: **
Я начал отправлять json-сообщения с другими ключами (от «1» до целого числа.MAX_VALUE) в теме Apache Kafka. Скорость отправки была: 1000 сообщений / сек, затем я увидел в веб-интерфейсе Flink ** Статистику контрольных точек: **
1) Время последнего подтверждения — запуска = 1-6 минут
** Мой вопрос № 1: почему это время растет? Это плохо или нормально? **
2) Размер состояния постоянно рос. Я отправлял сообщения в Kafka около 10 минут (1000 x 60 x 10 = 600000 сообщений). После отправки размер состояния составлял 100-150 МБ.
3) После отправки я подождал около часа и увидел, что:
Время последнего подтверждения — запуска = 5000 мс (например, интервал моей контрольной точки)
Размер состояния был: 100-150 МБ с каждым интервалом в 5 секунд.
** Мой вопрос № 2: почему он не уменьшается? В конце концов, я проверил свои журналы заданий и увидел 600000 записей: ValueState для ** key ** был очищен (метод OnTimer был успешно выполнен), а логика заданий (см. Описание my KeyedProcessFunction) работала отлично **
Что я пытался сделать?
1) установка паузы между контрольными
точками 2) отключение дополнительных контрольных
точек 3) включение асинхронных контрольных точек (в flink-conf.yml)
Это не дает никаких изменений!!!
** Мой вопрос № 3: Что мне делать?? Потому что на промышленном сервере скорость составляет: * 10 миллионов сообщений в час *, а размер контрольной точки увеличивается мгновенно.**
Комментарии:
1. Используете ли вы библиотеку CEP от Flink?
2. Да, Дэвид. Я использую библиотеку CEP Flink. Но я всегда использую шаблон с. в течение (Времени.секунд(….))