Образец MassTransit JobConsumer завершается ошибкой с исключением тайм-аута

#masstransit

#массовый переход

Вопрос:

Массовый переход на 7.0.4. Я получаю исключение, пытаясь продемонстрировать самый простой сценарий работы с потребителем, как описано здесь: https://masstransit-project.com/advanced/job-consumers.html . Сообщение об исключении:

 Timeout waiting for response, RequestId: 4ec50000-294e-000c-71b6-08d8768d0bc0
  

Я подозреваю, что в примере может отсутствовать конфигурация проводника.

У меня есть решение с 3 проектами. Project TestJob содержит ConvertVideo интерфейс для совместного использования между клиентом и потребителем:

 namespace JobSystem.Jobs
{
    using System;

    public interface ConvertVideo
    {
        Guid VideoId { get; }
        string Format { get; }
    }
}
  

Проект TestJobClient имеет один класс Program. Исключение возникает при вызове GetResponse .

 namespace JobSystemClient
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using JobSystem.Jobs;
    using MassTransit;
    using MassTransit.Contracts.JobService;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => 
            {
                // Not in sample, I added this
                cfg.Host("localhost");
            });

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));

            await busControl.StartAsync(source.Token);
            try
            {
                var serviceClient = busControl.CreateServiceClient();

                var requestClient = serviceClient.CreateRequestClient<ConvertVideo>();

                do
                {
                    string value = await Task.Run(() =>
                    {
                        Console.WriteLine("Enter video format (or quit to exit)");
                        Console.Write("> ");
                        return Console.ReadLine();
                    });

                    if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                        break;

                    // Exception comes up here!
                    var response = await requestClient.GetResponse<JobSubmissionAccepted>(new
                    {
                        VideoId = NewId.NextGuid(),
                        Format = value
                    });
                }
                while (true);
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
  

Проект TestJobConsumer имеет два класса: Program и ConvertVideoJobConsumer. Я добавил здесь небольшой фрагмент кода, чтобы запустить шину и убедиться, что она остается в рабочем состоянии для обработки запросов.

 namespace JobSystemConsoleService
{
    using System;
    using System.Threading.Tasks;
    using JobSystem.Jobs;
    using MassTransit;
    using MassTransit.Conductor;
    using MassTransit.JobService;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // Not in sample, I added this
                cfg.Host("localhost");

                var options = new ServiceInstanceOptions()
                    .EnableInstanceEndpoint();

                cfg.ServiceInstance(options, instance =>
                {
                    instance.ConfigureJobServiceEndpoints();

                    var queueName = instance.EndpointNameFormatter.Consumer<ConvertVideoJobConsumer>();

                    instance.ReceiveEndpoint(queueName, e =>
                    {
                        e.Consumer(() => new ConvertVideoJobConsumer(), c =>
                        {
                            // Note: edited this line from options to opts, as 
                            // the example has a compile-time error due to options 
                            // in already existing one of the parent scopes
                            c.Options<JobOptions<ConvertVideo>>(opts => opts
                                .SetJobTimeout(TimeSpan.FromMinutes(15))
                                .SetConcurrentJobLimit(10));
                        });
                    });
                });
            });

            await busControl.StartAsync();

            Console.WriteLine("Hit <Enter> to quit.");
            Console.Write("> ");
            Console.ReadLine();

            await busControl.StopAsync();
        }

        public class ConvertVideoJobConsumer :
            IJobConsumer<ConvertVideo>
        {
            public async Task Run(JobContext<ConvertVideo> context)
            {
                // simulate converting the video
                await Task.Delay(TimeSpan.FromSeconds(5));
            }
        }
    }
}
  

ОБНОВЛЕНИЕ 2020- 15 декабря:

Как и было предложено, я попытался запустить образец приложения должным образом, чтобы я мог сравнить его код с документацией, но я не смог заставить образец работать должным образом.

После загрузки он работает нормально — и первый вызов службы с использованием пользовательского интерфейса Swagger всегда завершается успешно. Однако мой второй или третий вызов постоянно завершается неудачей — похоже на ошибку блокировки PostgreSQL. Изменение образца для использования SQL Server вместо этого исправляет это.


ОБНОВЛЕНИЕ 2020- 17 декабря:

В образце документации, по-видимому, отсутствует конфигурация вокруг определений JobSaga, JobTypeSaga и JobAttemptSaga, а также информация о связанных репозиториях.

Мой образец выполняется на .NET Framework 4.8, что означает, что попытка имитировать образец JobConsumer не сработает, потому что MassTransit.EntityFramework зависит от .NET standard 2.1.

Перешел к MassTransit.NHibernate, для которого требуется только .NET Standard 2.0.

Все еще работаю над этим…

Комментарии:

1. Вы можете сравнить его с официальным образцом и поискать различия.

2. @ChrisPatterson Документация на сайте проекта не является «официальной»?

3. @ChrisPatterson Я имею в виду, я так и сделаю… Но я искал простейшую возможную настройку, чтобы заставить что-то работать, а затем перейти к моему текущему использованию оттуда (я перехожу от использования обходных путей), и я предполагал, что документы проекта дадут некоторые рекомендации.

4. Я думаю, я хочу сказать, что я не вижу ничего плохого в вышесказанном, поэтому я предложил попробовать образец, чтобы посмотреть, работает ли это для вас. Затем ищите различия. Вы можете отправить PR для исправления документации, если обнаружите, что они не совпадают.

5. @ChrisPatterson Я ценю разъяснение. В настоящее время я переключаю передачи, но я скоро вернусь к этому и посмотрю, что я не могу найти.