#rabbitmq #masstransit
#кролик #масстрансит
Вопрос:
Я хочу получить ответ (GetStatusResponse) от потребителя (GetStatusConsumer).
Мой запрос помещается в очередь кролика «getstatus», но мой потребитель не поднимается, и происходит тайм-аут.
Метод публикации и потребитель, вложенные в один проект
Мне кажется, проблема в Startup.cs. Не могли бы вы мне помочь?
У меня есть следующий код в Startup.cs
... services.AddSingleton(provider =gt; { var getStatusBusOptions = provider.GetRequiredServicelt;IOptionslt;BusOptionsgt;gt;().Value; var bus = Bus.Factory.CreateUsingRabbitMq(sbc =gt; { var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =gt; { h.Username(getStatusBusOptions.UserName); h.Password(getStatusBusOptions.Password); }); sbc.ReceiveEndpoint("getstatus", ep =gt; { ep.Consumerlt;GetStatusConsumergt;(provider); ep.PrefetchCount = getStatusBusOptions.PrefetchCount; ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit); }); }); return new GetStatusBus { Bus = bus, HostUri = getStatusBusOptions.HostUri }; }); ...
Следующий код в классе GetStatusPublisher.cs
public class GetStatusPublisher : IGetStatusPublisher { readonly GetStatusBus _bus; public GetStatusPublisher(GetStatusBus bus) { _bus = bus; } public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class { var serviceAddress = new Uri($"rabbitmq://rabbitmq.test.com/jgt/getstatus"); var timeout = TimeSpan.FromSeconds(30); var client = new MessageRequestClientlt;Tin, Toutgt;(_bus.Bus, serviceAddress, timeout); var resp = await client.Request(request); // lt;== Timeout here and don't rise consumer (GetStatusConsumer) return resp; }
Вот метод публикации:
... readonly IGetStatusPublisher _getStatusPublisher; ... var resp = await _getStatusPublisher.GetResponselt;GetStatusRequest, GetStatusResponsegt;(statusReq);
Потребитель имеет следующий код:
public class GetStatusConsumer : MetricWriter, IConsumerlt;GetStatusRequestgt; { public GetStatusConsumer(IMetrics metrics) : base(metrics) { ...... } public async Task Consume(ConsumeContextlt;GetStatusRequestgt; context) { .... } }
Ответ №1:
Перво-наперво, я не думаю, что ты собираешься садиться в автобус. Это главная проблема.
Однако…
Вы используете старинную версию MassTransit? Ваш код серьезно устарел, и я бы предложил обновить его, чтобы использовать текущую версию/синтаксис, поскольку в приведенном выше примере кода так много ошибок.
services.AddMassTransit(x =gt; { x.AddConsumerlt;GetStatusConsumergt;(); x.UsingRabbitMq((context, cfg) =gt; { var getStatusBusOptions = context.GetRequiredServicelt;IOptionslt;BusOptionsgt;gt;().Value; cfg.Host(new Uri(getStatusBusOptions.HostUri), h =gt; { h.Username(getStatusBusOptions.UserName); h.Password(getStatusBusOptions.Password); }); cfg.PrefetchCount = getStatusBusOptions.PrefetchCount; cfg.ConcurrentMessageLimit = getStatusBusOptions.UseConcurrencyLimit; cfg.ConfigureEndpoints(context); }); }); services.AddMassTransitHostedService(); services.AddGenericRequestClient();
Затем вы можете просто добавить зависимость от IRequestClientlt;Tgt;
отправки запросов и получения ответов. Ваш обновленный код издателя может выглядеть следующим образом:
public class GetStatusPublisher : IGetStatusPublisher { public GetStatusPublisher(IServiceProvider provider) { _provider = provider; } public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class { var client = _provider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;(); var response = await client.GetResponselt;Toutgt;(request); return response.Message; } }
Ответ №2:
Если я использую следующий код, я получаю ошибку: «Система.Исключение InvalidOperationException: Не удается разрешить массив службы с областью действия.IRequestClient`1[GetStatusTry.Контракты.GetStatusRequest]’ от корневого поставщика…..»
public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class { try { var client = _provider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;(); // lt;lt; --- here is error var response = await client.GetResponselt;Toutgt;(request); return response.Message; } catch (Exception ex) { Console.WriteLine(ex.Message); throw; } }
Но этот код работает нормально:
общедоступный класс Dev2Controller : ControllerBase {
IRequestClientlt;GetStatusRequestgt; _client; public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClientlt;GetStatusRequestgt; client) { _client = client; } [HttpPost] public async Tasklt;GetStatusResponsegt; GetStatus2() { var req = new GetStatusRequest { Statuses = new Listlt;intgt; { 1, 2, 3 }, TerminalDescr = "try to get status2" }; var response = await _client.GetResponselt;GetStatusResponsegt;(req); return (response.Message); }
}
Ответ №3:
этот код работает
public class GetStatusPublisher : IGetStatusPublisher { readonly IServiceProvider _provider; public GetStatusPublisher(IServiceProvider provider) { _provider = provider; } public async Tasklt;Toutgt; GetResponselt;Tin, Toutgt;(Tin request) where Tin : class where Tout : class { try { using (var _scope = _provider.CreateScope()) { var client = _scope.ServiceProvider.GetRequiredServicelt;IRequestClientlt;Tingt;gt;(); var response = await client.GetResponselt;Toutgt;(request); return response.Message; } } catch (Exception ex) { Console.WriteLine(ex.Message); throw; } } }