Как использовать сообщение из темы Kafka, используя какой-либо фильтр?

#java #apache-kafka #kafka-consumer-api

#java #apache-kafka #kafka-consumer-api

Вопрос:

Я пишу простую программу Kafka producer-consumer на Java, в которой я создал такие данные:

  • ключ: «a» значение:»{25,223,465}»

  • ключ: «a» значение: «{26,323,56}»

  • ключ: «a» значение:»{62,256,652}»

  • ключ: «a» значение:»{26,227,42}»

  • ключ: «b» значение: «{4352,234,65342}»

  • ключ: «b» значение: «{243,22347,434}»

Я могу использовать сообщения, используя consumer.poll(10000) , но теперь я хочу использовать данные, например, сколько a записей и сколько b записей присутствует в теме Kafka.

Если я свяжу это с SQL

 select count(*) from 'mytopic' where key='a'

select count(*) from 'mytopic' where key='b'
  

Пожалуйста, предоставьте мне код на Java, если это возможно

Комментарии:

1. Похоже, вас может заинтересовать использование ksqlDB

2. @OneCricketeer Да, может быть, но не знаю, как использовать это в коде Java. не могли бы вы предоставить какой-нибудь совет

3. Я доверяю вашей способности находить и искать примеры ksqlDB на своем веб-сайте… KsqlDB использует синтаксис sql, но у него есть REST API, который вы можете использовать через Java. Однако, как уже было сказано, простой потребитель Kafka не предназначен для использования так, как вы просите для агрегирования — вам понадобится другой инструмент

4. @OneCricketeer Спасибо

Ответ №1:

Kafka таким образом не работает, потребитель просто считывает последнее зафиксированное смещение в разделе (или с самого начала) и последовательно все сообщения. Что вы можете сделать, так это фильтровать в вашем приложении. Для вашей цели вместо написания приложения, использующего только потребительский API, вы можете использовать приложение на основе API Kafka Streams, которое предоставляет вам DSL для выполнения таких операций, как отображение, фильтрация, … очень легко. Дополнительная информация здесь:

https://kafka.apache.org/documentation/streams/