Pyspark разнесет вложенный список

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

У меня есть следующий фрейм данных, и я хотел бы разнести столбец значений, чтобы каждое значение находилось в отдельном столбце:

 id | values
-----------------------
1  | '[[532,969020406,89],[216,969100125,23],[169,39356140000,72],[399,14407358500,188],[377,13761937166.6667,24]]'
2 | '[[532,969020406,89]]'
  

Обратите внимание, что списки в столбце значений могут иметь разную длину и что они имеют строковый тип данных.

Желаемая таблица должна выглядеть следующим образом:

 id | v11 | v12 | v13 | v21 | v22... 
--------------------------------------
1  | 532 | 969020406 | 89 | 216 | 969100125...
2 | 532 | 969020406 | 89 | Null | Null...
  

Я попытался указать схему и использовать метод from_json для создания массива, а затем разнесите его, но я столкнулся с проблемами, а именно, любая из схем, похоже, не вписывается в мои данные

 json_schema =  types.StructType([types.StructField('array', types.StructType([ 
    types.StructField("v1",types.StringType(),True), 
    types.StructField("v2",types.StringType(),True), 
    types.StructField("v3",types.StringType(),True)
  ]))])

json_schema = types.ArrayType(types.StructType([ 
    types.StructField("v1",types.StringType(),True), 
    types.StructField("v2",types.StringType(),True), 
    types.StructField("v3",types.StringType(),True)
  ]))

json_schema = types.ArrayType(types.ArrayType(types.IntegerType()))

df.select('id', F.from_json('values', schema=json_schema)).show()
  

Процедура возвращает только нулевое значение или пустой массив: [,,]

Я также получил следующую ошибку: StructType не может принять объект ‘[‘ в типе <class ‘str’>

Схема входных данных, выведенных Pyspark:

 root
 |-- id: integer (nullable = true)
 |-- values: string (nullable = true)
  

Любая помощь будет оценена.

Комментарии:

1. Не могли бы вы добавить схему входных данных, например, выходные df.printSchema() данные для ваших исходных данных?

Ответ №1:

Для Spark 2.4 вы можете использовать комбинацию split и transform для преобразования строки в двумерный массив. Отдельные элементы этого массива затем могут быть отдельно преобразованы в столбцы.

 from pyspark.sql import functions as F

df2 = df.withColumn("parsed_values", F.expr("transform(split(values, '\\],\\['), "  
           "c ->  transform(split(c, ','), d->regexp_replace(d,'[\\[\\]]','')))"))
    .withColumn("length", F.size("parsed_values"))

max_length = df2.agg(F.max("length")).head()["max(length)"]
  

df2 теперь имеет структуру

 root
 |-- id: string (nullable = true)
 |-- values: string (nullable = true)
 |-- parsed_values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- length: integer (nullable = false)
  

и max_length содержит максимальное количество записей в одной строке (5 для примера данных).

parsed_value[0][1] вернет вторую вложенную запись первой записи. Это будет 969020406 для примера данных.

Второй шаг — преобразовать вложенный массив в столбцы.

 cols = [F.col('parsed_values').getItem(x).getItem(y).alias("v{}{}".format(x 1,y 1)) 
    for x in range(0, max_length) for y in range(0,3)]

df2.select([F.col('id')]   cols).show()
  

Вывод:

  --- --- --------- --- ---- --------- ---- ---- ----------- ---- ---- ----------- ---- ---- ---------------- ---- 
| id|v11|      v12|v13| v21|      v22| v23| v31|        v32| v33| v41|        v42| v43| v51|             v52| v53|
 --- --- --------- --- ---- --------- ---- ---- ----------- ---- ---- ----------- ---- ---- ---------------- ---- 
|  1|532|969020406| 89| 216|969100125|  23| 169|39356140000|  72| 399|14407358500| 188| 377|13761937166.6667|  24|
|  2|532|969020406| 89|null|     null|null|null|       null|null|null|       null|null|null|            null|null|
 --- --- --------- --- ---- --------- ---- ---- ----------- ---- ---- ----------- ---- ---- ---------------- ---- 
  

Решение можно было бы улучшить, если бы существовал способ определения max_length без необходимости нахождения максимума по полным данным, например, если это значение было известно заранее.