Производитель приложений Kafka spring boot и не может отразить это с помощью соединителя Kafka Sink с форматом Avro

#spring-boot #apache-kafka #avro #kafka-producer-api #apache-kafka-connect

#spring-boot #apache-kafka #avro #kafka-producer-api #apache-kafka-connect

Вопрос:

Моя цель — у меня есть spring boot application kafka producer со свойством сериализации Avro, и я исключаю сообщение, которое отправляется в соответствующую тему, к которой следует обращаться с помощью соединителя confluent Sink Connector и вставлять в таблицы базы данных mysql / Oracle, я могу создать Avro serialize, а потребитель spring boot может десериализовать Avro, но мой соединитель приемника не работает, я не могу определить, какой вид полезной нагрузки использует соединитель приемника, и как следует быть производителем spring boot, закодированным для отправки сообщения таким образом, чтобы соединитель приемника мог справиться с этими свойствами

Заранее спасибо 🙂

Это приложение.yml в приложении Spring boot

 server: 
    port: 9000
    spring.kafka:
    bootstrap-servers: "localhost:9092"
    properties:
      schema.registry.url: "http://localhost:8081"
      specific.avro.reader: true
    producer:
      key-serializer: "io.confluent.kafka.serializers.KafkaAvroSerializer"
      value-serializer: "io.confluent.kafka.serializers.KafkaAvroSerializer"
    app:
      topic: event_pay2
  

Это полезная нагрузка для создания схемы из приложения Spring boot

 {
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "gender"
      }
    ],
    "optional": false,
    "name": "oracle.user"
  },
  "payload": {
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "FEMALE"
  }
}
  

Pom.xml

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.kafka</groupId>
<artifactId>kafka-producer-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-producer-example</name>
<description>Demo project for Spring Boot</description>

 <repositories>
    <repository>
        <id>confluent</id>
        <url>http://packages.confluent.io/maven</url>
    </repository>
</repositories>

<properties>
    <java.version>1.8</java.version>
    <confluent.version>4.0.0</confluent.version>
</properties>

<dependencies>
<dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
     <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${confluent.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>
</dependencies>

<build>
 <pluginManagement>
    <plugins>
    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.2</version>
        <executions>
            <execution>
                <phase>generate-sources</phase>
                <goals>
                    <goal>schema</goal>
                </goals>
                <configuration>
                    <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                    <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
                </configuration>
            </execution>
        </executions>
    </plugin>


        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
     </pluginManagement>
</build>

</project>
  

Это мой вызов rest, как я продвигаю messgae в тему kafka

 @PostMapping("/publish/avrodata")
public String sendMessage(@RequestBody String request) {
    sender.send(request);
    return "Published successfully";
}
  

Наконец-то мой sink connector

 "name": "JDBC_Sink_EVENT_PAY",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "event_pay2",
"connection.url": "jdbc:mysql://localhost:3306/user",
"connection.user": "****",
"connection.password": "****",
"auto.create": "true",
"auto.evolve":"true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
 "value.converter.schemas.enable": "true"
  

Комментарии:

1. Каково состояние вашего коннектора, он запущен? Вы находите какие-либо ошибки в журнале подключения.?

2. Да .. он в рабочем состоянии

3. —————————Ошибка: схема значений должна иметь тип Struct————————————- » полезная нагрузка»: { «идентификатор пользователя»: «User_1», «regionid»: «Region_5», «пол»: «ЖЕНСКИЙ» }

4. Проверьте состояние соединителя, http (ы)://имя хоста: 8083/connectors/JDBC_Sink_EVENT_PAY/status

5. Также проверьте схему, http (ы):// hostname: 8081/subjects/event_pay2-value/versions/latest

Ответ №1:

Всегда, всегда отлаживайте свою тему перед настройкой соединителя. Используйте kafka-avro-console-consumer для этого. Если это не сработает, то Connect AvroConverter , скорее всего, тоже не будет работать (по моему опыту), и вы можете уменьшить проблемное пространство.


Если я правильно прочитал ваш код, вы отправили строку, а не объект Avro

 public String sendMessage(@RequestBody String request) {
    sender.send(request);  // <--- here and ^^^^^^ here
    return "Published successfully";
}
  

Вместо этого вам нужно проанализировать ваш входной запрос в объект, который был создан как часть ваших /src/main/resources/avro данных схемы, а не просто пересылать через входящий запрос в виде строки.

И этот файл AVSC может выглядеть примерно так

 {
  "type": "Record",
  "namespace": "oracle.user",
  "name": "User",
  "fields": [
      { "type": "string", "name: "user_id" },
      { "type": "string", "name": "regionid" },
      { "type" "string", "name" "gender" }
   ]
}
  

Который создал бы oracle.user.User объект, из-за чего вам KafkaTemplate нужно было бы что-то вроде KafkaTemplate<String, oracle.user.User> sender

Комментарии:

1. Привет, спасибо за ценный ответ. Я изменил код, как вы предложили, теперь я получаю ошибку типа -java.io.NotSerializableException: com.kafka.kafkaproducerexample.modal. PayloadRequest——————- Здесь полезная нагрузка — это мой модальный класс

2. И при отправке сообщения в тему через соединитель kafka возникает проблема в connect-avro-disturbed.свойства, такие как ————— org.apache. kafka.connect.ошибки. ConnectException: схема значений должна иметь тип Struct——————— `

3. Что касается второй ошибки, похоже, вы все еще не отправляете Avro, поэтому ваши сериализаторы где-то настроены неправильно. Я не знаю о первой ошибке, но ваш входной объект не может быть того же класса, что и сгенерированный вами Avro

4. Примечание: я никогда не использовал Spring Kafka, но я знаю, что ваш входящий веб-запрос должен быть отделен от вашего создаваемого класса Avro… Или вы можете просто придерживаться JSON и позволить Connect просто использовать это, но опять же, забудьте о Connect. Используйте раздел на консоли для его отладки