Spark Streaming MQTT — применить схему к набору данных

#json #apache-spark #streaming #mqtt

#json #apache-spark #потоковая передача #mqtt

Вопрос:

Я работаю над блоками данных (Spark 2.0.1-db1 (Scala 2.11)) и пытаюсь использовать функции Spark Streaming. Я использую библиотеки:
— spark-sql-streaming-mqtt_2.11-2.1.0-SNAPSHOT.jar (смотрите здесь:http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt /)

Следующая команда дает мне набор данных :

 val lines = spark.readStream  
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")  
      .option("clientId", "sparkTest")  
      .option("brokerUrl", "tcp://xxx.xxx.xxx.xxx:xxx")  
      .option("topic", "/Name/data")  
      .option("localStorage", "dbfs:/models/mqttPersist")  
      .option("cleanSession", "true")  
      .load().as[(String, Timestamp)]  
  

с помощью этой схемы печати :

 root  
 |-- value : string (nullable : true)  
 |-- timestamp : timestamp (nullable : true)  
  

И я хотел бы применить схему к столбцу «значение» моего набора данных. вы можете увидеть мою схему json следующим образом.

 root  
 |-- id : string (nullable = true)  
 |-- DateTime : timestamp (nullable = true)  
 |-- label : double (nullable = true)  
  

Возможно ли напрямую проанализировать мой json в потоке, чтобы получить что-то подобное :

 root   
 |-- value : struct (nullable : true)  
   |-- id : string (nullable = true)  
   |-- DateTime : timestamp (nullable = true)  
   |-- label : double (nullable = true)  
 |-- timestamp : timestamp (nullable : true)  
  

На данный момент я не вижу никакого способа проанализировать json из mqtt, и любая помощь была бы очень полезной.

Заранее благодарю.

Ответ №1:

Сегодня у меня была точно такая же проблема! Я использовал json4s и Jackson для анализа json.

Как я получаю потоковый набор данных (почти такой же, как у вас):

  val lines = spark.readStream
   .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
   .option("topic", topic)
   .option("brokerUrl",brokerUrl)
   .load().as[(String,Timestamp)]
  

Я определил схему, используя класс case:

   case class  DeviceData(devicename: String, time: Long, metric: String, value: Long, unit: String)
  

Анализ столбца JSON с использованием org.json4s.jackson.JsonMethods.parse:

 val ds = lines.map {
  row =>
    implicit val format = DefaultFormats
    parse(row._1).extract[DeviceData]
}
  

Вывод результатов:

 val query = ds.writeStream
  .format("console")
  .option("truncate", false)
  .start()
  

Результаты:

  ---------- ------------- ----------- ----- ---- 
|devicename|time         |metric     |value|unit|
 ---------- ------------- ----------- ----- ---- 
|dht11_4   |1486656575772|temperature|9    |C   |
|dht11_4   |1486656575772|humidity   |36   |%   |
 ---------- ------------- ----------- ----- ---- 
  

Я отчасти разочарован, что не могу найти решение, которое использует собственный синтаксический анализ json Sparks. Вместо этого мы должны полагаться на Джексона. Вы можете использовать собственный синтаксический анализ json от spark, если вы читаете файл как поток. Как так:

 val lines = spark.readStream
  .....
  .json("./path/to/file").as[(String,Timestamp)]
  

Но для MQTT мы не можем этого сделать.