#spring-integration #spring-kafka
Вопрос:
Я хочу использовать EmbeddedKafkaBroker
для тестирования своего потока , который включает KafkaMessageDrivenChannelAdapter
,
похоже , что потребитель начинает правильно, подписался на тему, но обработчик не запускается после отправки сообщения EmbeddedKafkaBroker
.
@SpringBootTest(properties = {"...."}, classes = {....class})
@EmbeddedKafka
class IntTests {
@BeforeAll
static void setup() {
embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC);
embeddedKafka.kafkaPorts(57412);
embeddedKafka.afterPropertiesSet();
}
@Test
void testit() throws InterruptedException {
String ip = embeddedKafka.getBrokersAsString();
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafka));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
// Act
producer.send(new ProducerRecord<>(TOPIC, "key", "{"name":"Test"}"));
producer.flush();
....
}
...
}
И основной класс:
@Configuration
public class Kafka {
@Bean
public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =..
kafkaMessageDrivenChannelAdapter.setOutputChannelName("kafkaChannel");
}
@Bean
public KafkaMessageListenerContainer<String, String> container() {
ContainerProperties properties = new ContainerProperties(TOPIC);
KafkaMessageListenerContainer<String, String> kafkaContainer = ...;
return kafkaContainer;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:57412");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group12");
...
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public PublishSubscribeChannel kafkaChannel() {
return new PublishSubscribeChannel ();
}
@Bean
@ServiceActivator(inputChannel = "kafkaChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
}
};
}
...
}
в журнале я действительно вижу:
clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group12-1, groupId=group12] Subscribed to topic(s): TOPIC
ThreadPoolTaskScheduler : Initializing ExecutorService
KafkaMessageDrivenChannelAdapter : started bean 'adapter'; defined in: 'com.example.demo.demo.Kafka';
Ответ №1:
Имея embeddedKafka = new EmbeddedKafkaBroker(1, true, TOPIC);
и @EmbeddedKafka
, вы, по сути, начинаете два отдельных кластера Кафки. Смотрите ports
опцию @EmbeddedKafka
, если вы хотите изменить случайный порт для встроенного брокера. Но в то же время лучше полагаться на то, что Spring Boot предоставляет нам с его автоматической настройкой.
Дополнительную информацию смотрите в документации: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka. Обратите внимание на bootstrapServersProperty = "spring.kafka.bootstrap-servers"
недвижимость.
Обновить
В вашем тесте у вас есть это @SpringBootTest(classes = {Kafka.class})
. Когда я удаляю этот classes
атрибут, все начинает работать. Проблема в том, что ваш класс конфигурации не поддерживает автоматическую настройку, поэтому у вас неправильно инициализирована интеграция Spring, и сообщение не используется из канала. Может быть, есть какой-то другой эффект. Но все же: лучше полагаться на автоматическую настройку, поэтому позвольте вашему тесту увидеть эту @SpringBootApplication
аннотацию.
Комментарии:
1. удален новый
, still handler not triggered when I push message. Maybe because I use
поток EmbeddedKafkaBroker. Поспать после толчка?2. Не уверен, что происходит. Есть ли у вас шансы поделиться с нами простым проектом, чтобы мы могли воспроизвести его и поиграть с ним?
3. очень простой пример, 2 джавы , github.com/yerivlazer/IntegrationTest
4. Смотрите ОБНОВЛЕНИЕ в моем ответе.