Как получить ответ через Masstransit?

#rabbitmq #masstransit

#кролик #масстрансит

Вопрос:

Я хочу получить ответ (GetStatusResponse) от потребителя (GetStatusConsumer).

Мой запрос помещается в очередь кролика «getstatus», но мой потребитель не поднимается, и происходит тайм-аут.

Метод публикации и потребитель, вложенные в один проект

Мне кажется, проблема в Startup.cs. Не могли бы вы мне помочь?

У меня есть следующий код в Startup.cs

 ... services.AddSingleton(provider =gt;  {  var getStatusBusOptions = provider.GetRequiredServicelt;IOptionslt;BusOptionsgt;gt;().Value;  var bus = Bus.Factory.CreateUsingRabbitMq(sbc =gt;  {  var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =gt;  {  h.Username(getStatusBusOptions.UserName);  h.Password(getStatusBusOptions.Password);  });  sbc.ReceiveEndpoint("getstatus", ep =gt;  {  ep.Consumerlt;GetStatusConsumergt;(provider);  ep.PrefetchCount = getStatusBusOptions.PrefetchCount;  ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);  });   });    return new GetStatusBus  {   Bus = bus,  HostUri = getStatusBusOptions.HostUri  };  }); ...  

Следующий код в классе GetStatusPublisher.cs

 public class GetStatusPublisher : IGetStatusPublisher {  readonly GetStatusBus _bus;   public GetStatusPublisher(GetStatusBus bus)  {  _bus = bus;  }   public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class  {  var serviceAddress = new Uri($"rabbitmq://rabbitmq.test.com/jgt/getstatus");   var timeout = TimeSpan.FromSeconds(30);    var client = new MessageRequestClientlt;Tin, Toutgt;(_bus.Bus, serviceAddress, timeout);   var resp = await client.Request(request); // lt;== Timeout here and don't rise consumer (GetStatusConsumer)   return resp;  }  

Вот метод публикации:

 ...  readonly IGetStatusPublisher _getStatusPublisher; ...  var resp = await _getStatusPublisher.GetResponselt;GetStatusRequest, GetStatusResponsegt;(statusReq);  

Потребитель имеет следующий код:

 public class GetStatusConsumer : MetricWriter, IConsumerlt;GetStatusRequestgt; {  public GetStatusConsumer(IMetrics metrics) : base(metrics)  { ......  }   public async Task Consume(ConsumeContextlt;GetStatusRequestgt; context)  { ....  }  }  

Ответ №1:

Перво-наперво, я не думаю, что ты собираешься садиться в автобус. Это главная проблема.

Однако…

Вы используете старинную версию MassTransit? Ваш код серьезно устарел, и я бы предложил обновить его, чтобы использовать текущую версию/синтаксис, поскольку в приведенном выше примере кода так много ошибок.

 services.AddMassTransit(x =gt; {  x.AddConsumerlt;GetStatusConsumergt;();   x.UsingRabbitMq((context, cfg) =gt;   {  var getStatusBusOptions = context.GetRequiredServicelt;IOptionslt;BusOptionsgt;gt;().Value;   cfg.Host(new Uri(getStatusBusOptions.HostUri), h =gt;  {  h.Username(getStatusBusOptions.UserName);  h.Password(getStatusBusOptions.Password);  });   cfg.PrefetchCount = getStatusBusOptions.PrefetchCount;  cfg.ConcurrentMessageLimit = getStatusBusOptions.UseConcurrencyLimit;   cfg.ConfigureEndpoints(context);  }); }); services.AddMassTransitHostedService(); services.AddGenericRequestClient();  

Затем вы можете просто добавить зависимость от IRequestClientlt;Tgt; отправки запросов и получения ответов. Ваш обновленный код издателя может выглядеть следующим образом:

 public class GetStatusPublisher :   IGetStatusPublisher {  public GetStatusPublisher(IServiceProvider provider)  {  _provider = provider;  }   public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class  {  var client = _provider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;();   var response = await client.GetResponselt;Toutgt;(request);   return response.Message;  } }  

Ответ №2:

Если я использую следующий код, я получаю ошибку: «Система.Исключение InvalidOperationException: Не удается разрешить массив службы с областью действия.IRequestClient`1[GetStatusTry.Контракты.GetStatusRequest]’ от корневого поставщика…..»

 public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class  {  try  {  var client = _provider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;(); // lt;lt; --- here is error    var response = await client.GetResponselt;Toutgt;(request);   return response.Message;  }  catch (Exception ex)  {  Console.WriteLine(ex.Message);  throw;  }   }  

Но этот код работает нормально:

общедоступный класс Dev2Controller : ControllerBase {

 IRequestClientlt;GetStatusRequestgt; _client;  public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClientlt;GetStatusRequestgt; client) {  _client = client; }  [HttpPost] public async Tasklt;GetStatusResponsegt; GetStatus2() {  var req = new GetStatusRequest { Statuses = new Listlt;intgt; { 1, 2, 3 }, TerminalDescr = "try to get status2" };   var response = await _client.GetResponselt;GetStatusResponsegt;(req);   return (response.Message); }  

}

Ответ №3:

этот код работает

 public class GetStatusPublisher : IGetStatusPublisher {  readonly IServiceProvider _provider;   public GetStatusPublisher(IServiceProvider provider)  {  _provider = provider;  }   public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class  {  try  {  using (var _scope = _provider.CreateScope())  {  var client = _scope.ServiceProvider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;();   var response = await client.GetResponselt;Toutgt;(request);   return response.Message;  }   }  catch (Exception ex)  {  Console.WriteLine(ex.Message);  throw;  }   } }