#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
Я делаю эту простую оконную агрегацию в потоках kafka:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.toStream()
.to("output")
И я ожидал бы, что каждые 5 секунд он будет выдавать в тему вывода один результат, но при его запуске это занимает около 17 секунд.
Итак, после обращения к этому руководству я изменил CACHE_MAX_BYTES_BUFFERING_CONFIG
0
и добавил код в это (только добавив suppress
шаг:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.to("output")
При выполнении второй попытки он ничего не выводит, при отладке я вижу, что сообщения обрабатываются через поток без исключений в журналах, но ничего не выходит.
Я что-нибудь пропустил? разве этот код не должен выдавать результаты каждые 5 секунд?
Комментарии:
1. Можете ли вы подробнее рассказать о случае, который вы пытаетесь решить?
2. Можете ли вы добавить некоторые недостающие детали в Materialized.with(/* Serde stuff */)
3. @HaimRaman, у меня есть события, которые мне нужно сгруппировать по ключу (который является типом события) и объединить их в одну запись, содержащую все типы событий, которые используются в течение 5-секундного окна
4. @HaimRaman, что касается деталей, в
Materialized.with
, это вообще не связано, но в случае, если вам нужно его:Materialized.with(Serdes.String(),SerdeFactory.JsonSerde(QueueChangedEvent.class))
где фабрика создает пользовательский Serde на основе Json, и нет проблем с сериализацией / десериализацией, потому что поток работает нормально, за исключением времени.5. @HaimRaman, вы правы, код в порядке, но тесты — нет, в этом посте объясняется, как их также протестировать (ссылка на который приведена в предоставленном вами сообщении), в основном я должен был сделать это в своих тестах, чтобы заставить их работать: (1) создать тестовые данные (2) создатьфиктивное событие с будущей меткой времени для освобождения результата окна (3). утверждение. И каждый тест должен быть изолирован (т.Е. Запускать Kafka broker и поток до и выключать после каждого отдельного теста. Спасибо, вы можете ответить на вопрос с этими деталями, и я приму его.
Ответ №1:
На основе этого сообщения https://www.nerd.vision/post/suppress-surprise-kafka-streams-and-the-suppress-operator
Оператор подавления основан на времени события, и пока новые записи не поступают, поток в основном заморожен.
В этом сообщении объясняется, как это проверить.
Для работы тестов вам нужно:
- создание тестовых данных
- создайте фиктивное событие с будущей меткой времени, чтобы выпустить утверждение результата окна.
Обратите внимание, что каждый тест должен быть изолирован (например, запускайте Kafka broker и поток до и выключайте после каждого отдельного теста или закрывайте драйвер тестирования).