#spring #apache-kafka #apache-kafka-streams
Вопрос:
Я создал конечную точку отдыха, откуда поступают данные, и я отправляю их в тему кафки «тема-обратная связь» . Теперь я хочу обработать эти данные и отправить их в другую тему кафки «тема-хорошее слово». Теперь из конечной точки rest данные поступают в раздел «тема-обратная связь» правильно , но после их обработки и отправки в другую тему «тема-хорошее слово» данные не отображаются в разделе «тема-хорошее слово».
В программе нет ошибок
Вот мой код:
@Configuration
public class FeedbackStream {
Set<String> GOOD_WORDS = Set.of("happy", "good", "helpful");
@Bean
public KStream<String,String> KStreamFeedback(StreamsBuilder builder)
{
var stringSerde = Serdes.String();
var feedbackSerde = new JsonSerde<>(Feedback.class);
((JsonDeserializer) feedbackSerde.deserializer()).setUseTypeHeaders(false);
var sourceStream = builder.stream("topic-feedback", Consumed.with(stringSerde,feedbackSerde))
.flatMapValues(mapperGoodWords());
sourceStream.to("topic-goodWord", Produced.with(stringSerde,stringSerde));
return sourceStream;
}
private ValueMapper<Feedback,Iterable<String> > mapperGoodWords()
{
return feedback -> Arrays
.asList(feedback.getFeedback().replaceAll("[^a-zA-Z]", "").toLowerCase()
.split("\s ")).stream()
.filter(word -> GOOD_WORDS.contains(word)).distinct().collect(Collectors.toList());
}
Feedback.java
public class Feedback {
private String location;
private int rating;
private String feedback;
// getters,setters
}
конфигурация потока кафки
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamConfig()
{
var props = new HashMap<String, Object>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,"false");
return new KafkaStreamsConfiguration(props);
}
}
Где я ошибаюсь ?
Комментарии:
1. Вы создали топологию KStream, но где вы на самом деле ее запускаете, чтобы любые данные считывались/обрабатывались?
2. @OneCricketeer Я создал конечную точку отдыха, оттуда поступают данные, и я отправляю их в тему кафки «тема-обратная связь» . Теперь я хочу обработать эти данные и отправить их в другую тему кафки «тема-хорошее слово». Теперь из конечной точки rest данные поступают в раздел «тема-обратная связь» правильно , но после их обработки и отправки в другую тему «тема-хорошее слово» данные не отображаются в разделе «тема-хорошее слово». И в программе нет никакой ошибки.
3. Я понимаю, что пытается сделать ваш код, но неясно, звонили ли вы
start()
, как показано в ответе. Все, что вы здесь сделали, — это определили некоторые методы и компонент. Возможно, ошибки нет, потому что вы никогда нигде не загружаете этот компонент для запуска потока4. @ OneCricketeer В моей предыдущей программе практики на тему кафки я не использовал
start()
, но они отлично работали. Теперь я создал новую тему, но, похоже, я не упомянул о новом потоке в конфигурации. В этом ли проблема ? Я в замешательстве. Пожалуйста, помогите мне.5. Я не знаю, какую библиотеку Spring вы в конечном итоге используете здесь, но это не связыватель потоков Кафки, основанный на документации cloud.spring.io/spring-cloud-static/…
Ответ №1:
Тушар, вот в чем проблема, которую я вижу в потоках кафки, иногда она съедает эти исключения. но в любом случае, теперь ваша проблема, если она не доходит до другой темы, означает, что это может быть проблема сериализации. также вы делаете это так, как я здесь не вижу.
var kafkaStreams = new KafkaStreams(sourceStream, streamsConfiguration);
kafkaStreams.start();
Комментарии:
1. Я добавил конфигурацию потока кафки. Кроме того, у меня есть файлы, которые подходят для другой темы. На этот раз я создал новую тему. Но я не настроил новый поток( канал ) новой темы. В этом ли проблема ? потому что это 1-я программа новой темы и нового потока, которая не показывает результат. И извините, но я также не использовал ваш приведенный выше код в своей предыдущей программе, но они работают нормально. Пожалуйста, направь меня, я в замешательстве.