kafka stream windowedBy не дает ожидаемых результатов

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Я делаю эту простую оконную агрегацию в потоках kafka:

 ...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
        Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
  (key, val, agg) -> {
    // aggregation here
  }, 
  Materialized.with(/* Serde stuff */)
)
.toStream()
.to("output")
 

И я ожидал бы, что каждые 5 секунд он будет выдавать в тему вывода один результат, но при его запуске это занимает около 17 секунд.

Итак, после обращения к этому руководству я изменил CACHE_MAX_BYTES_BUFFERING_CONFIG 0 и добавил код в это (только добавив suppress шаг:

 ...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
        Duration.ofSeconds(5)
    ).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
      (key, val, agg) -> {
    // aggregation here
  }, 
  Materialized.with(/* Serde stuff */)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.to("output")  
 

При выполнении второй попытки он ничего не выводит, при отладке я вижу, что сообщения обрабатываются через поток без исключений в журналах, но ничего не выходит.

Я что-нибудь пропустил? разве этот код не должен выдавать результаты каждые 5 секунд?

Комментарии:

1. Можете ли вы подробнее рассказать о случае, который вы пытаетесь решить?

2. Можете ли вы добавить некоторые недостающие детали в Materialized.with(/* Serde stuff */)

3. @HaimRaman, у меня есть события, которые мне нужно сгруппировать по ключу (который является типом события) и объединить их в одну запись, содержащую все типы событий, которые используются в течение 5-секундного окна

4. @HaimRaman, что касается деталей, в Materialized.with , это вообще не связано, но в случае, если вам нужно его: Materialized.with(Serdes.String(),SerdeFactory.JsonSerde(QueueChangedEvent.class)) где фабрика создает пользовательский Serde на основе Json, и нет проблем с сериализацией / десериализацией, потому что поток работает нормально, за исключением времени.

5. @HaimRaman, вы правы, код в порядке, но тесты — нет, в этом посте объясняется, как их также протестировать (ссылка на который приведена в предоставленном вами сообщении), в основном я должен был сделать это в своих тестах, чтобы заставить их работать: (1) создать тестовые данные (2) создатьфиктивное событие с будущей меткой времени для освобождения результата окна (3). утверждение. И каждый тест должен быть изолирован (т.Е. Запускать Kafka broker и поток до и выключать после каждого отдельного теста. Спасибо, вы можете ответить на вопрос с этими деталями, и я приму его.

Ответ №1:

На основе этого сообщения https://www.nerd.vision/post/suppress-surprise-kafka-streams-and-the-suppress-operator

Оператор подавления основан на времени события, и пока новые записи не поступают, поток в основном заморожен.

В этом сообщении объясняется, как это проверить.

Для работы тестов вам нужно:

  1. создание тестовых данных
  2. создайте фиктивное событие с будущей меткой времени, чтобы выпустить утверждение результата окна.

Обратите внимание, что каждый тест должен быть изолирован (например, запускайте Kafka broker и поток до и выключайте после каждого отдельного теста или закрывайте драйвер тестирования).