#json #dataframe #pyspark #apache-kafka #spark-streaming
#json #dataframe #pyspark #apache-kafka #искровая потоковая передача
Вопрос:
У меня есть производитель kafka, отправляющий большие объемы данных в формате
{
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
},
'1001':
{
'5':
{
'seq': '2',
'state': '2',
'CMD': 'OR'
}
},
'1003':
{
'5':
{
'seq': '3',
'state': '4',
'CMD': 'XOR'
}
}
}
….
данные, которые я хочу, находятся в последнем цикле: {'seq': '1', 'state': '2', 'CMD': 'XOR'}
а ключи в циклах выше ('1000' and '3')
являются переменными. Пожалуйста, обратите внимание, что приведенные выше значения приведены только для примера. исходный набор данных огромен множеством переменных ключей. только ключи в последнем цикле {'seq', 'state', 'CMD'}
являются постоянными.
Я пытался использовать общие форматы для чтения данных, но получаю неверные данные, поскольку в приведенных выше циклах есть переменные ключи, и я не уверен, как определить схему для анализа этого формата данных.
Результат, которого я пытаюсь достичь, представляет собой фрейм данных формата
seq state CMD
----------------------
1 2 XOR
2 2 OR
3 4 XOR
Комментарии:
1. каков конечный результат? готовы ли вы получить его в виде фрейма данных? дополнительные разъяснения по ожидаемому результату помогут здесь
2. @dsk… Я добавил ожидаемый результат. спасибо за комментарий.
3. Не могли бы вы сейчас проверить, является ли приведенное ниже решение тем, что вы ищете — был бы признателен, если бы вы могли принять и поддержать .. 🙂
Ответ №1:
Это может быть рабочим решением для вас — используйте и, как показано ниже explode()
getItem()
—
Загрузите json в фрейм данных здесь
a_json={
'1000':
{
'3':
{
'seq': '1',
'state': '2',
'CMD': 'XOR'
}
}
}
df = spark.createDataFrame([(a_json)])
df.show(truncate=False)
-----------------------------------------
|1000 |
-----------------------------------------
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
-----------------------------------------
Логика здесь
df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)
----------------------------------------- --- ---------------------------------- --- ----- ---
|1000 |x |y |seq|state|CMD|
----------------------------------------- --- ---------------------------------- --- ----- ---
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3 |[CMD -> XOR, state -> 2, seq -> 1]|1 |2 |XOR|
----------------------------------------- --- ---------------------------------- --- ----- ---
Обновление кода на основе дополнительных входных данных
#Assuming that all the json columns are in a single column, hence making it an array column first.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
#Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
|seq|state|CMD|
--- ----- ---
| 1| 2|XOR|
| 2| 2| OR|
| 3| 4|XOR|
--- ----- ---
Комментарии:
1. Привет, ключи являются переменными для первых двух циклов, поэтому использование F.explode(«1000») не решает проблему. Я обновил пример в вопросе.
2. Кроме того, поскольку я получаю данные из потока Kafka, я сохраняю значения в столбце фрейма данных «значение». df = df.select(«*», F.explode(«значение»). псевдоним («x», «y»)) также выдает ошибку: не удается разрешить ‘explode (
value
)’ из-за несоответствия типов данных: входные данные для функции explode должны быть типа array или map, а не string;;3. Я решил это, не могли бы вы, пожалуйста, проверить обновленный ответ, идея здесь состоит в том, чтобы сначала создать столбец explode из столбца value — df = df.withColumn(«explod_col», F.explode(«value»)) и снова разделить ключ и значение снова в двух разных столбцах, подобных этому- df = df.select(«*», F.explode(«explod_col»). псевдоним («x», «y»))
4. набор данных огромен, и 1000, 1001, 1003 — не единственные значения в нем. Таких ключей тысячи. пример в вопросе предназначен только для того, чтобы дать представление о формате набора данных.
5. Приятель, используй df.columns для того, чтобы собрать в массив весь столбец, в конечном итоге вы можете изменить или передать список как свой собственный .. df = df.withColumn(«array_col», F.array(df.columns))