#python-3.x #apache-kafka #kafka-consumer-api #confluent-platform #confluent-kafka-python
#python-3.x #апач-кафка #кафка-потребитель-api #слияние-платформа #confluent-kafka-python
Вопрос:
Я использую confluent Kafka в своем проекте, где сообщения, отправленные на определенную тему, должны быть удалены по истечении определенного времени хранения. Поэтому я поставил retention.ms для отдельных тем, но это не работает (все еще я могу видеть сообщения по истечении времени их хранения)
Я просмотрел большинство вопросов стека, но все равно не могу найти подходящую причину/ решение для Кафки retention.ms не работает проблема.
Я выполнил следующие действия, чтобы создать и установить время хранения в ms.
- Создал тему, скажем. «статус пользователя»
- обновил свой retention.ms время, следуя приведенному ниже коду
from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, NewPartitions from confluent_kafka import Producer, Consumer, KafkaError, KafkaException topic_config = ConfigResource('topic', 'user_status') topic_config.set_config('retention.ms', '5000') admin.alter_configs([topic_config])
- Отправьте сообщение с конца производителя.
- подождал 6000 мс.
- Попытался получить сообщение из определенной темы. Но я получил сообщение, И все равно сообщение не было удалено в соответствии с политикой хранения.
Примечание: Я гарантировал ниже После обновления retention.ms Я проверил, что то же самое было обновлено в информации о теме кафки (Описание темы).
Кроме того, я обновил server.properties с помощью log.retention.check.interval.ms=1 мс и перезапустил службу Kafka после обновления файла свойств.
чего я ожидаю от вышеупомянутого вопроса
Я хочу установить retention.ms к отдельной теме, и сообщение, передающее это время, должно быть автоматически удалено, как определено в политике Кафки.
что происходит сейчас с моим текущим кодом. сообщения все еще принимаются потребителем даже после retention.ms время.