Потоки Кафки. Получение упорядоченного по времени потока событий для каждого ключа после закрытия льготного периода окна

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

Мой вариант использования предполагает, что мое приложение получает события, некоторые из которых, как мы можем ожидать, будут поступать не по времени (до 2 дней после времени события), которые мне нужно сгруппировать по ключу. Я не хочу агрегировать записи, а просто получать упорядоченный список этих временных окон событий за раз. Я сомневаюсь, что использование функции агрегирования для построения списка событий будет работать, поскольку я, скорее всего, получу исключение RecordTooLargeException, поскольку, вероятно, на каждое ключевое / временное окно будут тысячи событий.

Код, с которым я играл, создает повторяющееся окно с 2-дневным льготным периодом, которое работает в принципе, но требует от меня использования агрегации, и я чувствую, что мой вариант использования для создания списка сообщений выходит за рамки того, для чего изначально предназначалась агрегатная функция — например

 stream.stream[Key, Entry](inputTopic)(Consumed.`with`[EntryKey, Entry](timestampExtractor))
      .groupByKey(Grouped.`with`(keySerde, valueSerde))
      .windowedBy(TimeWindows.of(windowSize).grace(windowGrace))
      .aggregate[EntryGroup](
        EntryGroup(Seq.empty[Entry])
      )((_: EntryKey, newValue: Entry, aggregate: EntryGroup) => EntryGroup(aggregate.anprEntries :  newValue))
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  

Существует ли идиоматический способ передачи упорядоченных по потоку групп событий для ключа / окна времени без проблем с размером записи?

Ответ №1:

Я бы рекомендовал вернуться к API процессора: использование a WindowStore allowDuplicates включенным) позволяет буферизировать все записи (по одной записи на «строку» в хранилище).

Таким образом, вы можете просто помещать входящие записи в хранилище и извлекать «старые записи» из хранилища (и удалять их из хранилища) по прошествии времени.

Комментарии:

1. Спасибо @matthias-j-sax. Я предполагаю, что мы создадим WindowStore с произвольно большим периодом удержания, чтобы гарантировать, что значения не будут удалены до истечения льготного периода — т. Е. в моем случае использования 2 дня. Затем используйте знак препинания для итерации хранилища и отправки записей, возраст которых достиг льготного периода?

2. Да, это должно сработать. — Использование знаков препинания или нет может быть дизайнерским решением: вы также можете выдавать данные с истекшим сроком действия каждый раз, когда добавляете новую запись внутри process() .

3. Идеальный. Я помещу нулевое значение в хранилище состояний для рассматриваемого ключа и временной метки после его отправки. Похоже, это так же хорошо, как удаление.. Большое спасибо!

4. Привет @matthias-j-sax. Я вернулся, чтобы завершить это, и трудность, которую я наблюдал при использовании WindowStore с включенными дубликатами, заключается в том, что я не думаю, что есть четкий способ удалить ключ / значение. Там, где я обычно помещаю нулевое значение для данного ключа, javadoc для Stores.persistenWindowStore, похоже, предполагает, что это невозможно (и мне пришлось бы удалить его из хранилища, как только я передал значение) — «Сохранить дубликаты — сохранять дубликаты или нет. Включение этого параметра автоматически отключит кэширование и означает, что значения null будут игнорироваться «.

5. Ах. Похоже, это ошибка — на самом деле мы недавно сами столкнулись с этим во время issues.apache.org/jira/browse/KAFKA-10847 — В DSL оконные хранилища никогда не вызывают delete() , а полагаются только на время хранения, поэтому ошибка была скрыта в течение длительного времени… Это должно быть исправлено в предстоящем выпуске 3.0 (см. github.com/apache/kafka/pull/10537 )