createDirectStream с сообщениями avro

#scala #apache-kafka #avro

#scala #апачи-кафка #avro

Вопрос:

В первый момент мне пришлось обрабатывать информацию из текстового файла: C1_4, C2_4, C1______10,01/12/2015,30/12/2015,123456789, S,12345

Теперь мне нужно обработать ту же информацию, но в формате avro. Как я могу это сделать?

До того, как я использовал этот код:

 createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
  Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" ->    "smallest"),
  Set(topic))
 

Затем я обработал сообщения таким образом:

 val logStream = kafkaStream.map(pair => pair._2)

logStream.foreachRDD(
  rdd => {
    val new_rdd = rdd.map(f = line => {

      val ent = line.split(Utils.COMMA_DELIMITER)(0)
      val cenReg = line.split(Utils.COMMA_DELIMITER)(1)
      .......

      .......
    })
  })
 

Теперь я пытался сделать что-то подобное:

   createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc,
  Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
  Set(topic))


Also I have to use a SchemaRegistry that the client give me.


How can I process the messages avro ?
 

Комментарии:

1. Какая-нибудь идея, пожалуйста?