Получение данных из вложенного json в kafka stream pyspark

#json #dataframe #pyspark #apache-kafka #spark-streaming

#json #dataframe #pyspark #apache-kafka #искровая потоковая передача

Вопрос:

У меня есть производитель kafka, отправляющий большие объемы данных в формате

 {
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    },
 '1001': 
    {
       '5': 
        {
           'seq': '2', 
           'state': '2', 
           'CMD': 'OR' 
        }
    },
 '1003': 
    {
       '5': 
        {
           'seq': '3', 
           'state': '4', 
           'CMD': 'XOR' 
        }
    }
}
  

….
данные, которые я хочу, находятся в последнем цикле: {'seq': '1', 'state': '2', 'CMD': 'XOR'} а ключи в циклах выше ('1000' and '3') являются переменными. Пожалуйста, обратите внимание, что приведенные выше значения приведены только для примера. исходный набор данных огромен множеством переменных ключей. только ключи в последнем цикле {'seq', 'state', 'CMD'} являются постоянными.

Я пытался использовать общие форматы для чтения данных, но получаю неверные данные, поскольку в приведенных выше циклах есть переменные ключи, и я не уверен, как определить схему для анализа этого формата данных.

Результат, которого я пытаюсь достичь, представляет собой фрейм данных формата

 seq    state     CMD
----------------------
 1       2       XOR
 2       2        OR
 3       4       XOR
  

Комментарии:

1. каков конечный результат? готовы ли вы получить его в виде фрейма данных? дополнительные разъяснения по ожидаемому результату помогут здесь

2. @dsk… Я добавил ожидаемый результат. спасибо за комментарий.

3. Не могли бы вы сейчас проверить, является ли приведенное ниже решение тем, что вы ищете — был бы признателен, если бы вы могли принять и поддержать .. 🙂

Ответ №1:

Это может быть рабочим решением для вас — используйте и, как показано ниже explode() getItem()

Загрузите json в фрейм данных здесь

 a_json={
  '1000': 
    {
       '3': 
        {
           'seq': '1', 
           'state': '2', 
           'CMD': 'XOR' 
        }
    }
}
df = spark.createDataFrame([(a_json)])
df.show(truncate=False)

 ----------------------------------------- 
|1000                                     |
 ----------------------------------------- 
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|
 ----------------------------------------- 
  

Логика здесь

 df = df.select("*", F.explode("1000").alias("x", "y"))
df = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df.show(truncate=False)


  ----------------------------------------- --- ---------------------------------- --- ----- --- 
|1000                                     |x  |y                                 |seq|state|CMD|
 ----------------------------------------- --- ---------------------------------- --- ----- --- 
|[3 -> [CMD -> XOR, state -> 2, seq -> 1]]|3  |[CMD -> XOR, state -> 2, seq -> 1]|1  |2    |XOR|
 ----------------------------------------- --- ---------------------------------- --- ----- --- 
  

Обновление кода на основе дополнительных входных данных

 #Assuming that all the json columns are in a single column, hence making it an array column first.
df = df.withColumn("array_col", F.array("1000", "1001", "1003"))
#Then explode and getItem
df = df.withColumn("explod_col", F.explode("array_col"))
df = df.select("*", F.explode("explod_col").alias("x", "y"))
df_final = df.withColumn("seq", df.y.getItem("seq")).withColumn("state", df.y.getItem("state")).withColumn("CMD", df.y.getItem("CMD"))
df_final.select("seq","state","CMD").show()
|seq|state|CMD|
 --- ----- --- 
|  1|    2|XOR|
|  2|    2| OR|
|  3|    4|XOR|
 --- ----- --- 
  

Комментарии:

1. Привет, ключи являются переменными для первых двух циклов, поэтому использование F.explode(«1000») не решает проблему. Я обновил пример в вопросе.

2. Кроме того, поскольку я получаю данные из потока Kafka, я сохраняю значения в столбце фрейма данных «значение». df = df.select(«*», F.explode(«значение»). псевдоним («x», «y»)) также выдает ошибку: не удается разрешить ‘explode ( value )’ из-за несоответствия типов данных: входные данные для функции explode должны быть типа array или map, а не string;;

3. Я решил это, не могли бы вы, пожалуйста, проверить обновленный ответ, идея здесь состоит в том, чтобы сначала создать столбец explode из столбца value — df = df.withColumn(«explod_col», F.explode(«value»)) и снова разделить ключ и значение снова в двух разных столбцах, подобных этому- df = df.select(«*», F.explode(«explod_col»). псевдоним («x», «y»))

4. набор данных огромен, и 1000, 1001, 1003 — не единственные значения в нем. Таких ключей тысячи. пример в вопросе предназначен только для того, чтобы дать представление о формате набора данных.

5. Приятель, используй df.columns для того, чтобы собрать в массив весь столбец, в конечном итоге вы можете изменить или передать список как свой собственный .. df = df.withColumn(«array_col», F.array(df.columns))