#java #spring #apache-kafka #spring-integration
#java #весна #apache-kafka #spring-интеграция
Вопрос:
Может кто-нибудь, пожалуйста, помочь мне с установкой random KafkaHeaders.MESSAGE_KEY
для приведенного ниже кода, чтобы я мог публиковать полезную нагрузку в разных разделах.
@Bean
public IntegrationFlow fileInboundChannelFlow() {
FileInboundChannelAdapterSpec messageSourceSpec = Files
.inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
messageSourceSpec = messageSourceSpec.filter(getFilter());
//messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
.split(new FileSplitter(true, true))
.enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
return flowBuilder.<Object, Class<?>>route(Object::getClass,
m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
"lines.input"))
.get();
}
Я хочу установить ключ на основе payload.prod_cd.
Поскольку это одноэлементный компонент, я хочу инициализировать разные ключи заголовка kafka для каждой полезной нагрузки.
Ответ №1:
Вы на правильном пути с этим:
.enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
Единственное, что вам нужно, это использовать headerExpression()
вместо:
/**
* Add a single header specification where the value is a String representation of a
* SpEL {@link Expression}. If the header exists, it will <b>not</b> be overwritten
* unless {@link #defaultOverwrite(boolean)} is true.
* @param name the header name.
* @param expression the expression.
* @return the header enricher spec.
*/
public HeaderEnricherSpec headerExpression(String name, String expression) {
Комментарии:
1. Здравствуйте, я изменил файл .enrichHeaders(h -> h.headerExpression(KafkaHeaders. MESSAGE_KEY,»T(java.util. UUID).randomUUID().toString()»)); но каждый файл имеет другой ключ, а не каждая запись в файле
2. Такой
enrichHeaders()
оператор должен быть после.split(new FileSplitter(true, true))
. Тогда каждая запись из файла будет иметь свою собственнуюKafkaHeaders.MESSAGE_KEY
. Примечание: для этогоUUID
варианта вы можете вместо этого использовать aheaderFunction()
для удобства и завершения кода.