Интеграция реестра схемы Kafka и spark с непрерывным потоком, исключение: циклическая ссылка класса класса org.apache.avro.Схема

#java #apache-spark #apache-kafka #avro #spark-structured-streaming

#java #apache-spark #apache-kafka #avro #spark-structured-streaming

Вопрос:

Я пытаюсь применить непрерывную потоковую передачу spark, используя Kafka в качестве источника чтения в проекте, использующем реестр схем. и мое сообщение Kafka расширяется SpecificAvroRecord [в котором есть поле схемы (org.apache.avro.Схема)].

Насколько я понимаю, ссылки на себя не поддерживаются spark. итак, каков наилучший способ интегрировать реестр схемы Kafka [конкретную запись] с непрерывной потоковой передачей spark?

мой тестовый код:

     SparkSession spark = SparkSession.builder().appName("testpro")
        .master("local[2]").getOrCreate();

    Dataset<Row> df = spark.readStream()
        .format("kafka").option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
        .option("subscribe", "testTopic")
        .option("startingOffsets", "latest").load();

    Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)").map(value -> new Data(value.toString()), Encoders.bean(Data.class));
  

Класс сообщений

 public class Data extends SpecificAvroRecord {

  private static final long serialVersionUID = 1L;
  private String firstName;
  private String lastName;
  public String name;

  public Data() {}
   ........setters and getters and string constructore
  

SpecificAvroRecord:

 public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
  protected Schema schema;
}
  

Исключение :

 Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
    at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)
    ```
  

Комментарии:

1. Вы захотите исключить зависимость Avro (и kafka-clients) из зависимости реестра схемы, но также приведение значения к строке, а затем попытка десериализовать его с помощью Avro не сработает

2. не могли бы вы уточнить, что вы подразумеваете под «вы захотите исключить Avro», пожалуйста?

3. В качестве зависимости Java с использованием Maven / Gradle, предполагая, что вы их используете