активатор службы сопоставлен с каналом успешной отправки kafka в качестве входного канала, но не выполняется при успешной отправке kafka

#spring #apache-kafka #spring-integration #spring-kafka

#spring #apache-kafka #spring-интеграция #spring-kafka

Вопрос:

Итак, я настроил адаптер исходящих сообщений Kafka с каналами для успеха и неудачи, чтобы я мог выполнить некоторую постобработку на основе результатов публикации kafka

 @Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['"   upstreamType   "']   '_'   headers['"   upstreamTypeInstance   "']"));
    handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
    handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
    handler.setSendFailureChannel(kafkaFailuresChannel());
    return handler;
} 
 

Затем я подключаю активатор службы к этому каналу успешной отправки, который также сохраняет успешно отправленное сообщение в хранилище сообщений

 @Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
    return MessageChannels.direct("kafkaSuccessChannel").get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
    mongoHandler.setMongoConverter(mongoConverter);
    mongoHandler.setLoggingEnabled(TRUE);
    SpelExpressionParser parser = new SpelExpressionParser();
    mongoHandler.setCollectionNameExpression(
            parser.parseExpression(
                    "headers['"   upstreamType   "']   '_'  headers['"   upstreamTypeInstance   "']   '_'   headers['"   upstreamWebhookSource   "']"));
    return mongoHandler;
}
 

Я ожидаю, что активатор службы будет вызван в случае успешной публикации, чего не происходит,

 @Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    //publish messsge to KAfka TOpic
      ...
    //assert message saved in MongoDB
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
            .containsOnly("JRASERVER-2000");
}
 

Последнее утверждение завершается ошибкой, и в журналах я не вижу никакого вызова на канале успеха после публикации продюсера в теме.

Комментарии:

1. Вы ждете сообщения об успешном завершении? Он возвращается асинхронно в другом потоке, поэтому вам нужен какой-то механизм, чтобы дождаться, пока это произойдет. например, перехватчик канала для обратного отсчета защелки.

Ответ №1:

Как сказал Гэри в своем комментарии, sendSuccessChannel вызывается асинхронно в другом потоке, чем ваш основной бегун JUnit. На самом деле это обратный Future вызов завершения в клиенте Kafka.

Итак, чтобы быть уверенным, что все отправлено в MongoDB после отправки в Kafka, вам нужно более сложное утверждение, чем просто простое findAll() . Вам нужно повторить такой вызов несколько раз в течение некоторого периода, чтобы убедиться, что другие потоки выполнили свою работу по отправке сообщения на этот канал и хранению документа в коллекции MongoDB.

Для этой цели я могу предложить инструмент ожидания, который мы действительно используем в наших собственных тестах: https://github.com/awaitility/awaitility

Комментарии:

1. поможет ли вызов setSync(true) только для целей тестов?