#python #dataframe #apache-spark #pyspark
Вопрос:
В настоящее время я застрял, пытаясь извлечь значения в столбце «Значение ключа» ниже на основе упорядоченного массива ключей в столбце «Ключи» ниже.
>>> df.select('reference_nbr', 'keyValue', 'Keys').show()
-------------- ------------------------------------------------------------- ---------------------
| ref_number| keyValue| Keys|
-------------- ------------------------------------------------------------- ---------------------
| AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|
| NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|
| QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|
| LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|
| MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|
| UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|
| WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|
| VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|
| CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|
| BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|
-------------- ------------------------------------------------------------- ---------------------
only showing top 20 rows
Если я применю описанные ниже шаги UDF и withColumn (), я могу легко запросить столбец «Значение ключа» на основе определенного ключа и вставить массив значений ключа в новый столбец.
getKey4 = udf(lambda ar1: ar1.get('key4'))
df = df.withColumn("key4Values", getKey4(df["keyValue"]))
Я пытаюсь выполнить те же действия, что и выше, но для каждого ключа в порядке столбца «Ключи». Желаемый результат:
-------------- ------------------------------------------------------------- --------------------- ------------------------------------------------
| ref_number| keyValue| Keys| Values|
-------------- ------------------------------------------------------------- --------------------- ------------------------------------------------
| AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|[[TBAX4, TBAXQ, TBAXD],[TBAR1, TBAZA, TBABW],...|
| NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|[[TBAX5, TBAXA, TBAXC],[TBAR2, TBAZS, TBABE],...|
| QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|[[TBAX6, TBAXZ, TBAXF],[TBAR3, TBAZD, TBABR],...|
| LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|[[TBAX7, TBAXC, TBAXG],[TBAR4, TBAZF, TBABT],...|
| MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|[[TBAX8, TBAXV, TBAXH],[TBAR5, TBAZG, TBABY],...|
| UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|[[TBAX9, TBAXB, TBAXJ],[TBAR6, TBAZH, TBABU],...|
| WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|[[TBAX0, TBAXN, TBAXK],[TBAR7, TBAZJ, TBABI],...|
| VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|[[TBAX2, TBAXM, TBAXL],[TBAR8, TBAZK, TBABO],...|
| CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|[[TBAX3, TBAXA, TBAXO],[TBAR9, TBAZL, TBABP],...|
| BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|[[TBAX1, TBAXS, TBAXI],[TBAR0, TBAZQ, TBABZ],...|
-------------- ------------------------------------------------------------- --------------------- ------------------------------------------------
Я попробовал метод ниже, но я получаю следующую ошибку:
getKeyValues = udf(lambda ar1, ar2: {ar1.get(x) for x in ar2})
df.withColumn("Values", getKeyValues(df["keyValue"], df["Keys"])).show()
AttributeError: 'unicode' object has no attribute 'get'
Я попробовал другой UDF ниже, который также выдает ту же ошибку в юникоде:
def getKeyVals(ar1, ar2):
arr = []
for x in ar2:
arr.append(ar1.get(x, None))
return arr
udf_split = udf(split, ArrayType(StringType()))
df.withColumn("test", udf_getKeyVals(df['keyValue'], df['Keys'])).show()
Я также пробовал следующую функцию ниже, но получаю аналогичные ошибки.
Эта функция взята со следующей веб-страницы MungingData
def working_fun(mapping):
def f(ar1):
for x in ar1:
return mapping.get(x)
return F.udf(f)
df.withColumn("test", working_fun(df["keyValue"])(F.col('Keys'))).show()
Буду признателен за любые советы или рекомендации-спасибо!
Обновлено, чтобы включить сведения о схеме и версии Spark ниже
>>> df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString()
'struct<reference_nbr:string,keyValue:string,Keys:array<string>>'
>>> spark.version
u'2.3.2.3.1.5.6030-1'
Комментарии:
1. Пожалуйста, скопируйте эту команду
df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString()
и вставьте выходные данные в свой вопрос. Кроме того, какова ваша версия spark?2. Ценю быстрый ответ @Kafels. Я обновил исходную запись, включив в нее информацию о схеме и версии Spark. Спасибо
3. Если вы не возражаете, не могли бы вы поделиться своими данными, поместив вместо этого вывод этой команды
df.select('reference_nbr', 'keyValue', 'Keys').limit(10).collect()
?4. Привет @Kafels извини, что вернулся поздно. К сожалению, столбцы «Значение ключа» и «Ключи» очень большие, и я не думаю, что смогу опубликовать их в переполнении стека. Пожалуйста, дайте мне знать, если я могу поделиться какой-либо другой информацией, еще раз спасибо
Ответ №1:
Если мы преобразуем столбец keyvalue из строкового типа в тип карты, мы можем использовать функцию map_values для извлечения значений:
Я использовал UDF для замены = в ключевых значениях на:, чтобы мы могли изменить тип на карту с помощью модуля ast.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.master('local[*]').getOrCreate()
def get_dict(c):
c = c.replace("=", ":")
import ast
dict_value = ast.literal_eval(c)
return dict_value
get_dict_udf = udf(lambda c: get_dict(c), MapType(StringType(), ArrayType(IntegerType())))
# Sample dataframe
df = spark.createDataFrame(
[('{"k3"= [6, 5, 4], "k1"= [4, 5, 1], "k8"= [8, 5, 6], "k5"= [7, 4, 3]}',
["k1", "k3", "k5", "k8"])]).toDF("keyvalue", "key")
df.withColumn("keyvalue", get_dict_udf("keyvalue")).
withColumn("values", sort_array(map_values("keyvalue"))).show(truncate=False)
-------------------------------------------------------------------- ---------------- --------------------------------------------
|keyvalue |key |values |
-------------------------------------------------------------------- ---------------- --------------------------------------------
|[k3 -> [6, 5, 4], k5 -> [7, 4, 3], k8 -> [8, 5, 6], k1 -> [4, 5, 1]]|[k1, k3, k5, k8]|[[4, 5, 1], [6, 5, 4], [7, 4, 3], [8, 5, 6]]|
-------------------------------------------------------------------- ---------------- --------------------------------------------
Комментарии:
1. Спасибо за ответ, спасибо! К сожалению, я получаю следующую ошибку: AttributeError: объект «дикт» не имеет атрибута «заменить». Я предполагаю, что мне, возможно, придется использовать regex_replace() вместо этого?
2. Хорошо, вы можете использовать, но в вашем случае столбец keyvalue является строковым, поэтому здесь переменная » c «в udf будет иметь тип string и должна принимать метод замены на «c».