Может ли ответ на запрос Spring быть реализован на стороне клиента с использованием Spring kafka API и на стороне сервера с использованием apache kafka API?

#java #spring #spring-boot #apache-kafka #spring-kafka

#java #spring #spring-boot #apache-kafka #spring-kafka

Вопрос:

Может ли ответ на запрос Spring быть реализован на стороне клиента с использованием Spring kafka API и на стороне сервера с использованием apache kafka API?

Ответ №1:

Spring для Apache Kafka предоставляет запрос / ответ, используя ReplyingKafkaTemplate и @KafkaListener с @SendTo .

Документация здесь.

Редактировать

При использовании нескольких экземпляров клиента…

если у вас есть несколько экземпляров клиента, и вы не настраиваете их, как описано в предыдущем параграфе, для каждого экземпляра требуется отдельная тема ответа. Альтернативой является установка KafkaHeaders.REPLY_PARTITION и использование выделенного раздела для каждого экземпляра. Заголовок содержит четырехбайтовое значение int (big-endian). Сервер должен использовать этот заголовок, чтобы направить ответ в правильный раздел (это делает@KafkaListener). В этом случае, однако, контейнер ответа не должен использовать функцию управления группами Kafka и должен быть настроен для прослушивания фиксированного раздела (с помощью TopicPartitionOffset в его конструкторе ContainerProperties).

При настройке с использованием одной темы ответа каждый экземпляр должен использовать другую group.id . В этом случае все экземпляры получают каждый ответ, но только экземпляр, отправивший запрос, находит идентификатор корреляции. Это может быть полезно для автоматического масштабирования, но с накладными расходами на дополнительный сетевой трафик и небольшими затратами на удаление каждого нежелательного ответа. При использовании этого параметра мы рекомендуем установить для sharedReplyTopic шаблона значение true, что снижает уровень протоколирования неожиданных ответов для ОТЛАДКИ вместо ошибки по умолчанию.

Комментарии:

1. Несколько экземпляров клиента, вызывающих исключение времени ожидания ответа. org.springframework.kafka.requestreply. Исключение KafkaReplyTimeoutException: время ожидания ответа истекло в org.springframework.kafka.requestreply. ReplyingKafkaTemplate.lambda$scheduleTimeout $ 3(ReplyingKafkaTemplate.java:342) [spring-kafka-2.5.5.RELEASE.jar !/:2.5.5.RELEASE] в org.springframework.scheduling.support. Делегирование ошибок при выполнении.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.2.6.RELEASE.jar !/:5.2.6.RELEASE] на java.util.concurrent. Исполнители $RunnableAdapter.call(Executors.java:511)

2. когда я запускаю один экземпляр клиента, не получая никаких исключений времени ожидания ответа.

3. Не помещайте журналы в комментарии; нечитаемый — вместо этого отредактируйте вопрос. Если вы не используете Spring на стороне сервера, серверной стороне необходимо вернуть ответ в правильную тему ответа и включить заголовок идентификатора корреляции; вам нужно показать свой код и конфигурацию (обе стороны).

4. При использовании нескольких клиентских экземпляров, если вы используете одну и ту же тему ответа для каждого, вам необходимо использовать другой идентификатор группы, чтобы оба экземпляра получали ответ; вы можете установить sharedReplyTopic значение true для подавления сообщения журнала в другом экземпляре, или вам следует использовать другую тему ответа / раздел для каждого экземпляра — см. Документацию .

5. Огромное спасибо за вашу помощь, я пробовал с другим groupId и который работает так, как ожидалось. Но, к сожалению, мы собираемся использовать балансировщик нагрузки в PROD и K8S. В таком случае, как мы можем управлять этим groupId? Есть ли какой-либо другой способ, которым мы можем решить эту проблему (запрос и ответ)?