#apache-spark #spark-structured-streaming
#apache-spark #spark-structured-streaming
Вопрос:
Я создал фрейм данных, как показано ниже, где я использовал метод to_json() для создания значения массива JSON.
----------------------------------------------------------------------------------------------------
|json_data |
-----------------------------------------------------------------------------------------------------------
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|
-----------------------------------------------------------------------------------------------------------
Я использую приведенный ниже метод для отправки фрейма данных в тему kafka.
Но когда я использую данные, которые были отправлены в тему kafka, я мог видеть, что данные json были упорядочены.
Код для отправки данных в kafka:
outgoingDF.selectExpr("CAST(Key as STRING) as key", "to_json(struct(*)) AS value")
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()
В kafka принимаются строковые данные:
{
"name": "sensor1",
"value-array": "[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]"
}
Как мы можем отправить данные в раздел kafka, чтобы мы не видели строковые json в качестве выходных данных?
Ответ №1:
json_data
имеет тип string
, и вы снова переходите json_data
к to_json(struct("*"))
функции.
Проверьте value
столбец, который отправляется в kafka.
df.withColumn("value",to_json(struct($"*"))).show(false)
----------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------
|json_data |value |
----------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------
|{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}|{"json_data":"{"name":"sensor1","value-array":[{"time":"2020-11-27T01:01:00.000Z","sensorvalue":11.0,"tag1":"tagvalue"}]}"}|
----------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------------------
Попробуйте приведенный ниже код.
df
.withColumn("value-array",array(struct($"time",$"sensorvalue",$"tag1")))
.selectExpr("CAST(Key as STRING) as key",to_json(struct($"name",$"value-array")).as("value"))
.write
.format("kafka")
.option("topic", "topic_test")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("checkpointLocation", checkpointPath)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("truncate", false)
.save()