Преобразование нескольких столбцов списка в столбец массива json в фрейме данных в pyspark

#json #apache-spark #pyspark #apache-spark-sql

#json #apache-искра #пайспарк #apache-spark-sql

Вопрос:

У меня есть фрейм данных, он содержит несколько столбцов списка и преобразует столбец массива JSON.

используется приведенная ниже логика, но не работает какая-либо идея?

 def test(test1,test2):
    d = {'data': [{'marks': a, 'grades': t} for a, t in zip(test1, test2)]}
    return d
 

UDF определен как тип массива, как показано ниже, и попытался вызвать его с помощью столбца, но не получается ли у него какая-либо идея?

 arrayToMapUDF = udf(test ,ArrayType(StringType()))

df.withcolumn("jsonarraycolumn", arrayToMapUDF(col("col"),col("col2")))
 
метки оценки
[100, 150, 200, 300, 400] [0.01, 0.02, 0.03, 0.04, 0.05]

необходимо преобразовать, как показано ниже.

метки оценки Json-массив-столбец
[100, 150, 200, 300, 400] [0.01, 0.02, 0.03, 0.04, 0.05] {атрибут:[{отметок: 1000,
оценки: 0,01},
{отметок: 15000,
класс: 0,02},
{отметок: 2000,
оценки: 0,03}
]}

Ответ №1:

Вы можете использовать StringType , потому что он возвращает строку JSON, а не массив строк. Вы также можете использовать json.dumps для преобразования словаря в строку JSON.

 import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import json

def test(test1,test2):
    d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
    return json.dumps(d)

arrayToMapUDF = F.udf(test, StringType())

df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))

df2.show(truncate=False)
 ------------------------------- ------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|amount                         |discount                      |jsonarraycolumn                                                                                                                                                                      |
 ------------------------------- ------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{"amount": 1000, "discount": 0.01}, {"amount": 15000, "discount": 0.02}, {"amount": 2000, "discount": 0.03}, {"amount": 3000, "discount": 0.04}, {"amount": 4000, "discount": 0.05}]|
 ------------------------------- ------------------------------ ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
 

Если вам не нужны кавычки,

 import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import json

def test(test1,test2):
    d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
    return json.dumps(d).replace('"', '')

arrayToMapUDF = F.udf(test, StringType())

df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))

df2.show(truncate=False)
 ------------------------------- ------------------------------ ----------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|amount                         |discount                      |jsonarraycolumn                                                                                                                                                  |
 ------------------------------- ------------------------------ ----------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{amount: 1000, discount: 0.01}, {amount: 15000, discount: 0.02}, {amount: 2000, discount: 0.03}, {amount: 3000, discount: 0.04}, {amount: 4000, discount: 0.05}]|
 ------------------------------- ------------------------------ ----------------------------------------------------------------------------------------------------------------------------------------------------------------- 
 

Если вам нужен столбец реального типа JSON:

 def test(test1,test2):
    d = [{'amount': a, 'discount': t} for a, t in zip(test1, test2)]
    return d

arrayToMapUDF = F.udf(test, 
    ArrayType(
        StructType([
            StructField('amount', StringType()), 
            StructField('discount', StringType())
        ])
    )
)

df2 = df.withColumn("jsonarraycolumn", arrayToMapUDF(F.col("amount"), F.col("discount")))

df2.show(truncate=False)
 ------------------------------- ------------------------------ ----------------------------------------------------------------------- 
|amount                         |discount                      |jsonarraycolumn                                                        |
 ------------------------------- ------------------------------ ----------------------------------------------------------------------- 
|[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[[1000, 0.01], [15000, 0.02], [2000, 0.03], [3000, 0.04], [4000, 0.05]]|
 ------------------------------- ------------------------------ ----------------------------------------------------------------------- 

df2.printSchema()
root
 |-- amount: array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- discount: array (nullable = false)
 |    |-- element: double (containsNull = false)
 |-- jsonarraycolumn: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: string (nullable = true)
 |    |    |-- discount: string (nullable = true)
 

Комментарии:

1. Возможно, я вас смутил, но данное решение хорошее. но когда я записываю обратно в базу данных

2. Возможно, я вас смутил, но данное решение хорошее. но когда я записываю обратно в базу данных как «атрибуты»: «{«атрибуты»: [{«сумма»: «8000000», » discount»: «0.01»} добавление дополнительного символа обратного наклона и имя атрибутов anslo дублируется (посколькумы упоминали как json), поэтому я хочу сохранить данные в базе данных. Я пытался сделать это, но безуспешно. def zip(xs, ys): возвращает [{‘amount’: a, ‘discount’: t} для a, t zip(xs, ys)] arrayToMapUDF = udf(zip ,(StructType([StructField(‘marks’, IntegerType()),StructField(‘marks1’, Десятичный тип())])))

3. можем ли мы без использования dump JSON создать массив с парами ключ-значение?

4. @mike def zip(xs, ys): возвращает [{‘amount’: a, ‘discount’: t} для a, t zip(xs, ys)] arrayToMapUDF = udf(zip ,(StructType([StructField(‘метки’, IntegerType()),StructField(‘marks1’, DecimalType())])))

5. jsoncolumn [{«сумма»: 1000, «скидка»: 0.01}, {«сумма»: 15000, «скидка»: 0.02}, {«сумма»: 2000, «скидка»: 0.03}, {«сумма»: 3000, «скидка»: 0.04}, {«сумма»: 4000, «скидка»: 0.05}]

Ответ №2:

Чтобы избежать использования функций udf, вы можете использовать функции высокого порядка:

 import pyspark.sql.functions as f

transform_expr = "TRANSFORM(arrays_zip(amount, discount), value -> value)"
df = df.withColumn('jsonarraycolumn', f.to_json(f.expr(transform_expr)))

df.show(truncate=False)
 

Вывод:

  ------------------------------- ------------------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|amount                         |discount                      |jsonarraycolumn                                                                                                                                                             |
 ------------------------------- ------------------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|[1000, 15000, 2000, 3000, 4000]|[0.01, 0.02, 0.03, 0.04, 0.05]|[{"amount":1000.0,"discount":0.01},{"amount":15000.0,"discount":0.02},{"amount":2000.0,"discount":0.03},{"amount":3000.0,"discount":0.04},{"amount":4000.0,"discount":0.05}]|
 ------------------------------- ------------------------------ ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
 

Комментарии:

1. когда я запускаю код, я получаю сообщение об ошибке. Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.expr. : org.apache.spark.sql.catalyst.parser.ParseException: extraneous input '->' expecting {')', ','}(line 1, pos 43)

2. Какая у вас версия spark?

3. « Spark 2.4.5 «

4. @john проверьте мое редактирование, я обнаружил, как работать с версией spark 2.4.5

5. @ Kafels « Ошибка типа: объект ‘str’ не вызывается ————————————————————————— Обратная трассировка ошибки типа (последний последний вызов) <command-3566074724935406> в <модуле> 12 .С помощью столбца («id», col(«SupplierName»)) —>14 volumediscountDf = volumediscountDf.withColumn(‘jsonarraycolumn’, to_json(выражение (выражение))) Ошибка типа: объект ‘str’ не вызывается «