Подход к группировке в Кафке

#java #apache-kafka #kafka-consumer-api #apache-kafka-streams

Вопрос:

у нас есть один вариант использования для группировки нескольких сообщений данных, а затем их обработки

Пример Сообщения С Данными

 {
    "meta": {
        "id": "66b3cd0e-6a15-4730-a5c8-71ca6dd601a5",
        "userId": "47922F57-2C49-4B0B-A34B-08C9E6A15CC",
    },
    "data": [
        {
            "Email": "user1@email.com",
            "FirstName": "User 1",
            "LastName": "Last 1",
        }, 
        {
            "Email": "user2@email.com",
            "FirstName": "User 2",
            "LastName": "Last 2",
        }, 
    ]
}
 

Мы постоянно получаем похожие сообщения с разными идентификаторами, как (meta.id meta.идентификатор пользователя) и с несколькими записями в массиве данных

То, что мы хотим сделать, — это группировать сообщения на основе (meta.id meta.идентификатор пользователя), а затем обработайте их

  • либо периодически с интервалом в 5 мин.
  • или если для идентификатора мы получили x записей данных.

Этот вопрос может быть расплывчатым, мы хотим знать, как мы можем достичь этой группировки с помощью Кафки, если это возможно.

Ответ №1:

Вы можете использовать потоки кафки для этого, так что в основном ваша топология будет выглядеть примерно так:

 val kStream = streamsBuilder.stream(YOUR_TOPIC, YOUR_SERDES)
kStream.groupBy(k ,v ->v.meta)
       .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
 

Однако вы должны знать, что использование groupBy приведет к перераспределению данных.
Более подробную информацию вы можете найти здесь