Потоковая передача в теме apache kafka не имеет выходных данных

#java #apache-kafka

#java #apache-kafka

Вопрос:

Я следил за руководством по ссылке на веб-сайт apache Kafka.

Входная тема обрабатывается как поток, и средние темы также генерируются, но конечная тема вывода пуста.

Ниже приведен вывод топологии:

 Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
  --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
  --> KSTREAM-KEY-SELECT-0000000002
  <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
  --> KSTREAM-FILTER-0000000006
  <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000006 (stores: [])
  --> KSTREAM-SINK-0000000005
  <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)
  <-- KSTREAM-FILTER-0000000006

Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])
  --> KSTREAM-AGGREGATE-0000000004
Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])
  --> KTABLE-TOSTREAM-0000000008
  <-- KSTREAM-SOURCE-0000000007
Processor: KTABLE-TOSTREAM-0000000008 (stores: [])
  --> KSTREAM-SINK-0000000009
  <-- KSTREAM-AGGREGATE-0000000004
Sink: KSTREAM-SINK-0000000009 (topic: streams-wordcount-output)
  <-- KTABLE-TOSTREAM-0000000008
  

и код:

 final CountDownLatch latch = new CountDownLatch(10);

    new Thread(() -> {
        while(latch.getCount() > 0){
            latch.countDown();
            kafkaTopicService.sendMessage("This is a line with 7 words" );
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }).start();
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> textLines = builder.stream(
            KafkaTopicService.TOPIC_NAME,
            Consumed.with(stringSerde, stringSerde)
    );

    KTable<String, Long> wordCounts = textLines
            // Split each text line, by whitespace, into words.
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W ")))

            // Group the text words as message keys
            .groupBy((key, value) -> value)

            // Count the occurrences of each word (message key).
            .count();

    wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));


    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final Topology topology = builder.build();
    System.out.println(topology.describe());
    final KafkaStreams streams = new KafkaStreams(topology, props);


    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
    }
  

Комментарии:

1. Как вы проверили, выполнена ли обработка и вывод темы? При использовании console-consumer убедитесь, что вы указали флаг —from-beginning , на всякий случай, если он пропущен.

2. Попробуйте отправить больше сообщений в приведенном выше цикле и посмотрите результат, просто

3. Я использовал kafkatool для проверки данных темы. две средние темы генерируются в Kafka. Но тема вывода вообще не генерируется!

Ответ №1:

Groupby работает с оконным режимом, который по умолчанию составляет 1 день. Итак, чтобы перезапустить поток в другой теме, окно должно быть закрыто. Поэтому решением является закрытие окна или установка низкого размера окна, которое будет закрыто при запуске приложения.

Я решил проблему, закрыв поток.

 streams.close();
  

Ответ №2:

На самом деле, установка интервала фиксации на 0 немедленно отправит любые данные журнала изменений в тему вывода:

     props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, String.valueOf(0));