#pyspark
#pyspark
Вопрос:
Мне нужно выполнить цикл по файлу json, сгладить результаты и добавить столбец в фрейм данных в каждом цикле с соответствующими значениями. Но конечный результат будет содержать около ~ 2000 столбцов. Итак, использование withColumn для добавления столбцов выполняется чрезвычайно медленно. Есть ли у них какая-либо другая альтернатива для добавления столбцов в dataframe?
Пример ввода json:
[
{
"ID": "12345",
"Timestamp": "20140101",
"Usefulness": "Yes",
"Code": [
{
"event1": "A",
"result": "1"
}
]
},
{
"ID": "1A35B",
"Timestamp": "20140102",
"Usefulness": "No",
"Code": [
{
"event1": "B",
"result": "1"
}
]
}
]
Мой вывод должен быть:
ID Timestamp Usefulness Code_event1 Code_result
12345 20140101 Yes A 1
1A35B 20140102 No B 1
Файл json, над которым я работаю, огромен и состоит из множества столбцов. Итак, withColumn в моем случае невыполним.
Редактировать:
Пример кода:
# Data file
df_data = spark.read.json(file_path)
# Schema file
with open(schemapath) as fh:
jsonschema = json.load(fh,object_pairs_hook=OrderedDict)
Я просматриваю файл схемы и в цикле получаю доступ к данным для определенного ключа из data DF (df_data). Я делаю это, потому что мой файл данных содержит несколько записей, поэтому я не могу перебирать json-файл данных, иначе он будет перебирать каждую запись.
def func_structs(json_file):
for index,(k,v) in enumerate(json_file.items()):
if isinstance(v, dict):
srccol = k
func_structs(v)
elif isinstance(v, list):
srccol = k
func_lists(v) # Separate function to loop through list elements to find nested elements
else:
try:
df_data = df_data.withColumn(srcColName,df_Data[srcCol])
except:
df_data = df_data.withColumn(srcColName,lit(None).cast(StringType()))
func_structs(jsonschema)
Я добавляю столбцы в сам data DF (df_data).
Комментарии:
1. Я вижу, что из вашего ожидаемого результата вам нужно прямое преобразование JSON -> dataframe. Дополнительные столбцы не добавлены. Было бы здорово, если бы вы могли опубликовать код, который вы пробовали до сих пор, для этого. Это было бы отличным началом для solvers.Fr
2. Привет @Surabhi так что, на самом деле ваш фрейм данных будет масштабироваться горизонтально? Я имею в виду, есть ли какая-то конкретная причина, по которой вам нужна эта схема, а не что-то подобное
(id, timestamp, usefulness, code_event_id, code_result)
Ответ №1:
Один из способов — использовать встроенный json
анализатор Spark для чтения json в DF:
df = (sqlContext
.read
.option("multiLine", True)
.option("mode", "PERMISSIVE")
.json('file:///mypath/file.json')) # change as necessary
Результат выглядит следующим образом:
-------- ----- --------- ----------
| Code| ID|Timestamp|Usefulness|
-------- ----- --------- ----------
|[[A, 1]]|12345| 20140101| Yes|
|[[B, 1]]|1A35B| 20140102| No|
-------- ----- --------- ----------
Затем вторым шагом является выделение структуры внутри Code
столбца:
df = df.withColumn('Code_event1', f.col('Code').getItem(0).getItem('event1'))
df = df.withColumn('Code_result', f.col('Code').getItem(0).getItem('result'))
df.show()
что дает
-------- ----- --------- ---------- ----------- -----------
| Code| ID|Timestamp|Usefulness|Code_event1|Code_result|
-------- ----- --------- ---------- ----------- -----------
|[[A, 1]]|12345| 20140101| Yes| A| 1|
|[[B, 1]]|1A35B| 20140102| No| B| 1|
-------- ----- --------- ---------- ----------- -----------
Редактировать:
Основываясь на комментарии ниже от @pault, вот более аккуратный способ получения требуемых значений (запустите этот код после инструкции load):
df = df.withColumn('Code', f.explode('Code'))
df.select("*", "Code.*")
Комментарии:
1. как насчет
df.select("*", "Code.*")
вместоgetItem
s для второго шага?2. хм, я получаю следующую ошибку, когда пытаюсь использовать этот код:
Can only star expand struct data types. Attribute:
ArrayBuffer (код);
3. может быть, мне нужно сначала разобрать код, а затем использовать ваш код?