Потребитель не работает после некоторого простоя (брокер rabbitmq)

#rabbitmq #spring-cloud-stream

Вопрос:

Я использую spring-cloud-stream и rabbitmq binder для простого тестирования для случая использования, который я собираюсь использовать в будущем.

Ниже приведен потребитель, которого я использую.

 @Bean
public Consumer<Flux<Message<RssModel>>> rssConsumer1(RssTaskHandler taskHandler) {
    return flux -> {
        flux.map(s -> s.getPayload()).doOnNext(System.out::println).subscribe();
    };
}
 

приложение.yml

 spring:
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        requester1:
          destination: rss-exchange
          group: requester
      function:
        bindings:
          rssConsumer1-in-0: requester1        
        definition: rssConsumer1
 

Класс модели:

 public class RssModel {

private String contentcategory;

private String rssfeedurl;

private String channel;

private String contentowner;

public String getContentcategory() {
    return contentcategory;
}

public void setContentcategory(String contentcategory) {
    this.contentcategory = contentcategory;
}

public String getRssfeedurl() {
    return rssfeedurl;
}

public void setRssfeedurl(String rssfeedurl) {
    this.rssfeedurl = rssfeedurl;
}

public String getChannel() {
    return channel;
}

public void setChannel(String channel) {
    this.channel = channel;
}

public String getContentowner() {
    return contentowner;
}

public void setContentowner(String contentowner) {
    this.contentowner = contentowner;
}

@Override
public String toString() {
    return "RssModel [contentcategory="   contentcategory   ", rssfeedurl="   rssfeedurl   ", channel="   channel
              ", contentowner="   contentowner   "]";
}
 

}

Полезная нагрузка, которую я отправляю из графического интерфейса rabbitmq в очереди:

 {"contentCategory": "", "rssFeedUrl": "http://somevalidrssurl/rss.xml","channel": "016179", "contentOwner": "RTES"}
 

Здесь все работает, и я получаю модель, которая отправляется с rabbit-mq на выходе.

Проблема После некоторого простоя я получаю исключение ниже:

  2021-09-16 18:11:32.575 ERROR 19732 --- [nge.requester-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'worker-stream.requester1'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}], failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[151], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=rss-exchange.requester, amqp_receivedExchange=, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=rss-exchange.requester, amqp_redelivered=false, id=dd6addcb-4b18-c853-6f96-8e7af8d953a4, amqp_consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, sourceData=(Body:'[B@222405ac(byte[151])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=rss-exchange.requester, deliveryTag=3, consumerTag=amq.ctag-dYcS5GwSXV5qrqZtcIF29A, consumerQueue=rss-exchange.requester]), contentType=application/json, timestamp=1631796089561}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 23 more
 

После этого потребитель останавливается и не получает никакого сообщения, но выдает то же исключение.

Я почти уверен, что это как-то связано с реактивностью в потребителе. Поскольку ни у одного реактивного потребителя этой проблемы нет.

Комментарии:

1. Это довольно странно, и предоставленная трассировка стека является своего рода постфактумной ошибкой. До этого произошло нечто, что завершило подписку. Я не знаю, что это может быть, так как у меня нет возможности воспроизвести это. Можете ли вы создать минимальный проект, который воспроизводит его, и перенести его на Github, чтобы мы могли взглянуть?

2. Конечно @ОлегЖураковский. Поделюсь проектом после его создания.

3. Привет, @OlegZhurakousky, проект добавлен на GitHub «github.com/Mhsh/spring-cloud-stream-reactive-app.git» > Кроме того, я думаю, что сузил круг проблем. 1) Запустите приложение и отправьте действительный JSON, указанный в приведенном выше вопросе. 2) Это работает все время 3) Передайте неверную полезную нагрузку (неверный текст JSON). Здесь будут возникать некоторые исключения, в основном приведение класса или что-то в этом роде. 4) Теперь попробуйте передать действительный JSON, это воспроизведет вышеупомянутую проблему. Таким образом, неработающая программа не является причиной, по-видимому, но исключение в потоке вызывает описанное выше поведение, в которое я верю.

4. Привет @ОлегЖураковский. у вас была возможность взглянуть на проблему?