Потоки Кафки записывают событие обратно в тему ввода

#kotlin #apache-kafka #apache-kafka-streams

Вопрос:

в моем приложении kafka streams мне нужно повторно пытаться обрабатывать сообщение всякий раз, когда в логике обработки возникает исключение определенного типа.

Вместо того, чтобы оборачивать свою логику в RetryTemplate (я использую springboot), я рассматриваю возможность просто записать сообщение обратно в тему ввода, я предполагаю, что это сообщение будет добавлено в конец журнала в соответствующем разделе и в конечном итоге будет повторно обработано.

Я понимаю, что это испортило бы заказ, и меня это устраивает.

Мой вопрос в том, возникнет ли проблема у потоков кафки, когда он столкнется с сообщением, которое предположительно уже было обработано в прошлом (я предполагаю, что потоки кафки могут помечать обработанные сообщения, особенно когда это включено)?

Вот пример кода, который я рассматриваю для этого решения.

 val branches = streamsBuilder.stream(inputTopicName)
            .mapValues { it -> myServiceObject.executeSomeLogic(it) }
            .branch(
                { _, value -> value is successfulResult() }, // success
                { _, error -> error is errorResult() },  // exception was thrown
            )

branches[0].to(outputTopicName)
branches[1].to(inputTopicName) //write them back to input as a way of retrying
 

Комментарии:

1. Я никогда не работал с потоками Кафки, но я предполагаю, что потоки Кафки работают со смещениями разделов. Всякий раз, когда вы повторно создаете сообщение в той же теме, его содержимое (Ключ и значение) будет равно исходному сообщению, но оно получит новое смещение. Тем не менее, я бы настоятельно рекомендовал добавить счетчик повторных попыток к сообщению (или использовать заголовок кафки) для всех сообщений о повторных попытках. Вы никогда не знаете, приведут ли по какой-либо причине вредоносные входные данные к сбоям, которые (без счетчика повторных попыток) приведут к безрезультатному циклу

2. Спасибо, я попробую этот подход