Весенний облачный автобус — java.lang.Исключение ClassCastException: java.lang.Строка не может быть передана в org.springframework.cloud.bus.event.RemoteApplicationEvent

#spring-boot #spring-cloud #spring-cloud-bus

Вопрос:

Я использую конфигурацию spring cloud для изменения значения свойств во время выполнения. После обновления spring boot до версии 2.4.3 и spring cloud до 2020.0.3 я столкнулся со следующей проблемой:

 org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3cb2d5ae]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.cloud.bus.event.RemoteApplicationEvent, failedMessage=GenericMessage [payload={"type":"AckRemoteApplicationEvent","timestamp":1624620780241,"originService":"configuration-server:8888:6306985b5ae768fa51f4315ad800e3cd","destinationService":"**","id":"a34246ff-e232-44db-ae1b-4b9947ee7b4c","ackId":"9de043fa-1e14-4119-a7c9-49949ad83102","ackDestinationService":"*:**","event":"org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent"}, headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=springCloudBus, b3=a23524f08878776b-eac41587dcfb4202-1, nativeHeaders={b3=[a23524f08878776b-eac41587dcfb4202-1]}, skip-input-type-conversion=true, kafka_offset=113, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@38b53c4f, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1624620780242, kafka_groupId=sdpCrmData-1}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2087)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2072)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1971)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1898)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1786)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1505)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1164)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3cb2d5ae]; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.cloud.bus.event.RemoteApplicationEvent
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2039)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2021)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1958)
    ... 8 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.cloud.bus.event.RemoteApplicationEvent
    at org.springframework.cloud.bus.BusConsumer.accept(BusConsumer.java:31)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:854)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:643)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:489)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:77)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:727)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:560)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 31 common frames omitted 
 

Я обнаружил, что если я понижу рейтинг spring.cloud.bus до версии 2.2.4.RELEASE, то все будет работать правильно.

свойства кафки:

 spring.cloud.stream.kafka.bindings.springCloudBusInput.consumer.configuration.key.deserializer=org.apache.kafka  .common.serialization.ByteArraySerializer
spring.cloud.stream.kafka.bindings.springCloudBusInput.consumer.configuration.value.deserializer=org.apache.kafka.common.serialization.ByteArraySerializer
 

Обратите внимание, что если я изменю ByteArraySerializer на StringSerializer, то я получу:

 Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.cloud.bus.event.RemoteApplicationEvent