Класс, созданный Spring kafka AVRO, не найден

#java #spring-boot #apache-kafka #spring-kafka

Вопрос:

Я использую соединитель confluent JDBC для подключения к базе данных postgres, чтобы получить изменения и поместить их в раздел кафки. Теперь я хочу использовать эти сообщения с потребителем spring boot. Эти сообщения представлены в формате AVRO. У меня есть схема из соединителя, и я создал для нее класс POJO с помощью плагина avro maven.

Но при запуске прослушивателя возникает только следующая ошибка

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
       at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) ~[spring-kafka-2.7.2.jar:2.7.2]
       at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.2.jar:2.7.2]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
       at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ps_git_repo-0 at offset 0. If needed, please seek past the record to continue consumption.
   Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class ps_git_repo specified in writer's schema whilst finding reader's schema for a SpecificRecord.

 

Когда я не использую avro для десериализации данных, я буду получать данные, но нечитаемые.

В pom.xml У меня есть следующие зависимости

 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.2</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>6.2.0</version>
        <exclusions>
            <exclusion>
                <artifactId>netty</artifactId>
                <groupId>io.netty</groupId>
            </exclusion>
        </exclusions>
    </dependency>

 

и в application.properties я добавил URL-адрес реестра десериализатора и схемы.

 
    spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer = io.confluent.kafka.serializers.KafkaAvroDeserializer
    spring.kafka.bootstrap-servers = http://localhost:9092
    spring.kafka.consumer.properties.specific.avro.reader = true
    spring.kafka.consumer.properties.schema.registry.url = http://localhost:8081

 

В сборке я использую плагин avro maven для создания POJO из схемы, созданной соединителем.

плагин в pom.xml

 
    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.10.2</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
                <configuration>
                    <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                    <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    <stringType>String</stringType>
                </configuration>
            </execution>
        </executions>
    </plugin>

 

I’ve put the following schema into the folder and generate to pojo with mvn generate-sources

Schema.avsc

 
    {
      "connect.name": "ps_git_repo",
      "fields": [
        {
          "name": "id",
          "type": "long"
        },
        {
          "default": null,
          "name": "name",
          "type": [
            "null",
            "string"
          ]
        }
      ],
      "name": "ps_git_repo",
      "namespace": "com.company.api.kafkademo",
      "type": "record"
    }

 

Я понимаю, что ps_git_repo.java класс в правильном, тогда у меня есть этот прослушиватель для извлечения сообщений.

     @SpringBootApplication
    @EnableKafka
    public class KafkaDemoApplication {
    
        @KafkaListener(groupId = "test123", topics = "ps_git_repo_test")
        public void handleMessage(ps_git_repo message) {
            System.out.println(message);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaDemoApplication.class, args);
        }
    
    }

 

Схема не может быть найдена.

ИЗМЕНИТЬ: Решено

Десериализатор использовал connect.name поле вместо пространства имен, чтобы найти правильный класс.

Я добавил следующие строки в конфигурацию JDBC_connector, чтобы позволить соединителю создать правильное пространство имен

 "transforms":"AddNamespace",
"transforms.AddNamespace.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.company.api.kafkademo.ps_git_repo"
 

Комментарии:

1. Вы уверены namespace , что это правильно?

2. Класс создается в правильных пакетах и может быть импортирован в код, поэтому я предполагаю, что пространство имен правильное

3. Итак, вы сами создали схему с этим пространством имен? Или вы загрузили схему из реестра, а затем добавили/изменили пространство имен? Соединитель источника JDBC по умолчанию не использует это имя пространства имен (даже не уверен, что его вообще можно изменить), и поэтому анализатор Avro думает, что вы пытаетесь использовать два разных класса

4. Да, в этом-то и была проблема. В «connector.name» использовался как полный путь. Так что это действительно было пространство имен. Сейчас это работает. Спасибо!

5. Круто. Бесплатно бесплатно ответьте на свой собственный пост ниже, а не обновляйте вопрос

Ответ №1:

Решенный

Десериализатор использовал connect.name поле вместо пространства имен, чтобы найти правильный класс.

Я добавил следующие строки в конфигурацию JDBC_connector, чтобы позволить соединителю создать правильное пространство имен

 "transforms":"AddNamespace",
"transforms.AddNamespace.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.AddNamespace.schema.name": "com.company.api.kafkademo.ps_git_repo"