#scala #apache-kafka #apache-flink #flink-streaming
Вопрос:
Мой класс клиентов уже создан с помощью плагина maven-avro.Когда я пытаюсь запустить эту программу, я получаю ошибку, как Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.example.Customer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types amp; Serialization" for details of the effect on performance.
Я использую java 8
Мой класс клиентов, созданный с помощью плагина maven avro, имеет определенный тип записи
Пожалуйста, помогите мне, я потратил последние 5 дней на это, но все еще не решил
Я попробовал 3 разных метода и упомянул их как Метод 1, Метод 2 .. ниже
package com.example
import com.typesafe.config.ConfigException.Generic
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.api.scala.createTypeInformation
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties
object flink_kafka_avro extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties
properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
properties.put("group.id", "customer-consumer-group-v1")
properties.put("auto.commit.enable", "false")
properties.put("auto.offset.reset", "earliest")
import org.apache.avro.Schema
import org.apache.avro.reflect.ReflectData
val schema = ReflectData.get.getSchema(classOf[Customer])
// Method 1 not working
//val ss = new FlinkKafkaConsumer[Customer]("customer-avro", AvroDeserializationSchema.forSpecific(classOf[Customer]),properties)
val schemaRegistryUrl = "http://localhost:8081"
//Method 2
val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[Customer]("customer-avro",
ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Customer],schemaRegistryUrl), properties).setStartFromEarliest())
userKafkaReaderResult.print()
//Method 3
// I tried like this it is not working even
//val strenew = FlinkKafkaConsumer[Customer]("test_topic", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties).setStartFromEarliest
//env.addSource(ss).print()
env.execute()
}
Мой файл POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.example</groupId>
<artifactId>kafkaavrov1</artifactId>
<version>1.0-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<properties>
<avro.version>1.8.2</avro.version>
<kafka.version>0.11.0.1</kafka.version>
<confluent.version>3.3.1</confluent.version>
</properties>
<!--necessary to resolve confluent dependencies-->
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.13.2</version>
</dependency>
<!--Only dependency needed for the avro part-->
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<!--dependencies needed for the kafka part-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>1.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>dd
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
<stringType>String</stringType>
<createSetters>false</createSetters>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
<fieldVisibility>private</fieldVisibility>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>