#python #json #apache-spark #pyspark #apache-spark-sql
Вопрос:
Я пытаюсь создать структуру JSON из фрейма данных pyspark. У меня есть следующие столбцы в моем фрейме данных — batch_id, batch_run_id, имя таблицы, имя столбца, тип столбца, last_refresh_time, частота обновления, владелец
Я хочу, чтобы это было ниже структуры JSON —
{
"GeneralInfo": {
"DataSetID": "xxxx1234Abcsd",
"Owner" : ["test1@email.com", "test2@email.com", "test3@email.com"]
"Description": "",
"BuisnessFunction": "",
"Source": "",
"RefreshRate": "Weekly",
"LastUpdate": "2020/10/15",
"InfoSource": "TemplateInfo"
},
"Tables": [
{
"TableName": "Employee",
"Columns" : [
{ "ColumnName" : "EmployeeID",
"ColumnDataType": "int"
},
{ "ColumnName" : "EmployeeName",
"ColumnDataType": "string"
}
]
}
}
}
Я пытаюсь присвоить значения в строке JSON с помощью индексов столбцов фрейма данных, но это приводит к ошибке «Объект типа Столбец не сериализуется в формате JSON». Я использовал, как показано ниже —
{
"GeneralInfo": {
"DataSetID": df["batch_id"],
"Owner" : list(df["owner"])
"Description": "",
"BuisnessFunction": "",
"Source": "",
"RefreshRate": df["refresh_frequency"],
"LastUpdate": df["last_update_time"],
"InfoSource": "TemplateInfo"
},
"Tables": [
{
"TableName": df["table_name"],
"Columns" : [
{ "ColumnName" : df["table_name"]["column_name"],
"ColumnDataType": df["table_name"]["column_datatype"]
}
]
}
}
}
Пожалуйста, помогите мне в этом, я недавно начал кодировать в Pyspark.
Комментарии:
1. Не могли бы вы, пожалуйста, вставить образец кадра данных?
2. @MohanaBC Я обновил свой пост, пожалуйста, проверьте. Спасибо
3. @MohanaBC — Ты проверял?
4. Да, но не получил ни одного элемента JSON, как вы ожидали.
Ответ №1:
Попытался получить формат JSON из предоставленных вами образцов данных, формат вывода не соответствует точно так, как вы ожидали. Вы можете импровизировать приведенный ниже код дальше.
Мы можем использовать функцию toJSON для преобразования фрейма данных в формат JSON. Перед вызовом функции toJSON нам нужно использовать функции array(), struct, передавая необходимые столбцы в соответствии с форматом JSON по мере необходимости.
from pyspark.sql import *
from pyspark.sql.functions import *
spark = SparkSession.builder.master('local[*]').getOrCreate()
in_values = [
(123, '123abc', 'Employee', 'Employee_id', 'int', '21/05/15', 'Weekly',
['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com']),
(123, '123abc', 'Employee', 'Employee_name', 'string', '21/05/15', 'Weekly',
['test1@gmail.com', 'test1@gmail.com', 'test3@gmail.com'])
]
cols = ["batch_id", "batch_run_id", "table_name", "column_name", "column_datatype",
"last_update_time", "refresh_frequency", "Owner"]
df = spark.createDataFrame(in_values).toDF(*cols)
.selectExpr("*","'' Description", "'' BusinessFunction", "'TemplateInfo' InfoSource", "'' Source")
list1 = [df["batch_id"].alias("DataSetID"), df["Owner"], df["refresh_frequency"].alias("RefreshRate"),
df["last_update_time"].alias("LastUpdate"), "Description", "BusinessFunction","InfoSource", "Source"]
list2 = [df["table_name"].alias("TableName"),df["column_name"].alias("ColumnName"),
df["column_datatype"].alias("ColumnDataType")]
df.groupBy("batch_id")
.agg(collect_set(struct(*list1))[0].alias("GeneralInfo"),
collect_list(struct(*list2)).alias("Tables")).drop("batch_id")
.toJSON().foreach(print)
# outputs JSON --->
'''
{
"GeneralInfo":{
"DataSetID":123,
"Owner":[
"test1@gmail.com",
"test1@gmail.com",
"test3@gmail.com"
],
"RefreshRate":"Weekly",
"LastUpdate":"21/05/15",
"Description":"",
"BusinessFunction":"",
"InfoSource":"TemplateInfo",
"Source":""
},
"Tables":[
{
"TableName":"Employee",
"ColumnName":"Employee_id",
"ColumnDataType":"int"
},
{
"TableName":"Employee",
"ColumnName":"Employee_name",
"ColumnDataType":"string"
}
]
}
'''
Комментарии:
1. Спасибо, что поделился этим. Я вижу, что вы жестко закодировали имена столбцов для создания списков. Какой-нибудь динамический способ справиться с этим, а не жесткое кодирование?
2. Кроме того,
{ "GeneralInfo": { "DataSetID": "xxxx1234Abcsd", "Owner" : ["test1@email.com", "test2@email.com", "test3@email.com"] "Description": "", "BuisnessFunction": "", "Source": "", "RefreshRate": "Weekly", "LastUpdate": "2020/10/15", "InfoSource": "TemplateInfo" }
было бы под {} не []3. Мы можем динамически генерировать список с помощью генераторов, нам также необходимо рассмотреть вопрос о переименовании столбцов, так как ваш формат json имеет разные имена.
4. из [], т. е. массива, в {}, т. е. структуру, мы можем это изменить. Дайте мне знать, согласны ли вы со второй частью «Таблицы»: [….] ?
5. обновил код, чтобы получить только структуру. Требуется ли переименование столбца типа table_name в TableName? или данные будут иметь имя столбца в качестве имени таблицы?