#scala #apache-kafka #avro #confluent-platform
Вопрос:
Я запускаю один и тот же код в слиянии облаков, и один работает на не облачном сервере, размещенном в слиянии облаков, и когда я проверяю пользовательский интерфейс в разделе, я вижу, что публикуются разные типы данных
Значение в пользовательском интерфейсе слияния облаков для темы � 3-Jan-20N/AMale 35-49Vehicle?AlabamaLost TimeDayFabrication3367Fri�
Значение в другом облаке слияния находится в форме правильного JSON с соответствующими парами ключ значение
Нравится
{"Date": "3-Jan-20", "InjuryLocation": "Eye", "Gender": "Male", "AgeGroup": "18-24", "IncidentType": "Cut", "DaysLost": 0.0, "Plant": "Georgia", "ReportType": "Near Miss", "Shift": "Day", "Department": "Administration", "IncidentCost": "0", "WkDay": "Fri", "Month": 1, "Year": 2020}
Может ли кто-нибудь сообщить мне об этом, когда мы создадим сообщения Avro на тему о том, какие записи мы видим в значении, когда проверяем пользовательский интерфейс центра управления?
Код :
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer._
import java.io.{File, FileReader}
import java.util.Properties
import scala.Console.println
import scala.io.Source
object Avro extends App
{
val configFileName = "src/main/config/java.config"
val topicName = "avro"
val props = buildProperties(configFileName)
val producer = new KafkaProducer[String, GenericData.Record](props)
val callback = new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
Option(exception) match {
case Some(err) => println(s"Failed to produce: $err")
case None => println(s"Produced record at $metadata")
}
}
}
println("Props:" props)
println("Producer:" producer)
val schemaParser = new Parser
val key = "key1"
val valueSchemaAvro = schemaParser.parse(new File("src/main/schema/avro.avsc"))
println("valueSchemaAvro" valueSchemaAvro)
val avroRecord = new GenericData.Record(valueSchemaAvro)
val fileName = "src\main\csvfile\avro.csv"
for (line <- Source.fromFile(fileName).getLines().drop(1))
{
// val id = line.split(",") {0}.toInt
// val name = line.split(",") {1}
// val dept = line.split(",") {2}
// avroRecord.put("id", line.split(",") {0}.toInt)
// avroRecord.put("name", line.split(",") {1})
// avroRecord.put("dept", line.split(",") {2})
avroRecord.put("Date", line.split(",") {0})
avroRecord.put("InjuryLocation", line.split(",") {1})
avroRecord.put("Gender", line.split(",") {2})
avroRecord.put("AgeGroup", line.split(",") {3})
avroRecord.put("IncidentType", line.split(",") {4})
avroRecord.put("DaysLost", line.split(",") {5}.toFloat)
avroRecord.put("Plant", line.split(",") {6})
avroRecord.put("ReportType", line.split(",") {7})
avroRecord.put("Shift", line.split(",") {8})
avroRecord.put("Department", line.split(",") {9})
avroRecord.put("IncidentCost", line.split(",") {10})
avroRecord.put("WkDay", line.split(",") {11})
avroRecord.put("Month", line.split(",") {12}.toInt)
avroRecord.put("Year", line.split(",") {13}.toInt)
try {
val record = new ProducerRecord(topicName, key, avroRecord)
println("avroRecord" avroRecord)
val ack = producer.send(record, callback).get()
println(s"${ack.toString} written to partition ${ack.partition.toString}")
}
catch {
case e: Throwable =>
println(e.getMessage)
}
}
def buildProperties(configFileName: String): Properties = {
val properties: Properties = new Properties
properties.load(new FileReader(configFileName))
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
properties
}
}
Комментарии:
1. Ваш код кажется прекрасным (хотя вы действительно могли бы создать определенный класс Scala/Java, а не использовать универсальную запись). Вы уверены, что пользовательский интерфейс знает о десериализации как Avro? Кроме того, ваши ключи-это просто строки, так что вам действительно не следует использовать для них Avro