#java #apache-kafka
#java #apache-kafka
Вопрос:
Я следил за руководством по ссылке на веб-сайт apache Kafka.
Входная тема обрабатывается как поток, и средние темы также генерируются, но конечная тема вывода пуста.
Ниже приведен вывод топологии:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
--> KSTREAM-KEY-SELECT-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)
<-- KSTREAM-FILTER-0000000006
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])
--> KSTREAM-AGGREGATE-0000000004
Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])
--> KTABLE-TOSTREAM-0000000008
<-- KSTREAM-SOURCE-0000000007
Processor: KTABLE-TOSTREAM-0000000008 (stores: [])
--> KSTREAM-SINK-0000000009
<-- KSTREAM-AGGREGATE-0000000004
Sink: KSTREAM-SINK-0000000009 (topic: streams-wordcount-output)
<-- KTABLE-TOSTREAM-0000000008
и код:
final CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
while(latch.getCount() > 0){
latch.countDown();
kafkaTopicService.sendMessage("This is a line with 7 words" );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(
KafkaTopicService.TOPIC_NAME,
Consumed.with(stringSerde, stringSerde)
);
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W ")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
try {
streams.start();
latch.await();
} catch (Throwable e) {
}
Комментарии:
1. Как вы проверили, выполнена ли обработка и вывод темы? При использовании console-consumer убедитесь, что вы указали флаг —from-beginning , на всякий случай, если он пропущен.
2. Попробуйте отправить больше сообщений в приведенном выше цикле и посмотрите результат, просто
3. Я использовал kafkatool для проверки данных темы. две средние темы генерируются в Kafka. Но тема вывода вообще не генерируется!
Ответ №1:
Groupby работает с оконным режимом, который по умолчанию составляет 1 день. Итак, чтобы перезапустить поток в другой теме, окно должно быть закрыто. Поэтому решением является закрытие окна или установка низкого размера окна, которое будет закрыто при запуске приложения.
Я решил проблему, закрыв поток.
streams.close();
Ответ №2:
На самом деле, установка интервала фиксации на 0 немедленно отправит любые данные журнала изменений в тему вывода:
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, String.valueOf(0));