#apache-kafka-streams
#apache-kafka-streams
Вопрос:
По какой-то причине мои старые хранилища состояния не очищаются после истечения срока действия политики хранения. Я тестирую его локально, поэтому я просто отправляю одно тестовое сообщение каждые 5 минут или около того. У меня установлена низкая продолжительность хранения только для тестирования. retentionPeriod = 120, retentionWindowSize = 15, и я предполагаю, что сохранение дубликатов должно быть ложным. Когда это должно быть правдой?
Stores.persistentWindowStore(storeName,
Duration.of(retentionPeriod, ChronoUnit.SECONDS),
Duration.of(retentionWindowSize, ChronoUnit.SECONDS),
false)
Когда я нахожусь в каталоге хранилища состояния, я вижу старые хранилища задолго до истечения срока хранения. Например, хранилище.1554238740000 (при условии, что число равно эпохе мс). Я хорошо переношу 2-минутное время хранения, и этот каталог все еще там.
Чего мне не хватает?
Обратите внимание, что в конечном итоге он очищается намного позже, чем я ожидал. Что запускает очистку?
Ответ №1:
Время хранения — это минимальная гарантия того, как долго хранятся данные. Для повышения эффективности истечения срока действия используются так называемые сегменты для разделения временной шкалы на «сегменты». Только после того, как время для всех данных в сегменте может истечь, сегмент удаляется. По умолчанию Kafka Streams использует 3 сегмента. Таким образом, для вашего примера со временем хранения 120 секунд каждый сегмент будет иметь размер 60 секунд (а не 40 секунд). Причина в том, что самый старый сегмент может быть удален только из всех данных в нем, если прошло время хранения. Если бы размер сегмента составлял всего 40 секунд, для достижения этого потребовалось бы 4 сегмента:
S1 [0-40) -- S2 [40,80) -- S3 [80,120)
Если запись с отметкой времени 121 должна быть сохранена, S1 пока нельзя удалить, поскольку она содержит данные для временных меток от 1 до 40, для которых еще не пройден период хранения. Таким образом, потребуется новый сегмент S4. Для размера сегмента 60 достаточно 3 сегментов:
S1 [0-60) -- S2 [60,120) -- S3 [120,180)
В этом случае, если поступает запись с отметкой времени 181, всем данным в первом сегменте передается время хранения 181-120 = 61, и, таким образом, S1 может быть удален до создания S4.
Обратите внимание, что начиная с Kafka 2.1 внутренний механизм остается тем же самым, однако потоки Kafka строго соблюдают период хранения на уровне приложения, т. Е. Запись отбрасывается, а чтение возвращается null
для всех данных, прошедших период хранения (даже если данные все еще там, поскольку сегмент все еще используется).