Как установить ключ сообщения kafka в ProducerRecord

#spring-boot #apache-kafka #spring-kafka

#весенняя загрузка #apache-kafka #spring-kafka

Вопрос:

В настоящее время я использую org.springframework.kafka.core.KafkaTemplate для публикации сообщений avro по теме с заголовками.

 @Override
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
    ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
    if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
        byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
        if (correlationId != null) {
            producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
        }
    }
    return doSend((ProducerRecord<K, V>) producerRecord);
}
 

В сообщении<?> мы можем установить значение и заголовки, но не можем установить ключ. Есть ли способ иметь ключ в заголовке? Если да, не могли бы вы сообщить мне имя заголовка для ключа? Есть ли способ отправить ключ, значение и заголовок с помощью KafkaTemplate

Комментарии:

1. messageConverter.fromMessage еще не установлен ключ ProducerRecord? Какой тип объекта this.messageConverter ?

Ответ №1:

Вы должны установить заголовки на ProducerRecord , например:

 RecordHeader yourHeader = new RecordHeader("yourHeaderName", "yourValue".toByteArray())
record.headers().add(recordHeaderKafkaMessageKey)
 

Редактировать: неверно истолковал требования, извините. Для ключа сообщения есть определенный заголовок kafka, который KafkaHeaders.MESSAGE_KEY .

Комментарии:

1. На самом деле мне нужно заполнить ключ сообщения вместе с заголовком плюс значение. Я знаю, что могу установить заголовок, но отправка заголовка не является проблемой. Я не могу отправить ключ сообщения с сообщением <?>

2. Извините, я должен был внимательно прочитать ваш вопрос. KafkaHeaders.MESSAGE_KEY Помогает ли?