#java #apache-kafka #apache-kafka-streams
#java #apache-kafka #apache-kafka-streams
Вопрос:
Я просматриваю руководство по потокам Pipe. Мне нужно что-то подобное, но только локальное.
Другими словами, код в примере является:
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
которая брала бы сообщения из streams-plaintext-input
темы и помещала их в streams-pipe-output
topic.
Есть ли способ сделать это без создания темы вывода? Другими словами, должен ли каждый клиент использовать потоки и просто получать выходные данные локально?
Вариант использования в основном заключается в получении сообщений и выполнении некоторой фильтрации, группировки и тому подобного, а затем в использовании этого потока, как если бы это был исходный поток.
Однако будет много потребителей с разными критериями преобразования, и я бы не хотел создавать тему для каждого из них, тем более что это временные данные, больше похожие на непрерывный кэш.
Если есть какие-то руководства или примеры, с которыми я могу ознакомиться, пожалуйста, дайте мне знать.
Комментарии:
1. Не могли бы вы описать, что вы подразумеваете под затем, чтобы использовать этот поток, как если бы это был исходный поток ? Если вам нужно преобразовать данные, вам просто нужно: builder.stream(«потоки-открытый текст-ввод»). фильтр (…).карта (…) и т.д.
2. Не уверен, что вы подразумеваете под «локально» — возможно,
KStream#foreach()
это то, что вы ищете? — Кроме того, если вы хотите применить другую обработку, вы можете «повторно использовать» одну и ту жеKStream stream
переменную и «транслировать» данные в другой конвейер:stream.filter(); ... stream.map()
и т.д. В целом, вам следует прочитать документы и ознакомиться с примерами ( docs.confluent.io/current/streams/index.html и github.com/confluentinc/kafka-streams-examples )