#java #json #apache-kafka #apache-kafka-streams
#java #json #apache-kafka #apache-kafka-streams
Вопрос:
Я использую потоки Kafka для чтения из темы в моем кластере, и я хочу фильтровать сообщения на основе его содержимого в формате JSON, т.Е.:
Формат JSON:
{
"id": 1
"timestamp": "2019-04-21 12:53:18",
"priority": "Medium",
"name": "Sample Text",
"metadata": [{
"metric_1": "0",
"metric_2": "1",
"metric_3": "2"
}]
}
Я хочу прочитать сообщения из темы ввода (назовем ее «input-topic»), отфильтровать их (предположим, мне нужны только сообщения с приоритетом «Low»), затем объединить их и отправить в другую тему («filtered-topic»)
У меня не так много кода, кроме создания самого потока и его конфигураций. Я думаю, что в Serdes должно быть что-то, что мне нужно настроить, но я не уверен, как. Я также пытался использовать десериализатор JSON, но не смог заставить его работать.
Прежде всего, возможно ли это вообще? Если да, то каков будет правильный курс действий?
Комментарии:
1. что именно вы имеете в виду
then aggregate
? вы хотите агрегировать по времени?2. Это возможно. Вам нужно прочитать сообщения, десериализовать их в свой пользовательский объект, затем выполнить логику фильтрации на основе поля этого пользовательского объекта, а затем опубликовать его в другой теме.
3. Вы можете использовать библиотеку Kafka от Spring, библиотеку Kafka от Apache или даже библиотеку spring cloud.
4. @VasiliySarzhynskyi Под «агрегированием» я подразумеваю отправку отфильтрованных сообщений в отдельную тему, вот и все
5. В Интернете есть много примеров: github.com/confluentinc/kafka-streams-examples
Ответ №1:
Вы можете создать поток из темы.
StreamsBuilder builder = new StreamsBuilder();
// key value type here is both String for me and update based on cases
KStream<String, String> source = builder.stream("input-topic");
source.filter(new Predicate<String, String>() {
@Override
public boolean test(String s, String s2) {
// your filter logic here and s and s2 are key/value from topic
// In your case, s2 should be type of your json Java object
return false;
}
}).groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
// your group by logic
return null;
}
}).count().toStream().to("new topic");
Комментарии:
1. Кажется, я получаю некоторые ошибки компиляции, пытаясь заставить это работать: «неправильное количество аргументов типа; требуется 1» «здесь не ожидается интерфейс» «метод не переопределяет и не реализует метод из супертипа» В основном в части фильтра
2. Я думаю, вам нужна зависимость от kafka-stream <зависимость> <Идентификатор группы>org.apache. кафка</groupId> <artifactId>кафка-потоки </artifactId> <версия> 1.0.1</версия> </зависимость>