#java #apache-kafka #reactive-programming #apache-kafka-streams #spring-cloud-stream
Вопрос:
У меня есть такие сценарии, как.
Есть 4 темы: Тема обработки , тема исключения , Тема повторной попытки и тема отклонения. У меня есть приложение spring cloud stream, в котором есть процессор, использующий Kstream. Этот процессор считывает сообщение из темы исключения и на основе флага, доступного в каждом сообщении, создает две ветви kstream для темы повторной попытки и темы отклонения. Теперь необходимо сделать следующее: любое сообщение, присутствующее в теме повторной попытки, должно подождать определенный период времени, прежде чем отправлять его обратно в тему обработки. может ли кто-нибудь помочь мне, каков наилучший дизайн или решение для этого в приложении kafka stream в spring cloud stream. Может ли он быть спроектирован с асинхронным механизмом с использованием потока и Моно. любой ресурс или руководство были бы большим подспорьем. Спасибо
Ответ №1:
мы не можем смешивать реактивные типы в приложении Spring Cloud Stream Kafka Streams. В принципе, у вас могут быть только типы потоков Кафки, такие как KStream
привязки ввода/вывода или KTable
как привязки ввода / вывода. Если вы хотите ввести задержку перед отправкой в раздел обработки, почему вы не можете использовать что-то вроде a ScheduleExecutorService
, а затем вызывать службу только после начальной задержки? Вот пример псевдокода для потенциального решения.
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
...
...
.branch(
(k, v) -> {
if (reject flag found) {
return true;
}
}
(k, v) -> executorService.schedule(() -> true, initialDelay, TimeUnit.SECONDS));
В первой ветке мы проверяем, привязана ли запись к статусу отклонена, если да, немедленно отправьте ее в раздел отклонено. В противном случае он предназначен для темы повтора, задержка с некоторым значением (я предполагаю, что это настроено извне в вашем приложении), прежде чем возвращать логический флаг. По завершении задержки логическое true
значение вернется, и запись, поступившая в ветку, будет отправлена в раздел «Повторная попытка».
Пожалуйста, имейте в виду, что я не пробовал этот код, поэтому остерегайтесь любых условий гонки в этом втором фильтре, когда мы передаем данные планировщику асинхронно, но контекст потока все равно должен сохранять текущую (k,v)
пару. Пожалуйста, попробуйте это с некоторыми тестовыми данными, чтобы убедиться, что это соответствует вашим требованиям.