поиск конкретных слов с помощью потока кафки

#spring #apache-kafka #apache-kafka-streams

Вопрос:

Я создал конечную точку отдыха, откуда поступают данные, и я отправляю их в тему кафки «тема-обратная связь» . Теперь я хочу обработать эти данные и отправить их в другую тему кафки «тема-хорошее слово». Теперь из конечной точки rest данные поступают в раздел «тема-обратная связь» правильно , но после их обработки и отправки в другую тему «тема-хорошее слово» данные не отображаются в разделе «тема-хорошее слово».

В программе нет ошибок

Вот мой код:

 @Configuration
public class FeedbackStream {
    
    Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");

    @Bean
    public KStream<String,String> KStreamFeedback(StreamsBuilder builder)
    {
        var stringSerde = Serdes.String();
        var feedbackSerde = new JsonSerde<>(Feedback.class);

        ((JsonDeserializer) feedbackSerde.deserializer()).setUseTypeHeaders(false);
        
         var sourceStream = builder.stream("topic-feedback", Consumed.with(stringSerde,feedbackSerde))
                                                        .flatMapValues(mapperGoodWords());
          
        sourceStream.to("topic-goodWord", Produced.with(stringSerde,stringSerde));

        return sourceStream;
    }





private ValueMapper<Feedback,Iterable<String> > mapperGoodWords() 
    {
        return feedback -> Arrays
                           .asList(feedback.getFeedback().replaceAll("[^a-zA-Z]", "").toLowerCase()
                           .split("\s ")).stream()
                           .filter(word -> GOOD_WORDS.contains(word)).distinct().collect(Collectors.toList());
    }

    
 

Feedback.java

 public class Feedback {
    
    private String location;
    private int rating;
    private String feedback;     
    
    // getters,setters
}
  
 
 

конфигурация потока кафки

 @Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamConfig()
    {
        var props = new HashMap<String, Object>();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
      
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,"false");
      
        return new KafkaStreamsConfiguration(props);
    }
}
 
 

Где я ошибаюсь ?

Комментарии:

1. Вы создали топологию KStream, но где вы на самом деле ее запускаете, чтобы любые данные считывались/обрабатывались?

2. @OneCricketeer Я создал конечную точку отдыха, оттуда поступают данные, и я отправляю их в тему кафки «тема-обратная связь» . Теперь я хочу обработать эти данные и отправить их в другую тему кафки «тема-хорошее слово». Теперь из конечной точки rest данные поступают в раздел «тема-обратная связь» правильно , но после их обработки и отправки в другую тему «тема-хорошее слово» данные не отображаются в разделе «тема-хорошее слово». И в программе нет никакой ошибки.

3. Я понимаю, что пытается сделать ваш код, но неясно, звонили ли вы start() , как показано в ответе. Все, что вы здесь сделали, — это определили некоторые методы и компонент. Возможно, ошибки нет, потому что вы никогда нигде не загружаете этот компонент для запуска потока

4. @ OneCricketeer В моей предыдущей программе практики на тему кафки я не использовал start() , но они отлично работали. Теперь я создал новую тему, но, похоже, я не упомянул о новом потоке в конфигурации. В этом ли проблема ? Я в замешательстве. Пожалуйста, помогите мне.

5. Я не знаю, какую библиотеку Spring вы в конечном итоге используете здесь, но это не связыватель потоков Кафки, основанный на документации cloud.spring.io/spring-cloud-static/…

Ответ №1:

Тушар, вот в чем проблема, которую я вижу в потоках кафки, иногда она съедает эти исключения. но в любом случае, теперь ваша проблема, если она не доходит до другой темы, означает, что это может быть проблема сериализации. также вы делаете это так, как я здесь не вижу.

 var kafkaStreams = new KafkaStreams(sourceStream,  streamsConfiguration);
kafkaStreams.start();
 

Комментарии:

1. Я добавил конфигурацию потока кафки. Кроме того, у меня есть файлы, которые подходят для другой темы. На этот раз я создал новую тему. Но я не настроил новый поток( канал ) новой темы. В этом ли проблема ? потому что это 1-я программа новой темы и нового потока, которая не показывает результат. И извините, но я также не использовал ваш приведенный выше код в своей предыдущей программе, но они работают нормально. Пожалуйста, направь меня, я в замешательстве.