#spring-boot #apache-kafka #spring-kafka
#весенняя загрузка #apache-kafka #spring-kafka
Вопрос:
В настоящее время я использую org.springframework.kafka.core.KafkaTemplate для публикации сообщений avro по теме с заголовками.
@Override
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return doSend((ProducerRecord<K, V>) producerRecord);
}
В сообщении<?> мы можем установить значение и заголовки, но не можем установить ключ. Есть ли способ иметь ключ в заголовке? Если да, не могли бы вы сообщить мне имя заголовка для ключа? Есть ли способ отправить ключ, значение и заголовок с помощью KafkaTemplate
Комментарии:
1.
messageConverter.fromMessage
еще не установлен ключ ProducerRecord? Какой тип объектаthis.messageConverter
?
Ответ №1:
Вы должны установить заголовки на ProducerRecord
, например:
RecordHeader yourHeader = new RecordHeader("yourHeaderName", "yourValue".toByteArray())
record.headers().add(recordHeaderKafkaMessageKey)
Редактировать: неверно истолковал требования, извините. Для ключа сообщения есть определенный заголовок kafka, который KafkaHeaders.MESSAGE_KEY
.
Комментарии:
1. На самом деле мне нужно заполнить ключ сообщения вместе с заголовком плюс значение. Я знаю, что могу установить заголовок, но отправка заголовка не является проблемой. Я не могу отправить ключ сообщения с сообщением <?>
2. Извините, я должен был внимательно прочитать ваш вопрос.
KafkaHeaders.MESSAGE_KEY
Помогает ли?