Базы данных — Как объединить таблицу с идентификаторами, содержащимися в столбце типа struct<массив>

#apache-spark #pyspark #apache-spark-sql #databricks #delta-lake

Вопрос:

В настоящее время у меня есть файл JSON, из которого я могу выгрузить его данные во временное представление с помощью. следуя логике Python (PySpark) :

  departMentData = spark 
                .read 
                .option("multiLine", True) 
                .option("mode", "PERMISSIVE") 
                .json("C:\Testdata.json") 
                .createOrReplaceTempView("vw_TestView")
 

Это временное представление содержит данные отделов и список сотрудников в этом отделе в массиве. Один сотрудник может быть частью нескольких отделов.

Ниже приведены типы данных этого представления:

  • Отдел: строка
  • Имя отдела: строка
  • Идентификаторы сотрудников: массив<строка.>

и данные таблицы для vw_TestView выглядят следующим образом

Отдел Название отдела Сотрудники
D01 разработка [«U1234», «U6789»]
D02 qa [«U1234», «U2345»]

и в другой таблице Сотрудники имеют сведения обо всех этих сотрудниках следующим образом:

ЭмпИД Имя
U1234 джон
U6789 кузнец
U2345 наташа

Мне нужно, чтобы конечный результат для новой таблицы был следующим:

Отдел Название отдела Сотрудники Имена сотрудников
D01 разработка [«U1234», «U6789»] [«джон», «смит»]
D02 qa [«U1234», «U2345»] [«джон», «наташа»]

Как такие объединения могут выполняться в SQL-источниках данных или через PySpark?

Ответ №1:

Вы можете попробовать следующее, которое используется explode для разделения списка идентификаторов сотрудников на разные строки, прежде чем присоединять их и использовать collect_list для объединения записей в список.

Использование spark sql:

ПРИМЕЧАНИЕ. Убедитесь Employees , что доступно в виде таблицы/представления, например EmployeeData.createOrReplaceTempView("Employees")

 WITH dept_employees AS (
    SELECT
        DeptId,
        DeptName,
        explode(EmployeeIDs)
    FROM
        vw_TestView
)
SELECT
    d.DeptId,
    d.DeptName,
    collect_list(e.EmpID) as EmployeeIDs,
    collect_list(e.EmpName) as EmployeeNames
FROM
    dept_employees d
INNER JOIN
    Employees e ON d.col=e.EmpID
GROUP BY
    d.Deptid,
    d.DeptName
 

или с помощью api pyspark:

 from pyspark.sql import functions as F

output_df = (
    departMentData.select(
        F.col("DeptId"),
        F.col("DeptName"),
        F.explode("EmployeeIDs")
    )
    .alias("d")
    .join(
        EmployeeData.alias("e"),
        F.col("d.col")==F.col("e.EmpID"),
        "inner"
    )
    .groupBy("d.DeptId","d.DeptName")
    .agg(
        F.collect_list("e.EmpID").alias("EmployeeIDs"),
        F.collect_list("e.EmpName").alias("EmployeeNames")
    )
)
 

Дайте мне знать, если это сработает для вас.

Комментарии:

1. Оба этих подхода работают.