#apache-spark #pyspark #delta-lake
#apache-spark #pyspark #дельта-озеро
Вопрос:
В настоящее время у меня есть фрейм данных pyspark, из которого я изначально создал дельта-таблицу, используя приведенный ниже код —
df.write.format("delta").saveAsTable("events")
Теперь, поскольку приведенный выше фрейм данных ежедневно заполняет данные в соответствии с моим требованием, следовательно, для добавления новых записей в дельта-таблицу я использовал приведенный ниже синтаксис —
df.write.format("delta").mode("append").saveAsTable("events")
Теперь все это я сделал в databricks и в моем кластере. Я хочу знать, как я могу написать общий код pyspark на python, который создаст дельта-таблицу, если она не существует, и добавит записи, если дельта-таблица существует.Это то, что я хочу сделать, потому что, если я передам кому-то свой пакет python, у них не будет такой же дельта-таблицы в их среде, поэтому она должна создаваться динамически из кода.
Комментарии:
1. Это именно то определение режима добавления при записи.
Ответ №1:
Если у вас еще нет дельта-таблицы, она будет создана при использовании append
режима. Таким образом, вам не нужно писать какой-либо специальный код для обработки случая, когда таблица еще не существует, и когда она завершается.
P.S. Такой код вам понадобится только в том случае, если вы выполняете слияние с таблицей, а не добавление. В этом случае код будет выглядеть следующим образом:
if table_exists:
do_merge
else:
df.write....
PS вот общая реализация этого шаблона
Ответ №2:
В конечном итоге с spark доступны две операции
- Таблица сохранения: — создайте или замените таблицу, если она присутствует или отсутствует, текущим фреймом данных
- insertInto: — Успешно, если таблица присутствует и выполняет операцию на основе режима («перезаписать» или «добавить»). для этого требуется, чтобы таблица была доступна в базе данных.
Таблица .saveAsTable («события») в основном переписывает таблицу каждый раз, когда вы ее вызываете. это означает, что, даже если у вас есть таблица, присутствующая ранее или нет, она заменит таблицу текущим значением фрейма данных. Вместо этого вы можете выполнить приведенную ниже операцию, чтобы быть в более безопасной стороне:
Шаг 1: Создайте таблицу, даже если она присутствует или нет. Если таковые имеются, удалите данные из таблицы и добавьте новые записи фрейма данных, в противном случае создайте таблицу и добавьте данные.
df.createOrReplaceTempView(‘df_table’)
spark.sql(«создать таблицу, ЕСЛИ НЕ СУЩЕСТВУЕТ имя_таблицы, используя delta select * из df_table, где 1= 2»)
df.write.format(«дельта»).mode(«добавить»).insertInto(«события»)
Таким образом, каждый раз, когда он будет проверять, доступна таблица или нет, в противном случае он создаст таблицу и перейдет к следующему шагу. В противном случае, если таблица доступна, добавьте данные в таблицу.