#java #apache-spark #apache-kafka #avro #spark-structured-streaming
#java #apache-spark #apache-kafka #avro #spark-structured-streaming
Вопрос:
Я пытаюсь применить непрерывную потоковую передачу spark, используя Kafka в качестве источника чтения в проекте, использующем реестр схем. и мое сообщение Kafka расширяется SpecificAvroRecord
[в котором есть поле схемы (org.apache.avro.Схема)].
Насколько я понимаю, ссылки на себя не поддерживаются spark. итак, каков наилучший способ интегрировать реестр схемы Kafka [конкретную запись] с непрерывной потоковой передачей spark?
мой тестовый код:
SparkSession spark = SparkSession.builder().appName("testpro")
.master("local[2]").getOrCreate();
Dataset<Row> df = spark.readStream()
.format("kafka").option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
.option("subscribe", "testTopic")
.option("startingOffsets", "latest").load();
Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)").map(value -> new Data(value.toString()), Encoders.bean(Data.class));
Класс сообщений
public class Data extends SpecificAvroRecord {
private static final long serialVersionUID = 1L;
private String firstName;
private String lastName;
public String name;
public Data() {}
........setters and getters and string constructore
SpecificAvroRecord:
public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
protected Schema schema;
}
Исключение :
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)
```
Комментарии:
1. Вы захотите исключить зависимость Avro (и kafka-clients) из зависимости реестра схемы, но также приведение значения к строке, а затем попытка десериализовать его с помощью Avro не сработает
2. не могли бы вы уточнить, что вы подразумеваете под «вы захотите исключить Avro», пожалуйста?
3. В качестве зависимости Java с использованием Maven / Gradle, предполагая, что вы их используете