Отправка событий json в kafka в нестроковом формате

#apache-spark #spark-structured-streaming

#apache-spark #spark-structured-streaming

Вопрос:

Я создал фрейм данных, как показано ниже, где я использовал метод to_json() для создания значения массива JSON.

  ---------------------------------------------------------------------------------------------------- 

|json_data                                                                                                  |
 ----------------------------------------------------------------------------------------------------------- 
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
 ----------------------------------------------------------------------------------------------------------- 
 

Я использую приведенный ниже метод для отправки фрейма данных в тему kafka.
Но когда я использую данные, которые были отправлены в тему kafka, я мог видеть, что данные json были упорядочены.

Код для отправки данных в kafka:

 outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
        .write
        .format("kafka")
        .option("topic", "topic_test")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("checkpointLocation", checkpointPath)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("truncate", false)
        .save()
 

В kafka принимаются строковые данные:

 {
    "name": "sensor1",
    "value-array": "[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]"
}
 

Как мы можем отправить данные в раздел kafka, чтобы мы не видели строковые json в качестве выходных данных?

Ответ №1:

json_data имеет тип string , и вы снова переходите json_data к to_json(struct("*")) функции.

Проверьте value столбец, который отправляется в kafka.

 df.withColumn("value",to_json(struct($"*"))).show(false)
 ----------------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------------------------------------------------- 
|json_data                                                                                                  |value                                                                                                                                      |
 ----------------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------------------------------------------------- 
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}"}|
 ----------------------------------------------------------------------------------------------------------- ------------------------------------------------------------------------------------------------------------------------------------------- 
 

Попробуйте приведенный ниже код.

  df
 .withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
 .selectExpr("CAST(Key as STRING) as key",to_json(struct($"name",$"value-array")).as("value"))
 .write
 .format("kafka")
 .option("topic", "topic_test")
 .option("kafka.bootstrap.servers", "localhost:9093")
 .option("checkpointLocation", checkpointPath)
 .option("kafka.sasl.mechanism", "PLAIN")
 .option("kafka.security.protocol", "SASL_SSL")
 .option("truncate", false)
 .save()