Слияние Кафки: Установка Retention.ms для отдельной темы, которая работает не так, как ожидалось

#python-3.x #apache-kafka #kafka-consumer-api #confluent-platform #confluent-kafka-python

#python-3.x #апач-кафка #кафка-потребитель-api #слияние-платформа #confluent-kafka-python

Вопрос:

Я использую confluent Kafka в своем проекте, где сообщения, отправленные на определенную тему, должны быть удалены по истечении определенного времени хранения. Поэтому я поставил retention.ms для отдельных тем, но это не работает (все еще я могу видеть сообщения по истечении времени их хранения)

Я просмотрел большинство вопросов стека, но все равно не могу найти подходящую причину/ решение для Кафки retention.ms не работает проблема.

Я выполнил следующие действия, чтобы создать и установить время хранения в ms.

  1. Создал тему, скажем. «статус пользователя»
  2. обновил свой retention.ms время, следуя приведенному ниже коду
     from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, NewPartitions  from confluent_kafka import Producer, Consumer, KafkaError, KafkaException   topic_config = ConfigResource('topic', 'user_status')  topic_config.set_config('retention.ms', '5000')  admin.alter_configs([topic_config])  
  3. Отправьте сообщение с конца производителя.
  4. подождал 6000 мс.
  5. Попытался получить сообщение из определенной темы. Но я получил сообщение, И все равно сообщение не было удалено в соответствии с политикой хранения.

Примечание: Я гарантировал ниже После обновления retention.ms Я проверил, что то же самое было обновлено в информации о теме кафки (Описание темы).

Кроме того, я обновил server.properties с помощью log.retention.check.interval.ms=1 мс и перезапустил службу Kafka после обновления файла свойств.

чего я ожидаю от вышеупомянутого вопроса

Я хочу установить retention.ms к отдельной теме, и сообщение, передающее это время, должно быть автоматически удалено, как определено в политике Кафки.

что происходит сейчас с моим текущим кодом. сообщения все еще принимаются потребителем даже после retention.ms время.