#pyspark #databricks #delta-lake
#pyspark #блоки данных #дельта-озеро #delta-lake
Вопрос:
Я получаю сообщение об ошибке при выполнении следующей строки кода:
deltaTarget.alias('target').merge(df.alias('source'), mergeStatement).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Ошибка заключается в следующем:
Исключение AnalysisException: не удается разрешить new_column в предложении UPDATE для заданных столбцов {Список целевых столбцов}. ‘new_column’ действительно отсутствует в схеме целевой дельта-таблицы, но, согласно документации, это должно просто обновить существующую схему дельта-таблицы и добавить столбец.
Я также включаю автоматическое объединение с помощью этой команды:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")
Я не уверен, что именно вызывает эту ошибку, потому что в прошлом я мог автоматически изменять схему дельта-таблиц с помощью этих точных фрагментов кода.
Есть ли что-то, что я упускаю из виду?
Комментарии:
1. Не могли бы вы поделиться, какую среду выполнения databricks вы используете?
2. В моем случае это оказалось неправильным сообщением об ошибке, скрывающим реальную проблему. (и да, это ошибка для отображения неправильного сообщения об ошибке) — это несоответствие типов между двумя полями, которые я пытался объединить в команде merge into
Ответ №1:
вы должны удалить пробел после ..autoMerge.enabled
spark.conf.set
—> это
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
, не
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled ","true")`
Ответ №2:
У меня та же проблема, что и у вас, но я обнаружил, что в документах delta lake он может не поддерживать столбцы деталей с помощью upsertAll() и insertAll(); Поэтому я выбираю upsertExpr(), а insertExpr() с большой картой содержит все столбцы.
Комментарии:
1. Можете ли вы привести пример замены ‘whenMatchedUpdateAll ()’ и ‘whenNotMatchedInsertAll’ в ‘updateExpr ()’ и ‘insertExpr ()’.
Ответ №3:
Если я не ошибаюсь, вам нужно использовать параметры insertAll или UpdateAll для операции СЛИЯНИЯ
Комментарии:
1. Но в op указано, что для этого он использует UpdateAll / insertAll (в формате whenMatchedInsertAll и т. Д
Ответ №4:
spark.conf.set(«spark.databricks.delta.schema.autoMerge.enabled»,»true»)
Убедитесь, что после «включено» в строке выше нет пробела.
затем вы можете использовать pass a spark sql:
spark.sql(f"""
MERGE INTO {data_path} delta USING global_temp.src source
ON delta.col1 = source.key1
AND delta.col2 = source.key2
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")