#scala #apache-kafka #avro
#scala #апачи-кафка #avro
Вопрос:
В первый момент мне пришлось обрабатывать информацию из текстового файла: C1_4, C2_4, C1______10,01/12/2015,30/12/2015,123456789, S,12345
Теперь мне нужно обработать ту же информацию, но в формате avro. Как я могу это сделать?
До того, как я использовал этот код:
createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
Set(topic))
Затем я обработал сообщения таким образом:
val logStream = kafkaStream.map(pair => pair._2)
logStream.foreachRDD(
rdd => {
val new_rdd = rdd.map(f = line => {
val ent = line.split(Utils.COMMA_DELIMITER)(0)
val cenReg = line.split(Utils.COMMA_DELIMITER)(1)
.......
.......
})
})
Теперь я пытался сделать что-то подобное:
createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc,
Map("metadata.broker.list" -> brokerlist, "auto.offset.reset" -> "smallest"),
Set(topic))
Also I have to use a SchemaRegistry that the client give me.
How can I process the messages avro ?
Комментарии:
1. Какая-нибудь идея, пожалуйста?