Использование встроенного кафкаброкера с весенней интеграцией и весенней кафкой

#spring-integration #spring-kafka

Вопрос:

Я хочу использовать EmbeddedKafkaBroker для тестирования своего потока , который включает KafkaMessageDrivenChannelAdapter ,
похоже , что потребитель начинает правильно, подписался на тему, но обработчик не запускается после отправки сообщения EmbeddedKafkaBroker .

 @SpringBootTest(properties = {"...."},  classes = {....class})
@EmbeddedKafka
class IntTests {
 @BeforeAll
 static void setup() {
    embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC);
    embeddedKafka.kafkaPorts(57412);
    embeddedKafka.afterPropertiesSet();
 }

 @Test
 void testit() throws InterruptedException {
    String ip = embeddedKafka.getBrokersAsString();
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafka));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
        // Act
    producer.send(new ProducerRecord<>(TOPIC, "key", "{"name":"Test"}"));
    producer.flush();

    ....

}
...
}
 

И основной класс:

     @Configuration
    public class Kafka {

   
    
     @Bean
     public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =..
        kafkaMessageDrivenChannelAdapter.setOutputChannelName("kafkaChannel");

     }
    
     @Bean
     public KafkaMessageListenerContainer<String, String> container() {
        ContainerProperties properties = new ContainerProperties(TOPIC);        
        KafkaMessageListenerContainer<String, String> kafkaContainer = ...;
        return kafkaContainer;
     }
    
    
     @Bean
     public ConsumerFactory<String, String> consumerFactory() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:57412");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "group12");
    
       ...
           
       return new DefaultKafkaConsumerFactory<>(props); 
     }
    

    @Bean
    public PublishSubscribeChannel kafkaChannel() {
        return new PublishSubscribeChannel ();
    }

   @Bean
    @ServiceActivator(inputChannel = "kafkaChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                
            }
        };
    }
    ...
    
    }
 

в журнале я действительно вижу:

 clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group12-1, groupId=group12] Subscribed to topic(s): TOPIC
ThreadPoolTaskScheduler          : Initializing ExecutorService
KafkaMessageDrivenChannelAdapter : started bean 'adapter'; defined in: 'com.example.demo.demo.Kafka';
 

Ответ №1:

Имея embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC); и @EmbeddedKafka , вы, по сути, начинаете два отдельных кластера Кафки. Смотрите ports опцию @EmbeddedKafka , если вы хотите изменить случайный порт для встроенного брокера. Но в то же время лучше полагаться на то, что Spring Boot предоставляет нам с его автоматической настройкой.

Дополнительную информацию смотрите в документации: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka. Обратите внимание на bootstrapServersProperty = "spring.kafka.bootstrap-servers" недвижимость.

Обновить

В вашем тесте у вас есть это @SpringBootTest(classes = {Kafka.class}) . Когда я удаляю этот classes атрибут, все начинает работать. Проблема в том, что ваш класс конфигурации не поддерживает автоматическую настройку, поэтому у вас неправильно инициализирована интеграция Spring, и сообщение не используется из канала. Может быть, есть какой-то другой эффект. Но все же: лучше полагаться на автоматическую настройку, поэтому позвольте вашему тесту увидеть эту @SpringBootApplication аннотацию.

Комментарии:

1. удален новый , still handler not triggered when I push message. Maybe because I use поток EmbeddedKafkaBroker. Поспать после толчка?

2. Не уверен, что происходит. Есть ли у вас шансы поделиться с нами простым проектом, чтобы мы могли воспроизвести его и поиграть с ним?

3. очень простой пример, 2 джавы , github.com/yerivlazer/IntegrationTest

4. Смотрите ОБНОВЛЕНИЕ в моем ответе.