Потоки Kafka: как обрабатывать динамические условия в фильтре?

#java #filter #apache-kafka #stream

#java #Фильтр #apache-kafka #поток

Вопрос:

Есть ли какой-либо способ обработать динамическое условие в потоке kafka? Мне нужно отфильтровать мои потоковые данные по списку значений, измененных пользователями, но этот список значений отсутствует в потоке, они доступны с помощью http-вызова.

 stream(myTopic)
    .filter(isDataOK())
    ...

private Predicate<> isDataOK() {
     http_call;
     return predicate_value_based_on_http_answer;
}
  

Возможно ли обрабатывать http-вызов во время обработки потока kafka или мне нужны условные данные в другом потоке?

Заранее спасибо, с уважением

Комментарии:

1. Это возможно, но это неправильный подход. http-вызов может быть тяжелым , вызывать исключение и т.д., И это может замедлить обработку или даже остановить ее.

Ответ №1:

У меня были аналогичные требования к конфигурации динамического потока. Вызов http для обработки каждого сообщения не является хорошей идеей, поскольку это отнимает много времени. Лучше кэшировать значения в вашем компоненте обработки и обновлять их на регулярной основе, например, раз в час. Вам также следует подумать о сценарии, когда http-вызов недоступен.

В другом проекте я получал динамическую конфигурацию асинхронно из другой темы Kafka, и это работало лучше и естественнее для потоковой обработки.

Комментарии:

1. Здравствуйте, спасибо за ваш ответ, я разделяю ваше мнение, и я создам функцию расписания в качестве быстрого решения и создам еще одну тему kafka с моей настройкой для долгосрочного решения. Но на самом деле, меня также интересует техническое решение без расписания или предоставленной темы, просто чтобы понять, как это возможно сделать.