#java #filter #apache-kafka #stream
#java #Фильтр #apache-kafka #поток
Вопрос:
Есть ли какой-либо способ обработать динамическое условие в потоке kafka? Мне нужно отфильтровать мои потоковые данные по списку значений, измененных пользователями, но этот список значений отсутствует в потоке, они доступны с помощью http-вызова.
stream(myTopic)
.filter(isDataOK())
...
private Predicate<> isDataOK() {
http_call;
return predicate_value_based_on_http_answer;
}
Возможно ли обрабатывать http-вызов во время обработки потока kafka или мне нужны условные данные в другом потоке?
Заранее спасибо, с уважением
Комментарии:
1. Это возможно, но это неправильный подход. http-вызов может быть тяжелым , вызывать исключение и т.д., И это может замедлить обработку или даже остановить ее.
Ответ №1:
У меня были аналогичные требования к конфигурации динамического потока. Вызов http для обработки каждого сообщения не является хорошей идеей, поскольку это отнимает много времени. Лучше кэшировать значения в вашем компоненте обработки и обновлять их на регулярной основе, например, раз в час. Вам также следует подумать о сценарии, когда http-вызов недоступен.
В другом проекте я получал динамическую конфигурацию асинхронно из другой темы Kafka, и это работало лучше и естественнее для потоковой обработки.
Комментарии:
1. Здравствуйте, спасибо за ваш ответ, я разделяю ваше мнение, и я создам функцию расписания в качестве быстрого решения и создам еще одну тему kafka с моей настройкой для долгосрочного решения. Но на самом деле, меня также интересует техническое решение без расписания или предоставленной темы, просто чтобы понять, как это возможно сделать.