#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
У меня есть следующие примеры данных :
------ ------ ------ ---- ---- ---- ----------------- ---------
|selcn1|selcn2|selcn3|var1|var2|var3| selarr| vararr|
------ ------ ------ ---- ---- ---- ----------------- ---------
| one| three| two| 1| 3| 2|[one, three, two]|[1, 3, 2]|
| three| one| two| 3| 1| 2|[three, one, two]|[3, 1, 2]|
| two| null| one| 2| 3| 1| [two,, one]|[2, 3, 1]|
------ ------ ------ ---- ---- ---- ----------------- ---------
Я хочу преобразовать это в следующую структуру JSON для каждой строки, затем сохранить в отдельном столбце, он должен быть отсортирован по ключу, и если в ключе есть null, для него не должно быть никакой записи в JSON (например, третья строка) :
first row : [{"key":"one","value":"1"},{"key":"two","value":"2"}{"key":"three","value":"3"}]
second row : [{"key":"one","value":"1"},{"key":"two","value":"2"}{"key":"three","value":"3"}]
third row : [{"key":"one","value":"1"},{"key":"three","value":"3"}]
Я пробовал это, но не смог отсортировать и пропустить записи, не имеющие ключей :
structure = F.array([ struct(col('selcn1').alias('key') , col('var1').alias('value') ),struct(col('selcn2').alias('key') , col('var2').alias('value') ), struct(col('selcn3').alias('key') , col('var3').alias('value') ) ])
df2.withColumn('temp',to_json(structure)).select('temp').show(23,False)
---------------------------------------------------------------------------------
|temp |
---------------------------------------------------------------------------------
|[{"key":"one","value":"1"},{"key":"three","value":"3"},{"key":"two","value":"2"}]|
|[{"key":"three","value":"3"},{"key":"one","value":"1"},{"key":"two","value":"2"}]|
|[{"key":"two","value":"2"},{"value":"3"},{"key":"one","value":"1"}] |
---------------------------------------------------------------------------------
Комментарии:
1. Я не понимаю часть сортировки, о которой вы упомянули. Вы хотите, чтобы записи JSON сортировались? Каков порядок сортировки здесь?
Ответ №1:
ПРИМЕЧАНИЕ. В вашем примере ваши ключи представляют собой слова и сортируются не как слова, а как числа, т.Е. При сортировке слова three
будут отображаться раньше two
, h
чем раньше w
в алфавите.
Если ваш фактический набор данных не предназначен для сортировки числовых слов, вы можете попробовать следующее:
from pyspark.sql import functions as F
df2=(
df.withColumn(
"json",
F.to_json(
F.filter(
F.array_sort(
F.array([ F.expr("struct(selcn{0} as key, var{0} as value)".format(i)) for i in range(1,4)])
),
lambda rcol : rcol.getField("key").isNotNull()
)
)
)
)
df2.printSchema()
df2.show(truncate=False)
root
|-- selcn1: string (nullable = true)
|-- selcn2: string (nullable = true)
|-- selcn3: string (nullable = true)
|-- var1: string (nullable = true)
|-- var2: string (nullable = true)
|-- var3: string (nullable = true)
|-- selarr: array (nullable = true)
| |-- element: string (containsNull = true)
|-- vararr: array (nullable = true)
| |-- element: string (containsNull = true)
|-- json: string (nullable = true)
------ ------ ------ ---- ---- ---- ----------------- --------- ---------------------------------------------------------------------------------
|selcn1|selcn2|selcn3|var1|var2|var3|selarr |vararr |json |
------ ------ ------ ---- ---- ---- ----------------- --------- ---------------------------------------------------------------------------------
|one |three |two |1 |3 |2 |[one, three, two]|[1, 3, 2]|[{"key":"one","value":"1"},{"key":"three","value":"3"},{"key":"two","value":"2"}]|
|three |one |two |3 |1 |2 |[three, one, two]|[3, 1, 2]|[{"key":"one","value":"1"},{"key":"three","value":"3"},{"key":"two","value":"2"}]|
|two |null |one |2 |3 |1 |[two, , one] |[2, 3, 1]|[{"key":"one","value":"1"},{"key":"two","value":"2"}] |
------ ------ ------ ---- ---- ---- ----------------- --------- ---------------------------------------------------------------------------------
В приведенном выше коде
F.array([ F.expr("struct(selcn{0} as key, var{0} as value)".format(i)) for i in range(1,4)])
создает массивstructs
с желаемымkey
иvalue
. Сокращенный подход достигается с использованием понимания списка python и sparksexpr
, который анализирует выражение sql для создания желаемой структуры- затем array_sort используется для сортировки массива. ПРИМЕЧАНИЕ. Сортировка в spark выполняется в лексикографическом порядке
- затем фильтр используется для фильтрации структур с
null
ключами. Вы можете настроить эту лямбда-функцию (lambda rcol : rcol.getField("key").isNotNull()
) по своему усмотрению, чтобы настроить свой фильтр to_json
затем используется для преобразования отфильтрованного результата в JSON
Дайте мне знать, работает ли это для вашего варианта использования, или вы можете поделиться дополнительными примерами, которые помогут воспроизвести ваш вариант использования.
Дополнительные ссылки
- Массив Pyspark
- Pyspark expr