Kafkastream springcloud kafka присоединиться к selectKey

#apache-kafka #apache-kafka-streams #spring-cloud-stream

#apache-kafka #apache-kafka-streams #spring-cloud-stream

Вопрос:

не могли бы вы помочь настроить приложение spring cloud stream на основе Kafka, я столкнулся с проблемой при работе с selectKey.

Давайте объясним, что я пытаюсь связаться с 2 входящими темами Person, RefGenre Person содержат ключ Refgenre (по значению)

 public class Person {
    String nom;
    String prenom;
    String codeGenre;  <<--- here is the key of the second topic refgenre 
}
 

Итак, я использую оператор selectKey для подготовки моего потока перед операцией объединения.

новая тема создается с помощью selectByKey (my-app-KSTREAM-KEY-SELECT-0000000004-repartition), а затем возникает проблема с сериализацией :

Исключение в потоке «my-app-3c57b31c-28e5-4199-b07d-87f8940425ab-StreamThread-1» org.apache.kafka.streams.ошибки.Исключение StreamsException: исключение ClassCastException при создании данных для раздела my-app-KSTREAM-KEY-SELECT-0000000004-repartition. Сериализатор (ключ: org.apache.kafka.common.сериализация.StringSerializer / значение: полное состояние.serde.PersonWithGenreSerde) несовместим с фактическим типом ключа или значения (тип ключа: java.lang.Тип строки / значения: statefull.model.Человек). Измените Serdes по умолчанию в StreamConfig или укажите правильные Serdes с помощью параметров метода (например, при использовании DSL, #to(String topic, Produced<K, V> produced) с Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)) помощью ).

Где я могу указать serde для этой темы перераспределения и могу ли я указать имя этой «внутренней» темы?

 @Bean
public BiFunction<KStream<String, Person>, KTable<String, ReferentielGenre>, KStream<Long, PersonWithGenre>> joinKtable() {
    return (persons, referentielGenres) ->
            persons.selectKey((k,v) -> v.getCodeGenre())
                .join(referentielGenres,
                (person, genre) -> new PersonWithGenre(person.getNom(), person.getPrenom(),genre),
                Joined.with(Serdes.String(), new PersonWithGenreSerde(), null));
 

}

вот полный код моей нерабочей работы: https://github.com/YohanAlard/joinkstream

Есть ли лучший способ справиться с этим случаем использования?

Комментарии:

1. ibb.co/GFdXWFW топология

2. Я думаю, что у вашего последнего join вызова есть некоторые проблемы. В ValueJoiner (втором аргументе join ) первым аргументом является a Person , но вы передаете Serde тип PersonWithGenreSerde , который принимает PersonWithGenre type . Это должно быть Serde , которое может обрабатывать Person типы.

3. Еще раз спасибо 😉 есть ли способ назвать тему перераспределения с помощью нового ключа? автоматическое создание темы не разрешено нашим брокером. Есть ли лучший способ обеспечить тот же результат без затрат на создание темы?