Как добавить столбцы в фрейм данных без использования withcolumn

#pyspark

#pyspark

Вопрос:

Мне нужно выполнить цикл по файлу json, сгладить результаты и добавить столбец в фрейм данных в каждом цикле с соответствующими значениями. Но конечный результат будет содержать около ~ 2000 столбцов. Итак, использование withColumn для добавления столбцов выполняется чрезвычайно медленно. Есть ли у них какая-либо другая альтернатива для добавления столбцов в dataframe?

Пример ввода json:

 [
  {
    "ID": "12345",
    "Timestamp": "20140101",
    "Usefulness": "Yes",
    "Code": [
      {
        "event1": "A",
        "result": "1"
      }
    ]
  },
  {
    "ID": "1A35B",
    "Timestamp": "20140102",
    "Usefulness": "No",
    "Code": [
      {
        "event1": "B",
        "result": "1"
      }
    ]
  }
]
  

Мой вывод должен быть:

 
ID     Timestamp  Usefulness  Code_event1  Code_result

12345  20140101   Yes          A           1
1A35B  20140102   No           B           1 
  

Файл json, над которым я работаю, огромен и состоит из множества столбцов. Итак, withColumn в моем случае невыполним.

Редактировать:

Пример кода:

 # Data file
df_data = spark.read.json(file_path)  

# Schema file
with open(schemapath) as fh:
    jsonschema = json.load(fh,object_pairs_hook=OrderedDict)
  

Я просматриваю файл схемы и в цикле получаю доступ к данным для определенного ключа из data DF (df_data). Я делаю это, потому что мой файл данных содержит несколько записей, поэтому я не могу перебирать json-файл данных, иначе он будет перебирать каждую запись.

 def func_structs(json_file):
    for index,(k,v) in enumerate(json_file.items()):
        if isinstance(v, dict):
           srccol = k
           func_structs(v)
        elif isinstance(v, list):
           srccol = k
           func_lists(v) # Separate function to loop through list elements to find nested elements
        else:
            try:
                df_data = df_data.withColumn(srcColName,df_Data[srcCol])
            except:
                df_data = df_data.withColumn(srcColName,lit(None).cast(StringType()))

func_structs(jsonschema)
  

Я добавляю столбцы в сам data DF (df_data).

Комментарии:

1. Я вижу, что из вашего ожидаемого результата вам нужно прямое преобразование JSON -> dataframe. Дополнительные столбцы не добавлены. Было бы здорово, если бы вы могли опубликовать код, который вы пробовали до сих пор, для этого. Это было бы отличным началом для solvers.Fr

2. Привет @Surabhi так что, на самом деле ваш фрейм данных будет масштабироваться горизонтально? Я имею в виду, есть ли какая-то конкретная причина, по которой вам нужна эта схема, а не что-то подобное (id, timestamp, usefulness, code_event_id, code_result)

Ответ №1:

Один из способов — использовать встроенный json анализатор Spark для чтения json в DF:

 df = (sqlContext
      .read
      .option("multiLine", True)
      .option("mode", "PERMISSIVE")
      .json('file:///mypath/file.json')) # change as necessary
  

Результат выглядит следующим образом:

  -------- ----- --------- ---------- 
|    Code|   ID|Timestamp|Usefulness|
 -------- ----- --------- ---------- 
|[[A, 1]]|12345| 20140101|       Yes|
|[[B, 1]]|1A35B| 20140102|        No|
 -------- ----- --------- ---------- 
  

Затем вторым шагом является выделение структуры внутри Code столбца:

 df = df.withColumn('Code_event1', f.col('Code').getItem(0).getItem('event1'))
df = df.withColumn('Code_result', f.col('Code').getItem(0).getItem('result'))
df.show()
  

что дает

  -------- ----- --------- ---------- ----------- ----------- 
|    Code|   ID|Timestamp|Usefulness|Code_event1|Code_result|
 -------- ----- --------- ---------- ----------- ----------- 
|[[A, 1]]|12345| 20140101|       Yes|          A|          1|
|[[B, 1]]|1A35B| 20140102|        No|          B|          1|
 -------- ----- --------- ---------- ----------- ----------- 
  

Редактировать:

Основываясь на комментарии ниже от @pault, вот более аккуратный способ получения требуемых значений (запустите этот код после инструкции load):

 df = df.withColumn('Code', f.explode('Code'))
df.select("*", "Code.*")
  

Комментарии:

1. как насчет df.select("*", "Code.*") вместо getItem s для второго шага?

2. хм, я получаю следующую ошибку, когда пытаюсь использовать этот код: Can only star expand struct data types. Attribute: ArrayBuffer (код) ;

3. может быть, мне нужно сначала разобрать код, а затем использовать ваш код?