#c# #masstransit
Вопрос:
Таким образом, одно из ста (посреднических)сообщений не потребляется, и я не могу понять, почему. Сообщения, отправленные через RabbitMQ, в порядке.
Исключение:
"InnerException": {
"Type": "MassTransit.MessageNotConsumedException",
"Uri": "loopback://localhost/response",
"TargetSite": "System.Threading.Tasks.Task Send(MassTransit.ReceiveContext, GreenPipes.IPipe`1[MassTransit.ReceiveContext])",
"Message": "loopback://localhost/response => The message was not consumed",
"Data": {},
"Source": "MassTransit",
"HResult": -2146233088
}
Так что же происходит. У меня есть API, который использует как посредника, так и RabbitMQ. Все работает большую часть времени. Я регистрирую оба в классе запуска, подобном этому:
services.AddMassTransit(cfg =>
{
cfg.UsingRabbitMq(ConfigureRabbitMq);
});
services.AddMassTransitHostedService();
services.AddMediator(cfg =>
{
cfg.AddMediatorHandlers();
});
....
public void ConfigureRabbitMq(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
{
var rabbitConfig = RabbitMqConfig.Get<RabbitMqConfiguration>();
configurator.Host(rabbitConfig.Host, rabbitConfig.VirtualHost , hfg =>
{
hfg.Password(rabbitConfig.Password);
hfg.Username(rabbitConfig.UserName);
});
}
Теперь в моем контроллере, когда я получаю запрос, я передаю его через посредника обработчику, который должен сохранить запрос в базе данных, передать его в RabbitMQ и вернуть сообщение, если запрос был сохранен/отправлен обратно контроллеру.
Вот контроллер:
private readonly IRequestClient<CreateCommand> _createProcessor;
public async Task<IActionResult> CreateManageServicesRequest([FromBody] CreateRequest request)
{
try
{
var result = await _createProcessor.GetResponse<CreateResponse>(new CreateCommand() { ApiRequest = request});
if (result.Message.Queued.HasValue)
return Ok();
else
return Accepted();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed httpRequest for {methodName}", nameof(CreateManageServicesRequest));
return StatusCode((int)StatusCodes.Status500InternalServerError);
}
}
Вот обработчик, в котором происходит исключение:
public async Task Consume(ConsumeContext<CreateCommand> context)
{
var endpoint = await bus.GetSendEndpoint(new Uri($"exchange:{rabbitOptions.ManageServiceQueueName}"));
using (var sql = sqlFactory.Cip)
{
/*SAVE REQUEST TO DATABASE*/
sql.StartTransaction(System.Data.IsolationLevel.Serializable);
.... /* Not important for stack-overflow, just saving request to database */
sql.Commit();
/* QUEUE REQUEST*/
try
{
/*Queue via Rabbit MQ*/
await endpoint.Send(new SendCommand() { QueuedRequest = request }, context.CancellationToken);
/* Send response ---- THIS FAILS RANDOMLY ---- */
await context.RespondAsync(new CreateResponse() { Queued = DateTimeOffset.Now });
}
catch (Exception ex)
{
using (logger.BeginScope(new Dictionary<string, object>() { { "ManageServiceRequest", transactionId } }))
logger.LogError(ex, "Failed to queque manage service request");
}
}
}
Я понятия не имею, как это решить, и я не нашел никакой соответствующей информации в интернете MassTransit или Git.
Комментарии:
1. Я не вижу никакого кода MediatR в тех фрагментах, которые вы предоставили.
2. Masstransit реализует своего собственного посредника
3. Значит, это не МедиатР? В своем вопросе вы упомянули «mediatr», отсюда и путаница.
4. Достигает ли неудачный запрос значения тайм-аута по умолчанию (30 секунд)?
5. Привет, Крис, я попытаюсь воспроизвести проблему, чтобы понять, в чем проблема. Я не считал, что тайм-аут даже действителен для посредника.
Ответ №1:
Так что проблема была в масштабах. Я не добавил промежуточное программное обеспечение для получения области для контроллеров. Как написано здесь: https://masstransit-project.com/usage/mediator.html
С момента добавления AddHttpContextAccessor проблема исчезла.