Проблема с сериализацией Confluent Kafka Avro

#scala #apache-kafka #avro #confluent-platform

Вопрос:

Я запускаю один и тот же код в слиянии облаков, и один работает на не облачном сервере, размещенном в слиянии облаков, и когда я проверяю пользовательский интерфейс в разделе, я вижу, что публикуются разные типы данных

Значение в пользовательском интерфейсе слияния облаков для темы � 3-Jan-20N/AMale 35-49Vehicle?AlabamaLost TimeDayFabrication3367Fri�

Значение в другом облаке слияния находится в форме правильного JSON с соответствующими парами ключ значение

Нравится

{"Date": "3-Jan-20", "InjuryLocation": "Eye", "Gender": "Male", "AgeGroup": "18-24", "IncidentType": "Cut", "DaysLost": 0.0, "Plant": "Georgia", "ReportType": "Near Miss", "Shift": "Day", "Department": "Administration", "IncidentCost": "0", "WkDay": "Fri", "Month": 1, "Year": 2020}

Может ли кто-нибудь сообщить мне об этом, когда мы создадим сообщения Avro на тему о том, какие записи мы видим в значении, когда проверяем пользовательский интерфейс центра управления?

Код :

 import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer._

import java.io.{File, FileReader}
import java.util.Properties
import scala.Console.println
import scala.io.Source

object Avro extends App
{

  val configFileName = "src/main/config/java.config"
  val topicName = "avro"
  val props = buildProperties(configFileName)
  val producer = new KafkaProducer[String, GenericData.Record](props)
  val callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      Option(exception) match {
        case Some(err) => println(s"Failed to produce: $err")
        case None => println(s"Produced record at $metadata")
      }
    }
  }
  println("Props:"   props)
  println("Producer:"   producer)

  val schemaParser = new Parser

  val key = "key1"
  val valueSchemaAvro = schemaParser.parse(new File("src/main/schema/avro.avsc"))
  println("valueSchemaAvro"   valueSchemaAvro)
  val avroRecord = new GenericData.Record(valueSchemaAvro)

  val fileName = "src\main\csvfile\avro.csv"
  for (line <- Source.fromFile(fileName).getLines().drop(1))
  { 
    //    val id = line.split(",") {0}.toInt
    //    val name = line.split(",") {1}
    //    val dept = line.split(",") {2}
    //    avroRecord.put("id", line.split(",") {0}.toInt)
    //    avroRecord.put("name", line.split(",") {1})
    //    avroRecord.put("dept", line.split(",") {2})

    avroRecord.put("Date", line.split(",") {0})
    avroRecord.put("InjuryLocation", line.split(",") {1})
    avroRecord.put("Gender", line.split(",") {2})
    avroRecord.put("AgeGroup", line.split(",") {3})
    avroRecord.put("IncidentType", line.split(",") {4})
    avroRecord.put("DaysLost", line.split(",") {5}.toFloat)
    avroRecord.put("Plant", line.split(",") {6})
    avroRecord.put("ReportType", line.split(",") {7})
    avroRecord.put("Shift", line.split(",") {8})
    avroRecord.put("Department", line.split(",") {9})
    avroRecord.put("IncidentCost", line.split(",") {10})
    avroRecord.put("WkDay", line.split(",") {11})
    avroRecord.put("Month", line.split(",") {12}.toInt)
    avroRecord.put("Year", line.split(",") {13}.toInt)
    try {
      val record = new ProducerRecord(topicName, key, avroRecord)
      println("avroRecord"   avroRecord)
      val ack = producer.send(record, callback).get()
      println(s"${ack.toString} written to partition ${ack.partition.toString}")
    }
    catch {
      case e: Throwable =>
        println(e.getMessage)
    }

  }

  def buildProperties(configFileName: String): Properties = {
    val properties: Properties = new Properties
    properties.load(new FileReader(configFileName))
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    properties
  }

}
 

Комментарии:

1. Ваш код кажется прекрасным (хотя вы действительно могли бы создать определенный класс Scala/Java, а не использовать универсальную запись). Вы уверены, что пользовательский интерфейс знает о десериализации как Avro? Кроме того, ваши ключи-это просто строки, так что вам действительно не следует использовать для них Avro