Как реализовать механизм отложенной повторной попытки в потоковом приложении Kafka (kstreams), построенном в облачном потоке spring?

#java #apache-kafka #reactive-programming #apache-kafka-streams #spring-cloud-stream

Вопрос:

У меня есть такие сценарии, как.

Есть 4 темы: Тема обработки , тема исключения , Тема повторной попытки и тема отклонения. У меня есть приложение spring cloud stream, в котором есть процессор, использующий Kstream. Этот процессор считывает сообщение из темы исключения и на основе флага, доступного в каждом сообщении, создает две ветви kstream для темы повторной попытки и темы отклонения. Теперь необходимо сделать следующее: любое сообщение, присутствующее в теме повторной попытки, должно подождать определенный период времени, прежде чем отправлять его обратно в тему обработки. может ли кто-нибудь помочь мне, каков наилучший дизайн или решение для этого в приложении kafka stream в spring cloud stream. Может ли он быть спроектирован с асинхронным механизмом с использованием потока и Моно. любой ресурс или руководство были бы большим подспорьем. Спасибо

Ответ №1:

мы не можем смешивать реактивные типы в приложении Spring Cloud Stream Kafka Streams. В принципе, у вас могут быть только типы потоков Кафки, такие как KStream привязки ввода/вывода или KTable как привязки ввода / вывода. Если вы хотите ввести задержку перед отправкой в раздел обработки, почему вы не можете использовать что-то вроде a ScheduleExecutorService , а затем вызывать службу только после начальной задержки? Вот пример псевдокода для потенциального решения.

 ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
...
...

.branch(
   (k, v) -> {
     if (reject flag found) {
       return true;
     }
   }
   (k, v) -> executorService.schedule(() -> true, initialDelay, TimeUnit.SECONDS));
       
 

В первой ветке мы проверяем, привязана ли запись к статусу отклонена, если да, немедленно отправьте ее в раздел отклонено. В противном случае он предназначен для темы повтора, задержка с некоторым значением (я предполагаю, что это настроено извне в вашем приложении), прежде чем возвращать логический флаг. По завершении задержки логическое true значение вернется, и запись, поступившая в ветку, будет отправлена в раздел «Повторная попытка».

Пожалуйста, имейте в виду, что я не пробовал этот код, поэтому остерегайтесь любых условий гонки в этом втором фильтре, когда мы передаем данные планировщику асинхронно, но контекст потока все равно должен сохранять текущую (k,v) пару. Пожалуйста, попробуйте это с некоторыми тестовыми данными, чтобы убедиться, что это соответствует вашим требованиям.