слияние в таблицу deltalake обновляет все строки

#scala #apache-spark #delta-lake

#scala #apache-spark #дельта-озеро

Вопрос:

я пытаюсь обновить таблицу deltalake, используя фрейм данных spark. Что я хочу сделать, так это обновить все строки, которые отличаются в spark dataframe от таблицы deltalake, и вставить все строки, которые отсутствуют в таблице deltalake.

Я попытался сделать это следующим образом:

 import io.delta.tables._

val not_equal_string = df.schema.fieldNames.map(fn => 
    s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
    ).reduceLeft((x,y) => s"$x OR $y ")

val deltaTable = DeltaTable.forPath(spark, "s3a://sparkdata/delta-table")

deltaTable.as("history").merge(
    df.as("updates"), "updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateAll().whenNotMatched().insertAll().execute()

  

это работает, но когда я смотрю в результирующую дельта-таблицу, я вижу, что она фактически удвоилась в размере, даже если я не обновил ни одной записи. Был создан новый файл json с удалением для каждого старого раздела и добавлением для всех новых разделов.

когда я просто запускаю sql-соединение с критерием whenMatched в качестве условия where, я не получаю ни одной строки.

я бы ожидал, что дельта-таблица останется нетронутой после такой операции слияния. я пропустил что-то простое?

Комментарии:

1. Я сталкиваюсь с той же проблемой. В итоге вы нашли решение?

2. нет… но я никогда не тестировал deltalake на databricks. я думаю, что есть довольно много различий между версией OSS и версией databricks, поэтому, возможно, стоит попробовать databricks. кроме того, есть альтернативы, такие как apache hudi.