#java #apache-pulsar
#java #apache-pulsar
Вопрос:
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
При вызове метода seek(long timestamp) для потребителя должна ли временная метка соответствовать точному времени публикации сообщения? Например, если я отправил три сообщения при t = 1, 5, 7 и если я вызову consumer.seek(3), получу ли я сообщение об ошибке? или мой потребитель будет сброшен до t = 3, так что, если я вызову consumer.next() , я получу свое второе сообщение?
Заранее спасибо,
Ответ №1:
Это Consumer#seek(long timestamp)
позволяет вам сбросить вашу подписку на заданную временную метку. После поиска потребитель начнет получать сообщения со временем публикации, равным или большим, чем временная метка, переданная seek
методу.
В приведенном ниже примере показано, как сбросить пользователя на предыдущий час:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
Обратите внимание, что если у вас есть несколько потребителей, которые принадлежат к одному и тому же subscriptio (например, Key_Shared ), то все потребители будут сброшены.