Локальная инкрементная карта значений с потоками Kafka

#java #apache-kafka #apache-kafka-streams

#java #apache-kafka #apache-kafka-streams

Вопрос:

Я просматриваю руководство по потокам Pipe. Мне нужно что-то подобное, но только локальное.

Другими словами, код в примере является:

 final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
  

которая брала бы сообщения из streams-plaintext-input темы и помещала их в streams-pipe-output topic.

Есть ли способ сделать это без создания темы вывода? Другими словами, должен ли каждый клиент использовать потоки и просто получать выходные данные локально?

Вариант использования в основном заключается в получении сообщений и выполнении некоторой фильтрации, группировки и тому подобного, а затем в использовании этого потока, как если бы это был исходный поток.

Однако будет много потребителей с разными критериями преобразования, и я бы не хотел создавать тему для каждого из них, тем более что это временные данные, больше похожие на непрерывный кэш.

Если есть какие-то руководства или примеры, с которыми я могу ознакомиться, пожалуйста, дайте мне знать.

Комментарии:

1. Не могли бы вы описать, что вы подразумеваете под затем, чтобы использовать этот поток, как если бы это был исходный поток ? Если вам нужно преобразовать данные, вам просто нужно: builder.stream(«потоки-открытый текст-ввод»). фильтр (…).карта (…) и т.д.

2. Не уверен, что вы подразумеваете под «локально» — возможно, KStream#foreach() это то, что вы ищете? — Кроме того, если вы хотите применить другую обработку, вы можете «повторно использовать» одну и ту же KStream stream переменную и «транслировать» данные в другой конвейер: stream.filter(); ... stream.map() и т.д. В целом, вам следует прочитать документы и ознакомиться с примерами ( docs.confluent.io/current/streams/index.html и github.com/confluentinc/kafka-streams-examples )