Как правильно создать потребителя для интеграционного теста, чтобы проверить, было ли сообщение отправлено кафке?

#java #spring-boot #apache-kafka #spring-cloud #spring-cloud-stream

Вопрос:

Я использую avro для создания класса java (Сердцебиение), и я использую процессор обмена сообщениями Spring cloud, чтобы отправить сообщение кафке, используя этот класс сердцебиения.

Итак, вот моя услуга:

 @Service
public class HeartbeatServiceImpl implements HeartbeatService {

  private Processor processor;

  public HeartbeatServiceImpl(Processor processor) {
    this.processor = processor;
  }

  @Override
  public boolean sendHeartbeat(Heartbeat heartbeat) {
    Message<Heartbeat> message =
        MessageBuilder.withPayload(heartbeat).setHeader(KafkaHeaders.MESSAGE_KEY, "MY_KEY").build();
    return processor.output().send(message);
  }

}
 

У меня есть этот потребитель в моем тестовом пакете:

 @Component
public class HeartbeatKafkaConsumer {


  private CountDownLatch latch = new CountDownLatch(1);
  private String payload = null;

  @KafkaListener(topics = "HEARTBEAT")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    this.payload = consumerRecord.toString();
    this.latch.countDown();
  }


  public CountDownLatch getLatch() {
    return latch;
  }

  public Object getPayload() {
    return payload;
  }

}
 

Теперь на моем реальном тестовом занятии у меня есть это:

 public class HeartbeatServiceImplIntegrationTest {

  @Autowired
  private HeartbeatServiceImpl heartbeatService;

  @Autowired
  private HeartbeatKafkaConsumer heartbeatKafkaConsumer;

  @Test
  public void assertHeartbeatPushedToKafka() throws InterruptedException {
    Heartbeat heartbeat =
        Heartbeat.newBuilder().setID("my-Test ID").setINPUTSOURCE("my-Test IS")
            .setMSGID("my-Test 123").setMSGTIME(12345l).setRECEIVEDTIME(12345l).build();

    boolean isMessageSent = heartbeatService.sendHeartbeat(heartbeat);
    assertThat(isMessageSent).isTrue();

    heartbeatKafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

    assertThat(heartbeatKafkaConsumer.getLatch().getCount()).isEqualTo(0L);
    assertThat(heartbeatKafkaConsumer.getPayload()).isEqualTo(heartbeat);

  }

}
 

Я вижу, что сообщение действительно приходит на кафку, запустив ksql. Таким образом, сообщение там, как и ожидалось.
Я также получаю сообщение от своего пользователя heartbeatKafkaConsumer, но когда я делаю утверждение, я получаю эту ошибку:

 Expecting:
 <"ConsumerRecord(topic = HEARTBEAT, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1622134829899, serialized key size = 12, serialized value size = 75, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@765aa560, value = [B@3582e1cd)">
to be equal to:
 <{"ID": "my-Test ID", "TYPE": "null", "MSG_ID": "my-Test 123", "MSG_TIME": 12345, "RECEIVED_TIME": 12345, "INPUT_SOURCE": "my-Test IS", "SBK_FEED_PROVIDER_ID": "null", "SBK_FEED_PROVIDER_NAME": "null"}>
 

Теперь я пытался читать по-разному с моего HeartbeatKafkaConsumer, но я просто не мог правильно проанализировать значение до сердцебиения.

Как мне использовать кафку в качестве сердцебиения, чтобы я мог проверить его на соответствие первоначально отправленному сообщению? Я даже не могу получить его в виде строки.

О, и вот моя конфигурация applicaiton.properties для кафки:

 spring.cloud.stream.default.producer.useNativeEncoding=true
spring.cloud.stream.default.consumer.useNativeEncoding=true
spring.cloud.stream.bindings.input.destination=HEARTBEAT
spring.cloud.stream.bindings.input.content-type=application/* avro
spring.cloud.stream.bindings.output.destination=HEARTBEAT
spring.cloud.stream.bindings.output.content-type=application/* avro
spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.kafka.binder.consumer-properties.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.binder.consumer-properties.key.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.binder.consumer-properties.specific.avro.reader=true
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myclient
spring.kafka.consumer.auto-offset-reset=earliest
 

Ответ №1:

Используйте consumerRecord.value() вместо toString.

Это будет a byte[] , который вы можете передать в ObjectMapper, чтобы десериализовать его как a Heartbeat .

Или просто настройте потребителя на использование JsonDeserializer и consumerRecord.value() будет сердцебиение. Вам нужно будет настроить десериализатор, чтобы указать ему, какой тип создавать.

Третий (и самый простой) вариант-добавить JsonMessageConverter @Bean (загрузка передаст его в контейнер прослушивателя) и изменить свой метод на

   @KafkaListener(topics = "HEARTBEAT")
  public void receive(Heartbeat heartbeat) {
      ...
  }
 

Структура сообщает преобразователю, какой тип следует создать, исходя из подписи метода.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#messaging-message-conversion