#spring #apache-kafka #spring-integration #spring-kafka
#spring #apache-kafka #spring-интеграция #spring-kafka
Вопрос:
Итак, я настроил адаптер исходящих сообщений Kafka с каналами для успеха и неудачи, чтобы я мог выполнить некоторую постобработку на основе результатов публикации kafka
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" upstreamType "'] '_' headers['" upstreamTypeInstance "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
Затем я подключаю активатор службы к этому каналу успешной отправки, который также сохраняет успешно отправленное сообщение в хранилище сообщений
@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
return MessageChannels.direct("kafkaSuccessChannel").get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoHandler.setMongoConverter(mongoConverter);
mongoHandler.setLoggingEnabled(TRUE);
SpelExpressionParser parser = new SpelExpressionParser();
mongoHandler.setCollectionNameExpression(
parser.parseExpression(
"headers['" upstreamType "'] '_' headers['" upstreamTypeInstance "'] '_' headers['" upstreamWebhookSource "']"));
return mongoHandler;
}
Я ожидаю, что активатор службы будет вызван в случае успешной публикации, чего не происходит,
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
//publish messsge to KAfka TOpic
...
//assert message saved in MongoDB
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
.containsOnly("JRASERVER-2000");
}
Последнее утверждение завершается ошибкой, и в журналах я не вижу никакого вызова на канале успеха после публикации продюсера в теме.
Комментарии:
1. Вы ждете сообщения об успешном завершении? Он возвращается асинхронно в другом потоке, поэтому вам нужен какой-то механизм, чтобы дождаться, пока это произойдет. например, перехватчик канала для обратного отсчета защелки.
Ответ №1:
Как сказал Гэри в своем комментарии, sendSuccessChannel
вызывается асинхронно в другом потоке, чем ваш основной бегун JUnit. На самом деле это обратный Future
вызов завершения в клиенте Kafka.
Итак, чтобы быть уверенным, что все отправлено в MongoDB после отправки в Kafka, вам нужно более сложное утверждение, чем просто простое findAll()
. Вам нужно повторить такой вызов несколько раз в течение некоторого периода, чтобы убедиться, что другие потоки выполнили свою работу по отправке сообщения на этот канал и хранению документа в коллекции MongoDB.
Для этой цели я могу предложить инструмент ожидания, который мы действительно используем в наших собственных тестах: https://github.com/awaitility/awaitility
Комментарии:
1. поможет ли вызов
setSync(true)
только для целей тестов?