#kotlin #apache-kafka #apache-kafka-streams
Вопрос:
в моем приложении kafka streams мне нужно повторно пытаться обрабатывать сообщение всякий раз, когда в логике обработки возникает исключение определенного типа.
Вместо того, чтобы оборачивать свою логику в RetryTemplate
(я использую springboot), я рассматриваю возможность просто записать сообщение обратно в тему ввода, я предполагаю, что это сообщение будет добавлено в конец журнала в соответствующем разделе и в конечном итоге будет повторно обработано.
Я понимаю, что это испортило бы заказ, и меня это устраивает.
Мой вопрос в том, возникнет ли проблема у потоков кафки, когда он столкнется с сообщением, которое предположительно уже было обработано в прошлом (я предполагаю, что потоки кафки могут помечать обработанные сообщения, особенно когда это включено)?
Вот пример кода, который я рассматриваю для этого решения.
val branches = streamsBuilder.stream(inputTopicName)
.mapValues { it -> myServiceObject.executeSomeLogic(it) }
.branch(
{ _, value -> value is successfulResult() }, // success
{ _, error -> error is errorResult() }, // exception was thrown
)
branches[0].to(outputTopicName)
branches[1].to(inputTopicName) //write them back to input as a way of retrying
Комментарии:
1. Я никогда не работал с потоками Кафки, но я предполагаю, что потоки Кафки работают со смещениями разделов. Всякий раз, когда вы повторно создаете сообщение в той же теме, его содержимое (Ключ и значение) будет равно исходному сообщению, но оно получит новое смещение. Тем не менее, я бы настоятельно рекомендовал добавить счетчик повторных попыток к сообщению (или использовать заголовок кафки) для всех сообщений о повторных попытках. Вы никогда не знаете, приведут ли по какой-либо причине вредоносные входные данные к сбоям, которые (без счетчика повторных попыток) приведут к безрезультатному циклу
2. Спасибо, я попробую этот подход