Собирайте строки в виде массива фрейма данных Spark после группы с помощью PySpark

#apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть следующий df:

 url | source | value | name ---------------------------------- a | USA | 1 | registry_a a | USA | 1 | registry_b a | FRA | 1 | registry_a b | DEU | 2 | null b | DEU | 1 | registry_b b | FRA | 1 | registry_a c | ITA | 1 | registry_a c | ITA | 0 | registry_b  

и я хотел бы иметь возможность группироваться по url и source и создавать новый столбец, содержащий все строки группы в столбце массива под названием data .

 url | source | data  ---------------------------------------------------------------------------------- a | USA | [{"url": "a", "source": "USA", "value": 1, "name": "registry_a"},  | | {"url": "a", "source": "USA", "value": 1, "name": "registry_b"},  | | {"url": "a", "source": "FRA", "value": 1, "name": "registry_a"}]  a | FRA | [{"url": "a", "source": "FRA", "value": 1, "name": "registry_a"}]  b | DEU | [{"url": "b", "source": "DEU", "value": 2, "name": null},  | | {"url": "b", "source": "DEU", "value": 1, "name": "registry_a"}]  b | FRA | [{"url": "b", "source": "FRA", "value": 1, "name": "registry_a"}]  c | ITA | [{"url": "c", "source": "ITA", "value": 1, "name": "registry_a"},  | | {"url": "c", "source": "ITA", "value": 0, "name": "registry_b"}]   

Я пробовал это, но это не работает:

 samples_to_map_df = (samples  .groupBy("url", "source")  .agg(F.map_from_arrays(F.collect_list(F.col("url")),   F.collect_list(F.col("source")),  F.collect_list(F.col("value")),  F.collect_list(F.col("value")))   .alias("data")) )  

Я получаю эту ошибку:

Ошибка типа: map_from_arrays() принимает 2 позиционных аргумента, но было дано 4

Ответ №1:

Вам нужно собрать список структур, а затем использовать to_json функцию, чтобы получить желаемый результат:

 import pyspark.sql.functions as F  samples_to_map_df = samples.groupBy("url", "source").agg(  F.to_json(  F.collect_list(  F.struct(*[F.col(c).alias(c) for c in samples.columns])  )  ).alias("data") )  samples_to_map_df.show(truncate=False)  # --- ------ -----------------------------------------------------------------------------------------------------------------------  #|url|source|data | # --- ------ -----------------------------------------------------------------------------------------------------------------------  #|a |USA |[{"url":"a","source":"USA","value":"1","name":"registry_a"},{"url":"a","source":"USA","value":"1","name":"registry_b"}]| #|c |ITA |[{"url":"c","source":"ITA","value":"1","name":"registry_a"},{"url":"c","source":"ITA","value":"0","name":"registry_b"}]| #|b |DEU |[{"url":"b","source":"DEU","value":"2"},{"url":"b","source":"DEU","value":"1","name":"registry_b"}] | #|a |FRA |[{"url":"a","source":"FRA","value":"1","name":"registry_a"}] | #|b |FRA |[{"url":"b","source":"FRA","value":"1","name":"registry_a"}] | # --- ------ -----------------------------------------------------------------------------------------------------------------------