Как TTL (время жить) применяется к пространству имен?

#apache-pulsar

#apache-pulsar

Вопрос:

В Apache Pulsar есть функция TTL, описанная в разделе официальной документации «Хранение сообщений и истечение срока их действия». Срок действия сообщения. Однако я не могу определить, где в конфигурации задано, как часто выполняется эта проверка. Используя стандартную bin/pulsar standalone команду, с пользовательским пространством имен, с настроенным ttl в 5 секунд bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5 .

Я вижу, что срок действия сообщений истекает только через установленный интервал, и следующее сообщение журнала выводится на консоль:

15:11:59.337 [pulsar-msg-expiry-monitor-52-1] INFO org.apache.pulsar.broker.service.постоянный.PersistentMessageExpiryMonitor — [постоянный: // общедоступный/ttl-test /my-topic][spark-shell] Запуск проверки истечения срока действия сообщения, ttl = 5 секунд

Суть моего вопроса такова: как я могу увеличить скорость, с которой сообщения проверяются на предмет того, превысили ли они TTL?

Комментарии:

1. Когда я писал свой вопрос, я понял, что мне нужно было искать, но я все равно опубликовал его и добавил ответ, который я нашел.

Ответ №1:

Конфигурация messageExpiryCheckIntervalInMinutes внутри брокера определяет, как часто разделы пространства имен проверяются на наличие сообщений с истекшим сроком действия.

Согласно официальной документации по конфигурации

Ответ №2:

Используйте команду set-message-ttl и укажите имя пространства имен (по умолчанию общедоступное / default для постоянной темы) и время.

 bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120
  

пример кода производителя и потребителя для достижения ttl (клиент python)

 import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')

producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))

producer.close()
client.close()
  

вы можете отправить несколько сообщений, используя метод send. Название темы должно быть одинаковым в обоих классах.

 import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")

//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))

//Here we are not acknowledge all the messages.

//close the consumer and client
consumer.close()
client.close()
  

В течение 120 секунд мы снова открываем клиента и потребителя и пытаемся прочитать те же сообщения, которые не публикуются. снова мы закрываем клиента и потребителя.

Позже (через 120 секунд) мы снова откроем client и consumer, а затем попытаемся получить сообщение. Но оно не должно появиться. В этом условии вы достигаете времени жизни.