Сохранение в потоках Kafka> 2.10

#java #apache-kafka-streams

#java #apache-kafka-streams

Вопрос:

Я присоединяюсь к KStreams, поэтому мне нужно установить JoinWindows. Объединенные данные имеют вычисленные временные метки из CustomTimestampExtractors. Данные могут быть не в порядке. Я могу установить сохранение с помощью устаревшего JoinWindows.until(long) метода, но поскольку он устарел, я ищу другое решение.

Я нашел свойство windowstore.changelog.additional.retention.ms — должен ли я его использовать? Это приведет к тому, что все хранилища состояний будут сохраняться так долго.

Ответ №1:

Как указано в документации:

Вместо этого используйте JoinWindows#grace (продолжительность)

ИМХО документ немного вводит в заблуждение, в отличие от:

Установите длительность сохранения окна (время сохранения) в миллисекундах. Это время хранения является гарантированной нижней границей того, как долго будет поддерживаться окно.

Но будьте осторожны, я хотел, чтобы мое хранилище состояний сохранялось как можно дольше, поэтому я использовал JoinWindows.until(Long.MAX_VALUE) раньше. Использование JoinWindows.grace(Duration.ofMillis(Long.Max_VALUE)) жалуется, что льготный период не может быть отрицательным. Там для вас нужно использовать меньшие значения.

Комментарии:

1. Кажется, это ограничение с Duration типом — но если вы просто установите Duration.ofYears(100L) , оно должно быть фактически неограниченным 🙂