#java #apache-kafka #kafka-consumer-api #apache-kafka-streams
Вопрос:
у нас есть один вариант использования для группировки нескольких сообщений данных, а затем их обработки
Пример Сообщения С Данными
{
"meta": {
"id": "66b3cd0e-6a15-4730-a5c8-71ca6dd601a5",
"userId": "47922F57-2C49-4B0B-A34B-08C9E6A15CC",
},
"data": [
{
"Email": "user1@email.com",
"FirstName": "User 1",
"LastName": "Last 1",
},
{
"Email": "user2@email.com",
"FirstName": "User 2",
"LastName": "Last 2",
},
]
}
Мы постоянно получаем похожие сообщения с разными идентификаторами, как (meta.id meta.идентификатор пользователя) и с несколькими записями в массиве данных
То, что мы хотим сделать, — это группировать сообщения на основе (meta.id meta.идентификатор пользователя), а затем обработайте их
- либо периодически с интервалом в 5 мин.
- или если для идентификатора мы получили x записей данных.
Этот вопрос может быть расплывчатым, мы хотим знать, как мы можем достичь этой группировки с помощью Кафки, если это возможно.
Ответ №1:
Вы можете использовать потоки кафки для этого, так что в основном ваша топология будет выглядеть примерно так:
val kStream = streamsBuilder.stream(YOUR_TOPIC, YOUR_SERDES)
kStream.groupBy(k ,v ->v.meta)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
Однако вы должны знать, что использование groupBy приведет к перераспределению данных.
Более подробную информацию вы можете найти здесь