Получаю java.lang.Исключение ClassCastException: [B не может быть приведено к org.springframework.messaging.Исключение сообщения после использования пакета

#java #spring #apache-kafka #spring-cloud-stream #spring-cloud-stream-binder-kafka

Вопрос:

Я использую spring-cloud-stream-kafka-binder-3.0.4 для пакетного использования сообщений после их использования, преобразования сообщения в объект, но получения вышеуказанного исключения.

Вот код:

 @StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
public void handleActivity(List<Message<Event>> messages,
    @Header(name = "deliveryAttempt", defaultValue = "1") int deliveryAttempt,
    @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)  
{
    try
    {
        log.info("Received activity message with message length {} attempt {}",
              messages.size(), deliveryAttempt);
        List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
        nodeConfigActivityBatchProcessor.processNodeConfigActivity(eventList);
        acknowledgment.acknowledge();
        log.debug("Processed activity message {} successfully!!", messages.size());
    } 
    catch (Exception e)
    { 
        throw e;
    }
}
 

Конфигурация:

 spring.cloud.stream.bindings.activity-input-channel.destination=TOPIC.FEED.NAME
spring.cloud.stream.bindings.activity-input-channel.contentType=application/json
spring.cloud.stream.bindings.activity-input-channel.consumer.batch-mode=true
spring.cloud.stream.bindings.activity-input-channel.consumer.max-attempts=1
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.auto-commit-offset=false
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.reset-offsets=true
spring.cloud.stream.kafka.bindings.activity-input-channel.consumer.start-offset=latest
spring.kafka.consumer.max-poll-records=5
spring.kafka.consumer.fetch-max-wait=60000
spring.kafka.consumer.fetch-min-size=500
 

Я получаю вышеуказанную ошибку в этой строке List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList()); по адресу .collect(Коллекторы.тоЛист()). Я не могу понять, почему??

если я проверю Message<Event> eventMessage = messages.get(0) получение того же исключения(сообщения-это список переменных сообщений).

если пакетный режим, если false, то он использует только одно сообщение(сообщение сообщения), то он работает нормально, без исключений.

Нужно ли добавлять какой-либо десериализатор при использовании пакетного режима???

Комментарии:

1. это сообщение означает, что вы пытаетесь передать экземпляр B экземпляру сообщения, но между ними нет связи «ЕСТЬ-А», поэтому это не удается.

2. Я публикую сообщения производителю кафки, и этот слушатель потребляет их в пакете из 5, но после этого я не могу получить первый элемент из полученного им списка (который имеет формат сообщения). Точно так же, если пакетный режим имеет значение false, то он использует только одно сообщение handleActivity(сообщение<Событие> сообщение), тогда он работает нормально. что такое тип B , я не понимаю

3. Я тоже не знаю, что такое B, но это тип, упомянутый в вашем сообщении об ошибке

Ответ №1:

Мне удалось устранить это исключение, добавив десериализатор.

Итак, ниже приведен мой пакетный прослушиватель, вместо того, чтобы потреблять List<Message<Event>> messages , как указано в вопросе, потреблять List<Event> messages .

 @StreamListener(ActivityChannel.ACTIVITY_INPUT_CHANNEL)
 public void handleActivity(List<Event> messages,@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment)  
 {
  try
  {
    log.info("Received activity message with message length {} attempt 
    {}",messages.size(), deliveryAttempt);        
   nodeConfigActivityBatchProcessor.processNodeConfigActivity(messages);
    acknowledgment.acknowledge();
    log.debug("Processed activity message {} successfully!!", 
   messages.size());
 } 
 catch (Exception e)
  { 
      throw e;
  }
}
 

Добавлен десериализатор ниже

 public class EventDeserializer extends JsonDeserializer<Event> {
}
 

Добавлено значение ниже.свойство десериализатора в файл свойств.

 spring.cloud.stream.kafka.bindings.input-channel.consumer.configuration.value.deserializer=com.sample.messaging.util.EventDeserializer
 

У меня есть список событий в моем пакетном прослушивателе.