#rabbitmq #spring-cloud-stream
Вопрос:
Я использую spring-cloud-stream и rabbitmq binder для простого тестирования для случая использования, который я собираюсь использовать в будущем.
Ниже приведен потребитель, которого я использую.
@Bean
public Consumer<Flux<Message<RssModel>>> rssConsumer1(RssTaskHandler taskHandler) {
return flux -> {
flux.map(s -> s.getPayload()).doOnNext(System.out::println).subscribe();
};
}
приложение.yml
spring:
main:
allow-bean-definition-overriding: true
cloud:
stream:
bindings:
requester1:
destination: rss-exchange
group: requester
function:
bindings:
rssConsumer1-in-0: requester1
definition: rssConsumer1
Класс модели:
public class RssModel {
private String contentcategory;
private String rssfeedurl;
private String channel;
private String contentowner;
public String getContentcategory() {
return contentcategory;
}
public void setContentcategory(String contentcategory) {
this.contentcategory = contentcategory;
}
public String getRssfeedurl() {
return rssfeedurl;
}
public void setRssfeedurl(String rssfeedurl) {
this.rssfeedurl = rssfeedurl;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getContentowner() {
return contentowner;
}
public void setContentowner(String contentowner) {
this.contentowner = contentowner;
}
@Override
public String toString() {
return "RssModel [contentcategory=" contentcategory ", rssfeedurl=" rssfeedurl ", channel=" channel
", contentowner=" contentowner "]";
}
}
Полезная нагрузка, которую я отправляю из графического интерфейса rabbitmq в очереди:
{"contentCategory": "", "rssFeedUrl": "http://somevalidrssurl/rss.xml","channel": "016179", "contentOwner": "RTES"}
Здесь все работает, и я получаю модель, которая отправляется с rabbit-mq на выходе.
Проблема После некоторого простоя я получаю исключение ниже:
2021-09-16 18:11:32.575 ERROR 19732 --- [nge.requester-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'worker-stream.requester1'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}], failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
После этого потребитель останавливается и не получает никакого сообщения, но выдает то же исключение.
Я почти уверен, что это как-то связано с реактивностью в потребителе. Поскольку ни у одного реактивного потребителя этой проблемы нет.
Комментарии:
1. Это довольно странно, и предоставленная трассировка стека является своего рода постфактумной ошибкой. До этого произошло нечто, что завершило подписку. Я не знаю, что это может быть, так как у меня нет возможности воспроизвести это. Можете ли вы создать минимальный проект, который воспроизводит его, и перенести его на Github, чтобы мы могли взглянуть?
2. Конечно @ОлегЖураковский. Поделюсь проектом после его создания.
3. Привет, @OlegZhurakousky, проект добавлен на GitHub «github.com/Mhsh/spring-cloud-stream-reactive-app.git» > Кроме того, я думаю, что сузил круг проблем. 1) Запустите приложение и отправьте действительный JSON, указанный в приведенном выше вопросе. 2) Это работает все время 3) Передайте неверную полезную нагрузку (неверный текст JSON). Здесь будут возникать некоторые исключения, в основном приведение класса или что-то в этом роде. 4) Теперь попробуйте передать действительный JSON, это воспроизведет вышеупомянутую проблему. Таким образом, неработающая программа не является причиной, по-видимому, но исключение в потоке вызывает описанное выше поведение, в которое я верю.
4. Привет @ОлегЖураковский. у вас была возможность взглянуть на проблему?