Kafka .сетевой клиент не получает никаких сообщений при использовании Assign()

#.net #apache-kafka #confluent-platform

#.net #apache-kafka #confluent-платформа

Вопрос:

Я пытаюсь заставить свой сервис перечитать раздел kafka от начала до конца при запуске, чтобы инициализировать внутренние структуры данных. Я использую Confluent .СЕТЕВОЙ клиент. Насколько я понимаю, следующий код должен подписаться на меня в разделе, устанавливающем смещение к началу:

 consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
  

Но по какой-то причине я не получаю ни сообщений, ранее существовавших в topic, ни новых.
Я неправильно понимаю метод Assign ()? Есть ли способ достичь желаемого результата с помощью Subscribe () без необходимости жесткого сброса смещений с использованием kafka CLI?

Вот полный тестовый клиент, на выходе у меня всегда «Нет сообщений …», несмотря на то, что в теме есть сообщения и поступают новые сообщения.

     static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        var consumer = new ConsumerBuilder<Null, byte[]>(config).Build();
        var topic = "test-topic";
        consumer.Assign(new TopicPartitionOffset(topic, Partition.Any, Offset.Beginning));
        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
                Console.WriteLine("No messages...");
            else
                Console.WriteLine($"Offset: {result.Offset}");
        }
    }
  

Комментарии:

1. Пожалуйста, удалите TimeSpan.FromSeconds(5) из consumer.Consume … затем повторите попытку… дайте мне знать, если это сработает 🙂

2. Он просто зависает навсегда на пользователе. Вызов Consume().

Ответ №1:

Проблема заключалась в том, что я вызвал Assign () с разделом.В любом случае, работает следующий код:
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));

Ответ №2:

Почему вы хотели бы использовать Assign ? У вас должно сработать следующее:

 public static void Main(string[] args)
{
    var conf = new ConsumerConfig
    { 
        GroupId = "test-consumer",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
    {
        c.Subscribe("test-topic");

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress  = (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            c.Close();
        }
    }
}
  

Комментарии:

1. При этом будут считаны события из последнего зафиксированного смещения. Моя цель — перечитать все события с самого начала.

2. @AlexeyUkolov Это контролируется AutoOffsetReset = AutoOffsetReset.Earliest . Если вы измените GroupId на "test-consumer-2" (т. Е. новую / ранее невидимую группу потребителей), вы сможете читать сообщения с самого начала. Надеюсь, это поможет.

3. @AlexeyUkolov Дайте мне знать, сработало ли это у вас, чтобы я мог обновить свой ответ.

4. AutoOffsetReset запускается, только если последнее зафиксированное смещение больше недоступно.

5. Изменение идентификатора группы могло бы сработать, но тогда мне нужно будет постоянно создавать новые идентификаторы групп. Метод Assign () не создает группу подписки в zookeeper, вот почему я пытался использовать его вместо Subscribe ().