#apache-spark #pyspark #apache-spark-sql #databricks #delta-lake
Вопрос:
В настоящее время у меня есть файл JSON, из которого я могу выгрузить его данные во временное представление с помощью. следуя логике Python (PySpark) :
departMentData = spark
.read
.option("multiLine", True)
.option("mode", "PERMISSIVE")
.json("C:\Testdata.json")
.createOrReplaceTempView("vw_TestView")
Это временное представление содержит данные отделов и список сотрудников в этом отделе в массиве. Один сотрудник может быть частью нескольких отделов.
Ниже приведены типы данных этого представления:
- Отдел: строка
- Имя отдела: строка
- Идентификаторы сотрудников: массив<строка.>
и данные таблицы для vw_TestView выглядят следующим образом
Отдел | Название отдела | Сотрудники |
---|---|---|
D01 | разработка | [«U1234», «U6789»] |
D02 | qa | [«U1234», «U2345»] |
и в другой таблице Сотрудники имеют сведения обо всех этих сотрудниках следующим образом:
ЭмпИД | Имя |
---|---|
U1234 | джон |
U6789 | кузнец |
U2345 | наташа |
Мне нужно, чтобы конечный результат для новой таблицы был следующим:
Отдел | Название отдела | Сотрудники | Имена сотрудников |
---|---|---|---|
D01 | разработка | [«U1234», «U6789»] | [«джон», «смит»] |
D02 | qa | [«U1234», «U2345»] | [«джон», «наташа»] |
Как такие объединения могут выполняться в SQL-источниках данных или через PySpark?
Ответ №1:
Вы можете попробовать следующее, которое используется explode
для разделения списка идентификаторов сотрудников на разные строки, прежде чем присоединять их и использовать collect_list
для объединения записей в список.
Использование spark sql:
ПРИМЕЧАНИЕ. Убедитесь Employees
, что доступно в виде таблицы/представления, например EmployeeData.createOrReplaceTempView("Employees")
WITH dept_employees AS (
SELECT
DeptId,
DeptName,
explode(EmployeeIDs)
FROM
vw_TestView
)
SELECT
d.DeptId,
d.DeptName,
collect_list(e.EmpID) as EmployeeIDs,
collect_list(e.EmpName) as EmployeeNames
FROM
dept_employees d
INNER JOIN
Employees e ON d.col=e.EmpID
GROUP BY
d.Deptid,
d.DeptName
или с помощью api pyspark:
from pyspark.sql import functions as F
output_df = (
departMentData.select(
F.col("DeptId"),
F.col("DeptName"),
F.explode("EmployeeIDs")
)
.alias("d")
.join(
EmployeeData.alias("e"),
F.col("d.col")==F.col("e.EmpID"),
"inner"
)
.groupBy("d.DeptId","d.DeptName")
.agg(
F.collect_list("e.EmpID").alias("EmployeeIDs"),
F.collect_list("e.EmpName").alias("EmployeeNames")
)
)
Дайте мне знать, если это сработает для вас.
Комментарии:
1. Оба этих подхода работают.