Слияние Delta lake не обновляет схему (включена автоматическая эволюция схемы)

#pyspark #databricks #delta-lake

#pyspark #блоки данных #дельта-озеро #delta-lake

Вопрос:

Я получаю сообщение об ошибке при выполнении следующей строки кода:

 deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
  

Ошибка заключается в следующем:

Исключение AnalysisException: не удается разрешить new_column в предложении UPDATE для заданных столбцов {Список целевых столбцов}. ‘new_column’ действительно отсутствует в схеме целевой дельта-таблицы, но, согласно документации, это должно просто обновить существующую схему дельта-таблицы и добавить столбец.

Я также включаю автоматическое объединение с помощью этой команды:

 spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
  

Я не уверен, что именно вызывает эту ошибку, потому что в прошлом я мог автоматически изменять схему дельта-таблиц с помощью этих точных фрагментов кода.

Есть ли что-то, что я упускаю из виду?

Комментарии:

1. Не могли бы вы поделиться, какую среду выполнения databricks вы используете?

2. В моем случае это оказалось неправильным сообщением об ошибке, скрывающим реальную проблему. (и да, это ошибка для отображения неправильного сообщения об ошибке) — это несоответствие типов между двумя полями, которые я пытался объединить в команде merge into

Ответ №1:

вы должны удалить пробел после ..autoMerge.enabled spark.conf.set

—> это

 spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
  

, не

 spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")`
  

Ответ №2:

У меня та же проблема, что и у вас, но я обнаружил, что в документах delta lake он может не поддерживать столбцы деталей с помощью upsertAll() и insertAll(); Поэтому я выбираю upsertExpr(), а insertExpr() с большой картой содержит все столбцы.

слияние Delta lake: проверка схемы

Комментарии:

1. Можете ли вы привести пример замены ‘whenMatchedUpdateAll ()’ и ‘whenNotMatchedInsertAll’ в ‘updateExpr ()’ и ‘insertExpr ()’.

Ответ №3:

Если я не ошибаюсь, вам нужно использовать параметры insertAll или UpdateAll для операции СЛИЯНИЯ

Комментарии:

1. Но в op указано, что для этого он использует UpdateAll / insertAll (в формате whenMatchedInsertAll и т. Д

Ответ №4:

spark.conf.set(«spark.databricks.delta.schema.autoMerge.enabled»,»true»)

Убедитесь, что после «включено» в строке выше нет пробела.

затем вы можете использовать pass a spark sql:

 spark.sql(f"""
MERGE INTO {data_path} delta USING global_temp.src source
       ON   delta.col1   =    source.key1
       AND  delta.col2   =    source.key2
      
       WHEN MATCHED THEN
         UPDATE SET *
         
       WHEN NOT MATCHED THEN
         INSERT *
""")