#apache-pulsar
#apache-pulsar
Вопрос:
В Apache Pulsar есть функция TTL, описанная в разделе официальной документации «Хранение сообщений и истечение срока их действия». Срок действия сообщения. Однако я не могу определить, где в конфигурации задано, как часто выполняется эта проверка. Используя стандартную bin/pulsar standalone
команду, с пользовательским пространством имен, с настроенным ttl в 5 секунд bin/pulsar-admin namespaces set-message-ttl public/ttl-test --messageTTL 5
.
Я вижу, что срок действия сообщений истекает только через установленный интервал, и следующее сообщение журнала выводится на консоль:
15:11:59.337 [pulsar-msg-expiry-monitor-52-1] INFO org.apache.pulsar.broker.service.постоянный.PersistentMessageExpiryMonitor — [постоянный: // общедоступный/ttl-test /my-topic][spark-shell] Запуск проверки истечения срока действия сообщения, ttl = 5 секунд
Суть моего вопроса такова: как я могу увеличить скорость, с которой сообщения проверяются на предмет того, превысили ли они TTL?
Комментарии:
1. Когда я писал свой вопрос, я понял, что мне нужно было искать, но я все равно опубликовал его и добавил ответ, который я нашел.
Ответ №1:
Конфигурация messageExpiryCheckIntervalInMinutes
внутри брокера определяет, как часто разделы пространства имен проверяются на наличие сообщений с истекшим сроком действия.
Согласно официальной документации по конфигурации
Ответ №2:
Используйте команду set-message-ttl и укажите имя пространства имен (по умолчанию общедоступное / default для постоянной темы) и время.
bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 120
пример кода производителя и потребителя для достижения ttl (клиент python)
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic-reader1')
producer.send(('Hello-Pulsar1').encode('utf-8'))
producer.send(('Hello-Pulsar2').encode('utf-8'))
producer.send(('Hello-Pulsar3').encode('utf-8'))
producer.close()
client.close()
вы можете отправить несколько сообщений, используя метод send. Название темы должно быть одинаковым в обоих классах.
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe("my-topic-reader1", "my-subscription")
//receive all the messages.whatever we publish
msg = consumer.receive()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
//Here we are not acknowledge all the messages.
//close the consumer and client
consumer.close()
client.close()
В течение 120 секунд мы снова открываем клиента и потребителя и пытаемся прочитать те же сообщения, которые не публикуются. снова мы закрываем клиента и потребителя.
Позже (через 120 секунд) мы снова откроем client и consumer, а затем попытаемся получить сообщение. Но оно не должно появиться. В этом условии вы достигаете времени жизни.