Извлечение вложенного столбца в python spark dataframe

#python #apache-spark #dataframe #pyspark #apache-spark-sql

#python #apache-spark #фрейм данных #писпарк #apache-искра-sql #pyspark #apache-spark-sql

Вопрос:

Часть моей схемы df:

 -- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- error: string (nullable = true)
 |    |    |-- hop: long (nullable = true)
 |    |    |-- resuLt: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |-- result: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Rtt: double (nullable = true)
 |    |    |    |    |-- Ttl: long (nullable = true)
 |    |    |    |    |-- dstoptsize: long (nullable = true)
 |    |    |    |    |-- dup: boolean (nullable = true)
 |    |    |    |    |-- edst: string (nullable = true)
 |    |    |    |    |-- err: string (nullable = true)
 |    |    |    |    |-- error: string (nullable = true)
 |    |    |    |    |-- flags: string (nullable = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- hdropts: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- mss: long (nullable = true)
 |    |    |    |    |-- icmpext: struct (nullable = true)
 |    |    |    |    |    |-- obj: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- class: long (nullable = true)
 |    |    |    |    |    |    |    |-- mpls: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- exp: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- label: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- s: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |    |    |    |-- type: long (nullable = true)
 |    |    |    |    |    |-- rfc4884: long (nullable = true)
 |    |    |    |    |    |-- version: long (nullable = true)
 |    |    |    |    |-- itos: long (nullable = true)
 |    |    |    |    |-- ittl: long (nullable = true)
 |    |    |    |    |-- late: long (nullable = true)
 |    |    |    |    |-- mtu: long (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- sIze: long (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- tos: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |-- x: string (nullable = true)
  

Как я могу запросить вложенный столбец, как result.result.dstopsize например? Я хотел бы иметь возможность отображать все, начиная с result или даже result.result или result.resuLt (в моей конфигурации spark включен учет регистра)

Когда я пытаюсь:

file_df.select("result.resuLt.dstopsize").show(10)

Я получаю эту ошибку:

cannot resolve '`result`.`resuLt`['dstopsize']' due to data type mismatch: argument 2 requires integral type, however, ''dstopsize'' is of string type.;;

РЕДАКТИРОВАТЬ: вот некоторые примеры данных

 |_corrupt_record| af|       dst_addr|       dst_name|   endtime|         from|  fw|group_id|lts|  msm_id|  msm_name|paris_id|prb_id|proto|              result|size|     src_addr| timestamp| ttr|      type|
 --------------- --- --------------- --------------- ---------- ------------- ---- -------- --- -------- ---------- -------- ------ ----- -------------------- ---- ------------- ---------- ---- ---------- 
|           null|  4|213.133.109.134|213.133.109.134|1551658584|78.197.253.14|4940|    null| 71|    5019|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658577|null|traceroute|
|           null|  4|   37.143.33.15|   37.143.33.15|1551658584|78.197.253.14|4940|15254159| 71|15254159|Traceroute|      12| 13230| ICMP|[[, 1,, [[,,,,,,,...|  48|192.168.0.130|1551658583|null|traceroute|
|           null|  4|  139.162.27.28|  139.162.27.28|1551658612|78.197.253.14|4940|    null| 20|    5027|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658606|null|traceroute|
|           null|  4|    45.33.72.12|    45.33.72.12|1551658610|78.197.253.14|4940|    null| 18|    5029|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|104.237.152.132|104.237.152.132|1551658615|78.197.253.14|4940|    null| 23|    5028|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|  94.126.208.18|  94.126.208.18|1551658516|37.14.215.183|4940| 9183324| 20| 9183324|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658439|null|traceroute|
|           null|  4|196.192.112.244|196.192.112.244|1551658554|37.14.215.183|4940| 9181461| 25| 9181461|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658474|null|traceroute|
|           null|  4|    46.234.34.8|    46.234.34.8|1551658539|37.14.215.183|4940| 9180758| 10| 9180758|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658479|null|traceroute|
|           null|  4|    185.2.64.76|    185.2.64.76|1551658560|37.14.215.183|4940| 9181290| 31| 9181290|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658511|null|traceroute|
|           null|  4|  208.80.155.69|  208.80.155.69|1551658597|37.14.215.183|4940| 9183716|  8| 9183716|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658546|null|traceroute|
 --------------- --- --------------- --------------- ---------- ------------- ---- -------- --- -------- ---------- -------- ------ ----- -------------------- ---- ------------- ---------- ---- ---------- ```
  

Комментарии:

1. Вставьте некоторые воспроизводимые образцы данных. Это очень поможет.

2. Кажется, в этом примере есть некоторые опечатки.. в лучшем случае нам остается только догадываться. Например, вы показываете код для запроса с помощью dstopsize и resuLt, но схема должна соответствовать result и dstoptsize.

3. Я опубликовал схему только для result столбца, как и другие, которые я могу запросить без проблем

4. Было бы очень интересно для нас, и, вероятно, для вас тоже, разработать минимальный экземпляр вашей проблемы. Довольно часто этот процесс позволяет более точно определить источник проблемы, а иногда даже решить ее. Затем, если вам все еще нужна помощь на этом этапе, пожалуйста, предоставьте нам способ воспроизвести вашу проблему (примерные данные минимальный код, который воспроизводит проблему). Я настаиваю на минимальном, потому что это поможет нам легче найти ответ и поможет другим с аналогичной проблемой в будущем найти ответ без необходимости спрашивать.

5. Моя минимальная версия: класс case C(dstoptsize: Long) класс case B(результат: массив [C]) класс case A(результат: массив[B]) значение df = List(A(Array(B(Array(C(10))))))).toDF df.select(«результат.result.dstoptsize»).показать org.apache.spark.sql.AnalysisException: не удается разрешить ‘ result . result [‘dstoptsize’] ‘ из-за несоответствия типов данных: аргумент 2 требует целочисленного типа, однако «dstoptsize» имеет строковый тип.;;

Ответ №1:

result Имеет array тип, поэтому вам нужно будет explode или explode_outer , а затем получить доступ ко всему, что вам нужно для доступа.

 from pyspark.sql.functions import explode_outer, col
file_df.withColumn("exploded_result", explode_outer(col("result")))
       .select("exploded_result.resuLt.dstopsize").show(10)
  

Однако будьте осторожны, у вас будет несколько строк, соответствующих отдельной строке, в зависимости от количества элементов.

Ответ №2:

Это ужасно некрасиво, но это вернет правильный ответ, учитывая, что вы знаете, какие индексы столбцов являются вашими вложенными полями. Это может быть вычислено на основе схемы.

 case class C(x: String, dstoptsize: Long, y: String)
case class B(result: Array[C])
case class A(result: Array[B])

val df = List(
    A(Array(
      B(Array(C("x10", 10, "y10"), C("x11", 11, "y11"))),
      B(Array(C("x12", 12, "y12"), C("x13", 13, "y13")))
      )), 
    A(Array(B(Array(C("x20", 20, "y20"), C("x21", 21, "y21")))))).toDF


val selectInner = udf((x: Seq[Row]) => { x.map(_.getSeq[Row](0).map(_.getLong(1))) })

df.select(selectInner($"result")).show

 -------------------- 
|         UDF(result)|
 -------------------- 
|[[10, 11], [12, 13]]|
|          [[20, 21]]|
 --------------------