Как фильтровать события из потока Kafka на основе его содержимого в формате JSON

#java #json #apache-kafka #apache-kafka-streams

#java #json #apache-kafka #apache-kafka-streams

Вопрос:

Я использую потоки Kafka для чтения из темы в моем кластере, и я хочу фильтровать сообщения на основе его содержимого в формате JSON, т.Е.:

Формат JSON:

 {
   "id": 1 
   "timestamp": "2019-04-21 12:53:18", 
   "priority": "Medium", 
   "name": "Sample Text",
   "metadata": [{
      "metric_1": "0", 
      "metric_2": "1", 
      "metric_3": "2"
   }]
}
  

Я хочу прочитать сообщения из темы ввода (назовем ее «input-topic»), отфильтровать их (предположим, мне нужны только сообщения с приоритетом «Low»), затем объединить их и отправить в другую тему («filtered-topic»)

У меня не так много кода, кроме создания самого потока и его конфигураций. Я думаю, что в Serdes должно быть что-то, что мне нужно настроить, но я не уверен, как. Я также пытался использовать десериализатор JSON, но не смог заставить его работать.

Прежде всего, возможно ли это вообще? Если да, то каков будет правильный курс действий?

Комментарии:

1. что именно вы имеете в виду then aggregate ? вы хотите агрегировать по времени?

2. Это возможно. Вам нужно прочитать сообщения, десериализовать их в свой пользовательский объект, затем выполнить логику фильтрации на основе поля этого пользовательского объекта, а затем опубликовать его в другой теме.

3. Вы можете использовать библиотеку Kafka от Spring, библиотеку Kafka от Apache или даже библиотеку spring cloud.

4. @VasiliySarzhynskyi Под «агрегированием» я подразумеваю отправку отфильтрованных сообщений в отдельную тему, вот и все

5. В Интернете есть много примеров: github.com/confluentinc/kafka-streams-examples

Ответ №1:

Вы можете создать поток из темы.

     StreamsBuilder builder = new StreamsBuilder();

    // key value type here is both String for me and update based on cases
    KStream<String, String> source = builder.stream("input-topic");

    source.filter(new Predicate<String, String>() {
        @Override
        public boolean test(String s, String s2) {
            // your filter logic here and s and s2 are key/value from topic
            // In your case, s2 should be type of your json Java object
            return false;
        }
    }).groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            // your group by logic
            return null;
        }
    }).count().toStream().to("new topic");
  

Комментарии:

1. Кажется, я получаю некоторые ошибки компиляции, пытаясь заставить это работать: «неправильное количество аргументов типа; требуется 1» «здесь не ожидается интерфейс» «метод не переопределяет и не реализует метод из супертипа» В основном в части фильтра

2. Я думаю, вам нужна зависимость от kafka-stream <зависимость> <Идентификатор группы>org.apache. кафка</groupId> <artifactId>кафка-потоки </artifactId> <версия> 1.0.1</версия> </зависимость>