интеграционные тесты с kafka на c#

#c# #.net #apache-kafka #integration-testing

#c# #.net #apache-kafka #интеграция-тестирование

Вопрос:

У меня есть библиотека для публикации и использования сообщений из kafka, и я пытаюсь выполнить интеграционные тесты на ней, но у меня большая проблема, поскольку потребитель «не» подключается должным образом.

Поэтому я сократил (для теста) до следующего потребителя:

 private void FakeConsumer(string kafkaServer)
{
    var config = GetSettings(kafkaServer);
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        c.Subscribe(Topic);

        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress  = (_, e) =>
        {
            e.Cancel = true; 
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");

                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            c.Close();
        }
    }
}
 

затем в моем интеграционном тесте я запустил его как фоновый поток, как вы можете видеть:

 [Fact]
public async Task Test()
{
    new Thread(() => FakeConsumer()) { IsBackground = true }.Start();

    Thread.Sleep(5000);

    Publisher<Event> publisher = new Publisher<Event>(args);

    Event ev = new Event();
    await publisher.PublishAsync(ev, Topic);
    Thread.Sleep(10000);
    
    Assert.Single(MessageHandler.Messages);
}
 

Примечание: я удалил его для публикации, но в коде есть «messagehandler», который сохраняет в памяти прочитанное сообщение.

Я также добавил некоторые Thread.Sleep в основной тест, чтобы тема была создана и между обоими действиями не было «наложения».

код публикации publisher.PublishAsync работает отлично, так как я вижу сообщение в kafdrop.

Проблема в том, что код никогда не проходит var cr = c.Consume(cts.Token); , поток застрял там.

Что я должен сделать, чтобы получить информацию или разрешить фоновой программе получать информацию в var cr = c.Consume(cts.Token); ?

Последнее замечание: если я разделю код на два разных консольных приложения, это сработает.

Спасибо.

Комментарии:

1. Можете ли вы добавить .SetPartitionsAssignedHandler((c, partitions) { var committed = c.Committed(partitions, TimeSpan.FromSeconds(2)); Console.WriteLine($"Assigned partitions: [{string.Join(", ", }); свой ConsumerBuilder и проверить, правильно ли назначены разделы?

2. @rytisk спасибо за ответ, я добавил код, и точка останова там не останавливается, и я не вижу никаких связанных выходных данных. (Консоль. Строка записи)

3. это означает, что вы неправильно подписались на тему, поэтому вы не получаете никаких сообщений. Вы должны дважды проверить свои конфигурации. Может быть, вы указали неправильный адрес брокера или неправильное название темы?

4. @rytisk Я использую переменные для темы и брокера, так что все должно быть в порядке

5. Есть ли другие потребители (в той же группе потребителей), пытающиеся прочитать ту же тему?