Apache Pulsar — каково поведение метода Consumer.seek() по метке времени?

#java #apache-pulsar

#java #apache-pulsar

Вопрос:

https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-

При вызове метода seek(long timestamp) для потребителя должна ли временная метка соответствовать точному времени публикации сообщения? Например, если я отправил три сообщения при t = 1, 5, 7 и если я вызову consumer.seek(3), получу ли я сообщение об ошибке? или мой потребитель будет сброшен до t = 3, так что, если я вызову consumer.next() , я получу свое второе сообщение?

Заранее спасибо,

Ответ №1:

Это Consumer#seek(long timestamp) позволяет вам сбросить вашу подписку на заданную временную метку. После поиска потребитель начнет получать сообщения со временем публикации, равным или большим, чем временная метка, переданная seek методу.

В приведенном ниже примере показано, как сбросить пользователя на предыдущий час:

 try (
    // Create PulsarClient
    PulsarClient client = PulsarClient
        .builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
    // Create Consumer subscription
    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionMode(SubscriptionMode.Durable)
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
        .subscribe()
) {
    // Seek consumer to previous hour
    consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
    while (true) {
        final Message<String> msg = consumer.receive();
        System.out.printf(
            "Message received: key=%s, value=%s, topic=%s, id=%s%n",
            msg.getKey(),
            msg.getValue(),
            msg.getTopicName(),
            msg.getMessageId().toString());
        consumer.acknowledge(msg);
    }
}
  

Обратите внимание, что если у вас есть несколько потребителей, которые принадлежат к одному и тому же subscriptio (например, Key_Shared ), то все потребители будут сброшены.