Разбор столбца строки PySpark словарей списка ключей на основе отдельного столбца массива ключей

#python #dataframe #apache-spark #pyspark

Вопрос:

В настоящее время я застрял, пытаясь извлечь значения в столбце «Значение ключа» ниже на основе упорядоченного массива ключей в столбце «Ключи» ниже.

 >>> df.select('reference_nbr', 'keyValue', 'Keys').show()
 -------------- ------------------------------------------------------------- --------------------- 
|    ref_number|                                                     keyValue|                 Keys|
 -------------- ------------------------------------------------------------- --------------------- 
|          AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|
|          NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|
|          QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|
|          LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|
|          MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|
|          UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|
|          WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|
|          VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|
|          CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|
|          BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|
 -------------- ------------------------------------------------------------- --------------------- 
only showing top 20 rows
 

Если я применю описанные ниже шаги UDF и withColumn (), я могу легко запросить столбец «Значение ключа» на основе определенного ключа и вставить массив значений ключа в новый столбец.

 getKey4 = udf(lambda ar1: ar1.get('key4'))
df = df.withColumn("key4Values", getKey4(df["keyValue"]))
 

Я пытаюсь выполнить те же действия, что и выше, но для каждого ключа в порядке столбца «Ключи». Желаемый результат:

  -------------- ------------------------------------------------------------- --------------------- ------------------------------------------------ 
|    ref_number|                                                     keyValue|                 Keys|                                          Values|
 -------------- ------------------------------------------------------------- --------------------- ------------------------------------------------ 
|          AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|[[TBAX4, TBAXQ, TBAXD],[TBAR1, TBAZA, TBABW],...|
|          NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|[[TBAX5, TBAXA, TBAXC],[TBAR2, TBAZS, TBABE],...|
|          QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|[[TBAX6, TBAXZ, TBAXF],[TBAR3, TBAZD, TBABR],...|
|          LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|[[TBAX7, TBAXC, TBAXG],[TBAR4, TBAZF, TBABT],...|
|          MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|[[TBAX8, TBAXV, TBAXH],[TBAR5, TBAZG, TBABY],...|
|          UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|[[TBAX9, TBAXB, TBAXJ],[TBAR6, TBAZH, TBABU],...|
|          WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|[[TBAX0, TBAXN, TBAXK],[TBAR7, TBAZJ, TBABI],...|
|          VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|[[TBAX2, TBAXM, TBAXL],[TBAR8, TBAZK, TBABO],...|
|          CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|[[TBAX3, TBAXA, TBAXO],[TBAR9, TBAZL, TBABP],...|
|          BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|[[TBAX1, TBAXS, TBAXI],[TBAR0, TBAZQ, TBABZ],...|
 -------------- ------------------------------------------------------------- --------------------- ------------------------------------------------ 
 

Я попробовал метод ниже, но я получаю следующую ошибку:

 getKeyValues = udf(lambda ar1, ar2: {ar1.get(x) for x in ar2})
df.withColumn("Values", getKeyValues(df["keyValue"], df["Keys"])).show()

AttributeError: 'unicode' object has no attribute 'get'
 

Я попробовал другой UDF ниже, который также выдает ту же ошибку в юникоде:

 def getKeyVals(ar1, ar2): 
    arr = []
    for x in ar2: 
        arr.append(ar1.get(x, None))
    return arr 
    
udf_split = udf(split, ArrayType(StringType()))

df.withColumn("test", udf_getKeyVals(df['keyValue'], df['Keys'])).show()
 

Я также пробовал следующую функцию ниже, но получаю аналогичные ошибки.
Эта функция взята со следующей веб-страницы MungingData

 def working_fun(mapping):
    def f(ar1):
        for x in ar1:
            return mapping.get(x)
    return F.udf(f)

df.withColumn("test", working_fun(df["keyValue"])(F.col('Keys'))).show()
 

Буду признателен за любые советы или рекомендации-спасибо!

Обновлено, чтобы включить сведения о схеме и версии Spark ниже

 >>> df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString()
'struct<reference_nbr:string,keyValue:string,Keys:array<string>>'

>>> spark.version
u'2.3.2.3.1.5.6030-1'
 

Комментарии:

1. Пожалуйста, скопируйте эту команду df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString() и вставьте выходные данные в свой вопрос. Кроме того, какова ваша версия spark?

2. Ценю быстрый ответ @Kafels. Я обновил исходную запись, включив в нее информацию о схеме и версии Spark. Спасибо

3. Если вы не возражаете, не могли бы вы поделиться своими данными, поместив вместо этого вывод этой команды df.select('reference_nbr', 'keyValue', 'Keys').limit(10).collect() ?

4. Привет @Kafels извини, что вернулся поздно. К сожалению, столбцы «Значение ключа» и «Ключи» очень большие, и я не думаю, что смогу опубликовать их в переполнении стека. Пожалуйста, дайте мне знать, если я могу поделиться какой-либо другой информацией, еще раз спасибо

Ответ №1:

Если мы преобразуем столбец keyvalue из строкового типа в тип карты, мы можем использовать функцию map_values для извлечения значений:

Я использовал UDF для замены = в ключевых значениях на:, чтобы мы могли изменить тип на карту с помощью модуля ast.

 from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.master('local[*]').getOrCreate()


def get_dict(c):
    c = c.replace("=", ":")
    import ast
    dict_value = ast.literal_eval(c)
    return dict_value


get_dict_udf = udf(lambda c: get_dict(c), MapType(StringType(), ArrayType(IntegerType())))

# Sample dataframe
df = spark.createDataFrame(
    [('{"k3"= [6, 5, 4], "k1"= [4, 5, 1], "k8"= [8, 5, 6], "k5"= [7, 4, 3]}',
      ["k1", "k3", "k5", "k8"])]).toDF("keyvalue", "key")

df.withColumn("keyvalue", get_dict_udf("keyvalue")). 
    withColumn("values", sort_array(map_values("keyvalue"))).show(truncate=False)

 -------------------------------------------------------------------- ---------------- -------------------------------------------- 
|keyvalue                                                            |key             |values                                      |
 -------------------------------------------------------------------- ---------------- -------------------------------------------- 
|[k3 -> [6, 5, 4], k5 -> [7, 4, 3], k8 -> [8, 5, 6], k1 -> [4, 5, 1]]|[k1, k3, k5, k8]|[[4, 5, 1], [6, 5, 4], [7, 4, 3], [8, 5, 6]]|
 -------------------------------------------------------------------- ---------------- -------------------------------------------- 
 

Комментарии:

1. Спасибо за ответ, спасибо! К сожалению, я получаю следующую ошибку: AttributeError: объект «дикт» не имеет атрибута «заменить». Я предполагаю, что мне, возможно, придется использовать regex_replace() вместо этого?

2. Хорошо, вы можете использовать, но в вашем случае столбец keyvalue является строковым, поэтому здесь переменная » c «в udf будет иметь тип string и должна принимать метод замены на «c».