Как я могу объединить инкрементный набор данных и набор данных моментального снимка, сохранив удаленные строки?

#palantir-foundry #foundry-code-repositories #foundry-python-transform

Вопрос:

У меня есть источник подключения к данным, который создает два набора данных:

  • Набор данных X (Снимок)
  • Набор данных Y (инкрементный)

Два набора данных извлекаются из одного и того же источника. Набор X данных состоит из текущего состояния всех строк в исходной таблице. Набор Y данных извлекает все строки, которые были обновлены с момента последней сборки. Эти два набора данных затем объединяются ниже по потоку в набор Z данных, причем набор Z данных является либо набором X данных, либо самой последней версией каждой строки из набора Y данных . Это позволяет нам как обновлять с низкой задержкой, так и поддерживать хорошее разделение.

Когда строки удаляются из исходной таблицы, строки больше не присутствуют в наборе X данных, но все еще присутствуют в наборе Y данных .

Как лучше всего сохранить эти «удаленные» строки в наборе Z данных ? В идеале я также мог бы сделать снимок набора Y данных без потери каких-либо «удаленных» строк.

Ответ №1:

Хороший вопрос! Насколько я понимаю, вы хотите, чтобы в наборе Z данных были только самые последние строки, включая самые последние удаленные строки. Как обновленные строки, так и удаленные строки присутствуют в Y . В этом случае я бы предложил сначала объединить Y и X объединить, чтобы все строки, включая удаленные строки, присутствовали в наборе данных объединения. Затем используйте функцию окна над столбцом даты, чтобы получить самую последнюю версию каждой строки. Вот краткое описание кода pyspark, который я бы предложил для этого:

 from pyspark.sql import Window
import pyspark.sql.functions as F

window = Window.partitionBy(primary_keys).orderBy(F.col(date_column).desc())
Z = X.unionByName(Y) # union to get all columns, including deleted
Z = Z.withColumn("row_num", F.row_number().over(window)) # rank by date created/updated
Z = Z.filter(F.col("row_num") == 1).drop("row_num") # keep only the latest version of each row
 

Обратите внимание, что это решение не решает проблему того, что произойдет, если Y снимет снимки.