#scala #apache-spark #avro
#scala #apache-spark #avro
Вопрос:
Я пытаюсь преобразовать файл json в avro и наоборот.
Мой входной файл
[
{
"userId": 1,
"firstName": "Krish",
"lastName": "Lee",
"phoneNumber": "123456",
"emailAddress": "krish.lee@abc.com"
},
{
"userId": 2,
"firstName": "racks",
"lastName": "jacson",
"phoneNumber": "123456",
"emailAddress": "racks.jacson@abc.com"
}
]
Мой выходной файл
{"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1}
{"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}
Ниже приведен мой исходный код
JSON в Avro
val df = spark.read.option("multiLine", true).json("src\main\resources\user.json")
df.printSchema()
df.show()
//convert to avro
df.write.mode("append").format("com.databricks.spark.avro").save("src\main\resources\user1")
AVRO в JSON
val jsonDF = spark.read
.format("com.databricks.spark.avro").load("src\main\resources\user")
jsonDF.show()
jsonDF.printSchema()
jsonDF.write.mode(SaveMode.Overwrite).json("src\main\resources\output\json")
Не могли бы вы помочь
Комментарии:
1. в чем ваша проблема?
2. Если вы посмотрите на входной файл, это список с несколькими объектами. В выходном файле я получаю только один объект, а не список.
3. во время записи вам необходимо преобразовать данные в массив и записать.
Ответ №1:
Проверьте приведенный ниже код.
Входные данные
scala> import sys.process._
scala> "cat /root/spark-examples/data.json".!
[
{
"userId": 1,
"firstName": "Krish",
"lastName": "Lee",
"phoneNumber": "123456",
"emailAddress": "krish.lee@abc.com"
},
{
"userId": 2,
"firstName": "racks",
"lastName": "jacson",
"phoneNumber": "123456",
"emailAddress": "racks.jacson@abc.com"
}
]
Загрузка содержимого файла json в DataFrame
scala> val df = spark
.read
.option("multiline","true")
.json("/root/spark-examples/data.json")
df: org.apache.spark.sql.DataFrame = [emailAddress: string, firstName: string ... 3 more fields]
Как только файл json будет загружен в DataFrame, он будет преобразован в array of object
в multiple objects or rows
, как показано ниже.
scala> df.show(false)
-------------------- --------- -------- ----------- ------
|emailAddress |firstName|lastName|phoneNumber|userId|
-------------------- --------- -------- ----------- ------
|krish.lee@abc.com |Krish |Lee |123456 |1 |
|racks.jacson@abc.com|racks |jacson |123456 |2 |
-------------------- --------- -------- ----------- ------
Когда вы записываете DataFrame
ответ, он записывает его в несколько строк.
scala> df.repartition(1).write.mode("overwrite").json("/tmp/dataa/")
scala> "ls -ltr /tmp/dataa/".!
total 4
-rw-r--r-- 1 root root 222 Oct 22 12:19 part-00000-fa9e79f6-2689-4385-b3ee-fd19cf291a31-c000.json
-rw-r--r-- 1 root root 0 Oct 22 12:19 _SUCCESS
scala> "cat /tmp/dataa/part-00000-fa9e79f6-2689-4385-b3ee-fd19cf291a31-c000.json".!
{"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1}
{"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}
Если вы хотите получить то же, что и ваши входные данные, следуйте приведенному ниже коду.
scala> df
.select(to_json(collect_list(struct($"*"))).as("data"))
.write
.format("text") // You need to use text format, Using json will give you wrong data.
.mode("overwrite")
.save("/tmp/datab/")
scala> "ls -ltr /tmp/datab/".!
total 4
-rw-r--r-- 1 root root 224 Oct 22 12:19 part-00000-0896730e-51e1-4728-bd6b-cdfabc03978e-c000.txt
-rw-r--r-- 1 root root 0 Oct 22 12:19 _SUCCESS
scala> "cat /tmp/datab/part-00000-0896730e-51e1-4728-bd6b-cdfabc03978e-c000.txt".!
[
{"emailAddress":"krish.lee@abc.com","firstName":"Krish","lastName":"Lee","phoneNumber":"123456","userId":1},
{"emailAddress":"racks.jacson@abc.com","firstName":"racks","lastName":"jacson","phoneNumber":"123456","userId":2}
]