Массовый — случайный ответ посредника не потребляется

#c# #masstransit

Вопрос:

Таким образом, одно из ста (посреднических)сообщений не потребляется, и я не могу понять, почему. Сообщения, отправленные через RabbitMQ, в порядке.

Исключение:

 "InnerException": {
    "Type": "MassTransit.MessageNotConsumedException",
    "Uri": "loopback://localhost/response",
    "TargetSite": "System.Threading.Tasks.Task Send(MassTransit.ReceiveContext, GreenPipes.IPipe`1[MassTransit.ReceiveContext])",
    "Message": "loopback://localhost/response => The message was not consumed",
    "Data": {},
    "Source": "MassTransit",
    "HResult": -2146233088
}
 

Так что же происходит. У меня есть API, который использует как посредника, так и RabbitMQ. Все работает большую часть времени. Я регистрирую оба в классе запуска, подобном этому:

         services.AddMassTransit(cfg =>
        {
            cfg.UsingRabbitMq(ConfigureRabbitMq);
        });

        services.AddMassTransitHostedService();

        services.AddMediator(cfg => 
        { 
            cfg.AddMediatorHandlers();
        });

        ....

    public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
    {
        var rabbitConfig = RabbitMqConfig.Get<RabbitMqConfiguration>();

        configurator.Host(rabbitConfig.Host, rabbitConfig.VirtualHost , hfg => 
        {
            hfg.Password(rabbitConfig.Password);
            hfg.Username(rabbitConfig.UserName);
        });
    }
 

Теперь в моем контроллере, когда я получаю запрос, я передаю его через посредника обработчику, который должен сохранить запрос в базе данных, передать его в RabbitMQ и вернуть сообщение, если запрос был сохранен/отправлен обратно контроллеру.

Вот контроллер:

     private readonly IRequestClient<CreateCommand> _createProcessor;

    public async Task<IActionResult> CreateManageServicesRequest([FromBody] CreateRequest request)
      {
        try
        {
            var result = await _createProcessor.GetResponse<CreateResponse>(new CreateCommand() { ApiRequest = request});

            if (result.Message.Queued.HasValue)
                return Ok();
            else
                return Accepted();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed httpRequest for {methodName}", nameof(CreateManageServicesRequest));
            return StatusCode((int)StatusCodes.Status500InternalServerError);
        }
    }
 

Вот обработчик, в котором происходит исключение:

     public async Task Consume(ConsumeContext<CreateCommand> context)
    {
        var endpoint = await bus.GetSendEndpoint(new Uri($"exchange:{rabbitOptions.ManageServiceQueueName}"));

        using (var sql = sqlFactory.Cip)
        {
            /*SAVE REQUEST TO DATABASE*/
            sql.StartTransaction(System.Data.IsolationLevel.Serializable);

            .... /* Not important for stack-overflow, just saving request to database */

            sql.Commit();

            /* QUEUE REQUEST*/
            try
            {
                    /*Queue via Rabbit MQ*/
                    await endpoint.Send(new SendCommand() { QueuedRequest = request }, context.CancellationToken);

                    /* Send response ---- THIS FAILS RANDOMLY ---- */
                    await context.RespondAsync(new CreateResponse() { Queued =  DateTimeOffset.Now });
                
            }
            catch (Exception ex)
            { 
                using (logger.BeginScope(new Dictionary<string, object>() { { "ManageServiceRequest", transactionId } }))
                    logger.LogError(ex, "Failed to queque manage service request");
            }
        }
    }
 

Я понятия не имею, как это решить, и я не нашел никакой соответствующей информации в интернете MassTransit или Git.

Комментарии:

1. Я не вижу никакого кода MediatR в тех фрагментах, которые вы предоставили.

2. Masstransit реализует своего собственного посредника

3. Значит, это не МедиатР? В своем вопросе вы упомянули «mediatr», отсюда и путаница.

4. Достигает ли неудачный запрос значения тайм-аута по умолчанию (30 секунд)?

5. Привет, Крис, я попытаюсь воспроизвести проблему, чтобы понять, в чем проблема. Я не считал, что тайм-аут даже действителен для посредника.

Ответ №1:

Так что проблема была в масштабах. Я не добавил промежуточное программное обеспечение для получения области для контроллеров. Как написано здесь: https://masstransit-project.com/usage/mediator.html

С момента добавления AddHttpContextAccessor проблема исчезла.