Обновление структурированных значений столбца типа карты в Pyspark

#python #dataframe #apache-spark #pyspark #apache-spark-sql

#python #фрейм данных #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я пытаюсь изменить столбец Dataframe типа Map, значения которого, в свою очередь, имеют тип Struct:

 root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- MapName: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- field1: double (nullable = true)
 |    |    |-- field2: double (nullable = true)
 

Я создал фрейм данных таким образом, чтобы столбец MapName инициализировался как None, поскольку мне нужно будет заполнить информацию на более поздних этапах. Код, который я использовал, выглядит следующим образом:

 from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, lit
from pyspark.sql.types import StructType, StringType, LongType, DoubleType, MapType

spark = SparkSession.builder.appName("testMapType").getOrCreate()

data = [('Alice', 1)]
df = spark.createDataFrame(data, ['name','age'])

def value_struct():
    schema = StructType([
        StructField('field1', DoubleType(), True),
        StructField('field2', DoubleType(), True)])

    return schema

myschema = StructType([
        StructField('name', StringType(), True),
        StructField('age', LongType(), True),
        StructField('MapName', MapType(StringType(), value_struct()), True)])

df = df.withColumn('MapName',lit(None).cast(MapType(StringType(), value_struct())))
df = spark.createDataFrame(df.rdd, schema=myschema)
df.printSchema()
 

Теперь, когда я пытаюсь обновить столбец MapName, используя udf, подобный этому:

 my_update_udf = udf(lambda x: {**x, **{'a_map_key':{'field1':3.2,'field2':2.6}}}, MapType(StringType(), value_struct()))
df2 = df.withColumn('MapName', my_update_udf(col('MapName')))
df2.collect()
 

Я получаю следующую ошибку:

 TypeError: 'NoneType' object is not a mapping
 

Я не могу найти никаких предложений из прошлых запросов, любая помощь здесь?

Ответ №1:

Для этого вам не нужен UDF. Вы можете использовать соответствующие функции Spark SQL для создания карт и структур.

 import pyspark.sql.functions as F

new_map = F.create_map(
    F.lit('a_map_key'),
    F.struct(
        F.lit(3.2).alias('field1'),
        F.lit(2.6).alias('field2')
    )
)

df2 = df.withColumn(
    'MapName',
    F.when(
        F.col('MapName').isNotNull(),      # need to check null because 
        F.map_concat('MapName', new_map)   # map_concat doesn't work if map is null
    ).otherwise(new_map)
)

df2.show(truncate=False)
 ----- --- ------------------------- 
|name |age|MapName                  |
 ----- --- ------------------------- 
|Alice|1  |[a_map_key -> [3.2, 2.6]]|
 ----- --- -------------------------