используйте spark SQL для создания массива столбцов карт на основе сопоставления ключей

#apache-spark #pyspark #apache-spark-sql #higher-order-functions

#apache-spark #pyspark #apache-spark-sql #функции более высокого порядка

Вопрос:

У меня есть две таблицы:

entities

 id | i | sources                        | name
----------------------------------------------------
1a | 0 | {"UK/bla": 1, "FR/blu": 2} | "mae"
1a | 1 | {"UK/bla": 1, "IT/bli": 2} | "coulson"
 

source_mapping

 source_name | source_metadata
-----------------------------------------------------------------------------------------
"UK/bla"    | {"source_name": "UK/bla", "description": "this is a description"}
"FR/blu"    | {"source_name": "FR/blu", "description": "ceci est une description"}
"IT/bli"    | {"source_name": "IT/bli", "description": "questa è una descrizione"}

 

Что я хотел бы сделать, так это добавить столбец a в мою таблицу сущностей типа:

 id | i | sources                        | name |  metadata   
---------------------------------------------------------------
1a | 0 | [{"UK/bla": 1}, {"FR/blu": 2}] | ...  | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "FR/blu", "description": "ceci est une description"}]
1a | 1 | [{"UK/bla": 1}, {"IT/bli": 2}] | ...  | [{"source_name": "UK/bla", "description": "this is a description"}, {"source_name": "IT/bli", "description": "questa è una descrizione"}]
 

Я нашел способ сделать это, выполнив:

 entities_sources_exploded = (entities.select(F.col("id"), 
                                             F.col("i"),
                                             F.explode(F.col("sources")))
                                     .withColumnRenamed("key", "source_name")
                                     .drop("value"))  # get rid of it

entities_sources_exploded_with_metadata = (entities_sources_exploded
                                           .join(sources_mapping,
                                                 entities_sources_exploded.source_name == sources_mapping.source_name,
                                                 "left"))
entities_with_metadata = (entities_sources_exploded_with_metadata
                          .groupBy(F.col("id"), F.col("i"))
                          .agg(F.collect_list("source_metadata").alias("metadata")))
 

И это работает — но у меня есть подлые подозрения, что есть способы сделать это без взрыва и работы с HOF в spark SQL, завернутом в an .expr() — я бы хотел посмотреть, как кто-то более свободно, чем я, решит эту проблему.

Ответ №1:

Я думаю, это должно сработать:

 import pandas as pd

# Setup data
data1 = pd.DataFrame({
    "id": ["1a", "1a"],
    "i": [0, 1],
    "sources": [{"UK/bla": 1, "FR/blu": 2}, {"UK/bla": 1, "IT/bli": 2}],
    "name": ["mae", "coulson"]
})
df1 = spark.createDataFrame(data1)
data2 = pd.DataFrame({
    "source_name": ["UK/bla", "FR/blu", "IT/bli"],
    "source_metadata": [
        {"source_name": "UK/bla", "description": "this is a description"},
        {"source_name": "FR/blu", "description": "ceci est une description"},
        {"source_name": "IT/bli", "description": "questa è una descrizione"}
    ]
})
df2 = spark.createDataFrame(data2)

# Create temp tables and execute SQL
df1.registerTempTable("df1")
df2.registerTempTable("df2")
query = """
    SELECT
        temp.id,
        temp.i,
        COLLECT_LIST(source) AS sources,
        temp.name,
        COLLECT_LIST(source_metadata) AS metadata
    FROM (
        SELECT
            *,
            map(key, value) AS source
        FROM (
            SELECT
                df1.id,
                df1.i,
                df1.name,
                EXPLODE(df1.sources)
            FROM df1
        ) AS df1_exploded
        JOIN df2
        ON df2.source_name = df1_exploded.key
    ) AS temp
    GROUP BY temp.id, temp.i, temp.name
"""
result = spark.sql(query)
result.show(5)
 

Комментарии:

1. красиво — спасибо!