Как установить KafkaHeaders.MESSAGE_KEY в spring integration

#java #spring #apache-kafka #spring-integration

#java #весна #apache-kafka #spring-интеграция

Вопрос:

Может кто-нибудь, пожалуйста, помочь мне с установкой random KafkaHeaders.MESSAGE_KEY для приведенного ниже кода, чтобы я мог публиковать полезную нагрузку в разных разделах.

 @Bean
public IntegrationFlow fileInboundChannelFlow() {
    FileInboundChannelAdapterSpec messageSourceSpec = Files
            .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());

    messageSourceSpec = messageSourceSpec.filter(getFilter());
    //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
    messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());

    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
            .split(new FileSplitter(true, true))
            .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));

    return flowBuilder.<Object, Class<?>>route(Object::getClass,
            m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
                    "lines.input"))
            .get();
}
  

Я хочу установить ключ на основе payload.prod_cd.
Поскольку это одноэлементный компонент, я хочу инициализировать разные ключи заголовка kafka для каждой полезной нагрузки.

Ответ №1:

Вы на правильном пути с этим:

 .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
  

Единственное, что вам нужно, это использовать headerExpression() вместо:

 /**
 * Add a single header specification where the value is a String representation of a
 * SpEL {@link Expression}. If the header exists, it will <b>not</b> be overwritten
 * unless {@link #defaultOverwrite(boolean)} is true.
 * @param name the header name.
 * @param expression the expression.
 * @return the header enricher spec.
 */
public HeaderEnricherSpec headerExpression(String name, String expression) {
  

Комментарии:

1. Здравствуйте, я изменил файл .enrichHeaders(h -> h.headerExpression(KafkaHeaders. MESSAGE_KEY,»T(java.util. UUID).randomUUID().toString()»)); но каждый файл имеет другой ключ, а не каждая запись в файле

2. Такой enrichHeaders() оператор должен быть после .split(new FileSplitter(true, true)) . Тогда каждая запись из файла будет иметь свою собственную KafkaHeaders.MESSAGE_KEY . Примечание: для этого UUID варианта вы можете вместо этого использовать a headerFunction() для удобства и завершения кода.