#apache-kafka #apache-kafka-streams #spring-cloud-stream
#apache-kafka #apache-kafka-streams #spring-cloud-stream
Вопрос:
не могли бы вы помочь настроить приложение spring cloud stream на основе Kafka, я столкнулся с проблемой при работе с selectKey.
Давайте объясним, что я пытаюсь связаться с 2 входящими темами Person, RefGenre Person содержат ключ Refgenre (по значению)
public class Person {
String nom;
String prenom;
String codeGenre; <<--- here is the key of the second topic refgenre
}
Итак, я использую оператор selectKey для подготовки моего потока перед операцией объединения.
новая тема создается с помощью selectByKey (my-app-KSTREAM-KEY-SELECT-0000000004-repartition), а затем возникает проблема с сериализацией :
Исключение в потоке «my-app-3c57b31c-28e5-4199-b07d-87f8940425ab-StreamThread-1» org.apache.kafka.streams.ошибки.Исключение StreamsException: исключение ClassCastException при создании данных для раздела my-app-KSTREAM-KEY-SELECT-0000000004-repartition. Сериализатор (ключ: org.apache.kafka.common.сериализация.StringSerializer / значение: полное состояние.serde.PersonWithGenreSerde) несовместим с фактическим типом ключа или значения (тип ключа: java.lang.Тип строки / значения: statefull.model.Человек). Измените Serdes по умолчанию в StreamConfig или укажите правильные Serdes с помощью параметров метода (например, при использовании DSL, #to(String topic, Produced<K, V> produced)
с Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))
помощью ).
Где я могу указать serde для этой темы перераспределения и могу ли я указать имя этой «внутренней» темы?
@Bean
public BiFunction<KStream<String, Person>, KTable<String, ReferentielGenre>, KStream<Long, PersonWithGenre>> joinKtable() {
return (persons, referentielGenres) ->
persons.selectKey((k,v) -> v.getCodeGenre())
.join(referentielGenres,
(person, genre) -> new PersonWithGenre(person.getNom(), person.getPrenom(),genre),
Joined.with(Serdes.String(), new PersonWithGenreSerde(), null));
}
вот полный код моей нерабочей работы: https://github.com/YohanAlard/joinkstream
Есть ли лучший способ справиться с этим случаем использования?
Комментарии:
1. ibb.co/GFdXWFW топология
2. Я думаю, что у вашего последнего
join
вызова есть некоторые проблемы. ВValueJoiner
(втором аргументеjoin
) первым аргументом является aPerson
, но вы передаетеSerde
типPersonWithGenreSerde
, который принимаетPersonWithGenre
type . Это должно бытьSerde
, которое может обрабатыватьPerson
типы.3. Еще раз спасибо 😉 есть ли способ назвать тему перераспределения с помощью нового ключа? автоматическое создание темы не разрешено нашим брокером. Есть ли лучший способ обеспечить тот же результат без затрат на создание темы?