Идентификатор корреляции в заголовке Kafka

#apache-kafka #spring-kafka

#apache-kafka #spring-kafka

Вопрос:

Я использую клиент kafka conluent c # для kafka и хочу реализовать соответствующий шаблон. Я установил заголовок ответа и идентификатор корреляции ( https://github.com/nestjs/nest/blob/master/packages/microservices/enums/kafka-headers.enum.ts ) CORRELATION_ID / REPLY_TOPIC в запросе.

Я получаю эти заголовки на стороне «сервера» и отвечаю в теме значения заголовка REPLY_TOPIC. Вероятно, мне следует установить CORRELATION_ID в заголовке ответа.

Итак, вопросы:

  1. Есть ли какая-либо магия внутри kafka, которая ищет CORRELATION_ID в заголовке сообщения и отправляет его потребителям с соответствующими настройками?
  2. МБ кто-нибудь знает и указывает мне, как spring-kafka реализует шаблон req-resp?

Комментарии:

1. Привет, @Yuriy Vikulov вам удалось успешно внедрить эту систему? / Не могли бы вы поделиться своим методом?

2. Я пытаюсь сделать то же самое, мне нужен какой-то ответ на запрос в kafka для взаимодействия с пользовательским интерфейсом в системе, основанной на iot

Ответ №1:

В Kafka для этого ничего не встроено; spring-kafka использует свои собственные пользовательские заголовки для этих целей. См KafkaHeaders .

Если вы используете Spring на стороне сервера ( @KafkaListener ), вам необходимо установить эти заголовки. Затем платформа передает идентификатор корреляции в ответ и использует тему для назначения.

Если вы пишете свой собственный серверный код, вам нужно сделать то же самое.

     /**
     * The prefix for Kafka headers.
     */
    public static final String PREFIX = "kafka_";

...

    /**
     * The header containing information to correlate requests/replies.
     * Type: byte[].
     * @since 2.1.3
     */
    public static final String CORRELATION_ID = PREFIX   "correlationId";

    /**
     * The header containing the default reply topic.
     * Type: byte[].
     * @since 2.1.3
     */
    public static final String REPLY_TOPIC = PREFIX   "replyTopic";