Продюсер для уже предоставленного RabbitMQ через поток Spring Cloud

#spring #spring-boot #spring-cloud #spring-cloud-stream #spring-rabbit

#весна #spring-boot #spring-cloud #spring-cloud-stream #spring-rabbit

Вопрос:

Я нахожусь в среде, где rabbitmq уже предоставлены в качестве заданной инфраструктуры.

Служба A записывает данные в очереди rabbit, а служба B считывает данные из очередей. Часть чтения в качестве потребителя через Spring Cloud Stream Binder работает как заклинание после интенсивного чтения документации и ее правильной настройки.

Однако я не могу настроить продюсера, который будет записывать в очереди кроликов.


Настройка

RabbitMQ (уже запущен и работает)

  • 1 Обмен: myExchange
  • 3 очереди: myQueueA, myQueueB, myQueueC (в разделе, связанном с myExchange)

Продюсер-сервис (вышеупомянутый сервис A)

 @Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(RabbitChannelSource.class)
public class RabbitSender
{
    private final RabbitChannelSource rabbitChannelSource;


    public void sendMessage()
    {
        Message<String> msg = MessageBuilder.withPayload("TEEEEST").build();
        rabbitChannelSource.myOutput().send(msg);
    }
  
 public interface RabbitChannelSource
{
    String MY_OUTPUT_BINDING = "my-output";

    @Output(MY_OUTPUT_BINDING)
    MessageChannel myOutput();

}
  

Я пытаюсь заставить его работать сначала для одной очереди, но в идеале я бы установил правильные свойства для всех трех очередей.

 spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=pass

## use existing rabbitmq via bindings
spring.cloud.stream.bindings.my-output.destination=my-output
#spring.cloud.stream.bindings.my-output.group=myQueueA
spring.cloud.stream.bindings.my-output.producer.required-groups=myQueueA
spring.cloud.stream.rabbit.bindings.my-output.producer.bind-queue=false
spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.queueNameGroupOnly=true

  

bind-queue=false и declare-exchange=false это необходимо, поскольку у меня есть инфраструктура rabbit.


Но, однако, я всегда получаю одно и то же исключение, я не мог понять, почему. Я имею в виду, я знаю почему, потому что нет подходящего канала для отправки сообщений. Поэтому я подозреваю, что это как-то связано с application.properties .

 org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.my-output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={contentType=application/json, id=98698b4c-61fa-596d-736e-f630d3ba4626, timestamp=1605105272723}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at de.techem.emsreceiver.rabbitmq.RabbitSender.sendMessage(RabbitSender.java:25)
    at de.techem.emsreceiver.event.TriggeredEmsImport.execute(TriggeredImport.java:95)
    at de.techem.emsreceiver.event.TriggeredEmsImport$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at de.techem.emsreceiver.event.TriggeredEmsImport$$EnhancerBySpringCGLIB$$a0878aed.execute(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
    at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
    at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)

  

Я хочу иметь возможность записывать на myQueueA, myQueueB or myQueueC основе routingKey. Итак, если я установлю routingKey внутри своего приложения на myQA сообщение, оно должно быть отправлено на myQueueA , myQB затем на myQueueB , а myQC затем на myQueueC .

Таким образом, три разных ключа маршрутизации ведут к соответствующей очереди rabbit.

Я рад любому вводу, поскольку я пробовал много вещей, которые не привели меня к какому-либо успеху. Спасибо!

Комментарии:

1. Что такое testSending() ? Я не вижу такого метода.

2. Это sendMessage() , я просто переименовал его. Извините, что я виноват!

Ответ №1:

Возможно, вы пытаетесь использовать привязку до ее привязки? Для меня это работает нормально:

 spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
  
 @SpringBootApplication
@EnableBinding(Source.class)
public class So64788954Application {

    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }

    @Autowired
    Source source;

    @Bean
    ApplicationRunner runner() {
        return args -> {
            source.output().send(MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());

            source.output().send(MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());

            source.output().send(MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());

        };
    }

    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" } )
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in   ", from: "   queue);
    }

}
  
 foo, from: myQueueA
bar, from: myQueueB
baz, from: myQueueC
  

Вам не нужны обязательные группы на стороне производителя, когда вы добавляете свои собственные очереди / обмены.

The @RabbitListener` предназначен только для обработки сообщений, отправленных в 3 очереди.

Кстати, модель аннотаций устарела; теперь предпочтительнее модель функционального программирования, где мы используем a StreamBridge для вывода (и Consumer<?> или Function<?, ?> для потребления и обработки соответственно).

Вот эквивалент приведенного выше:

 spring.cloud.stream.rabbit.bindings.my-output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.my-output.producer.routing-key-expression=headers['routeTo']
  
 @SpringBootApplication
public class So64788954Application {

    public static void main(String[] args) {
        SpringApplication.run(So64788954Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    ApplicationRunner runner() {
        return args -> {
            bridge.send("my-output", MessageBuilder.withPayload("foo")
                    .setHeader("routeTo", "myQA")
                    .build());

            bridge.send("my-output", MessageBuilder.withPayload("bar")
                    .setHeader("routeTo", "myQB")
                    .build());

            bridge.send("my-output", MessageBuilder.withPayload("baz")
                    .setHeader("routeTo", "myQC")
                    .build());

        };
    }

    @RabbitListener(queues = { "myQueueA", "myQueueB", "myQueueC" } )
    void listen(String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in   ", from: "   queue);
    }

}
  

Комментарии:

1. Привет, Гэри, спасибо за ваш ответ. Я не могу проверить ваше предложение, так как возникла другая вещь. Как только я доберусь до этого, я дам вам обновление!

2. Я попробовал первую модель, я получаю ту же ошибку, что и в моем сообщении. Для новой модели я получаю другую ошибку, но при ее отладке у меня нет выходных данных, связанных с StreamBridge . Я что-то упускаю из виду? Так любопытно узнать ..

3. Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={routeTo=CRE, id=afb2a163-c110-96a8-0ee6-acc26a3d640b, contentType=application/json, timestamp=1606302368009}] Для первого предложения.

4. Если вы не можете понять, что отличается от моего примера, опубликуйте небольшой, полный, урезанный пример, который демонстрирует это поведение, и я посмотрю; Я не могу помочь дальше, не видя реального кода / конфигурации.

Ответ №2:

После предложения Гэри я получаю ту же ошибку, что и в моем первом сообщении.

application.properties

 # --- spring cloud ---
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password
## use existing rabbitmq via bindings
spring.cloud.stream.bindings.output.destination=my-output
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
  

RabbitTrigger.java

 @Slf4j
@Component
@RequiredArgsConstructor
public class RabbitTrigger
{
    private final RabbitSender rabbitSender;


    @EventListener(ApplicationReadyEvent.class)
    public void execute()
    {
        rabbitSender.testSending();
    }

}
  

RabbitSender.java

 @Slf4j
@Configuration
@RequiredArgsConstructor
@EnableBinding(Source.class)
public class RabbitSender
{
    private final Source source;


    public void testSending()
    {
        Message<String> testMessage = MessageBuilder.withPayload("TEEEEST")
                                                    .setHeader("routeTo", "CRE")
                                                    .build();
        source.output().send(testMessage);
    }
  

Но я получаю то же исключение, что и в моем первоначальном сообщении. Я предполагаю, что я не учел важную часть этого?

 2020-11-25 16:27:17,116 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[7], headers={routeTo=CRE, id=15479810-375f-38e8-a692-e84c6ede01d7, contentType=application/json, timestamp=1606318037108}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at de.techem.emsreceiver.rabbitmq.RabbitSender.testSending(RabbitSender.java:34)
    at de.techem.emsreceiver.event.RabbitTrigger.execute(RabbitTrigger.java:96)
    at de.techem.emsreceiver.event.RabbitTrigger$$FastClassBySpringCGLIB$$d84f31a4.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at de.techem.emsreceiver.event.RabbitTrigger$$EnhancerBySpringCGLIB$$d00eff49.execute(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:305)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:153)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:403)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:409)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360)
    at org.springframework.boot.context.event.EventPublishingRunListener.running(EventPublishingRunListener.java:103)
    at org.springframework.boot.SpringApplicationRunListeners.running(SpringApplicationRunListeners.java:77)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:330)
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:140)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinderInstance(DefaultBinderFactory.java:320)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:209)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:140)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:379)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:268)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291)
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    at de.techem.emsreceiver.EmsReceiverApplication.main(EmsReceiverApplication.java:21)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    ... 60 common frames omitted

  

Настройка продюсера для меня намного сложнее, чем для потребительской части.