Вставка или обновление дельта-таблицы из фрейма данных в Pyspark

#apache-spark #pyspark #delta-lake

#apache-spark #pyspark #дельта-озеро

Вопрос:

В настоящее время у меня есть фрейм данных pyspark, из которого я изначально создал дельта-таблицу, используя приведенный ниже код —

 df.write.format("delta").saveAsTable("events")
 

Теперь, поскольку приведенный выше фрейм данных ежедневно заполняет данные в соответствии с моим требованием, следовательно, для добавления новых записей в дельта-таблицу я использовал приведенный ниже синтаксис —

 df.write.format("delta").mode("append").saveAsTable("events")
 

Теперь все это я сделал в databricks и в моем кластере. Я хочу знать, как я могу написать общий код pyspark на python, который создаст дельта-таблицу, если она не существует, и добавит записи, если дельта-таблица существует.Это то, что я хочу сделать, потому что, если я передам кому-то свой пакет python, у них не будет такой же дельта-таблицы в их среде, поэтому она должна создаваться динамически из кода.

Комментарии:

1. Это именно то определение режима добавления при записи.

Ответ №1:

Если у вас еще нет дельта-таблицы, она будет создана при использовании append режима. Таким образом, вам не нужно писать какой-либо специальный код для обработки случая, когда таблица еще не существует, и когда она завершается.

P.S. Такой код вам понадобится только в том случае, если вы выполняете слияние с таблицей, а не добавление. В этом случае код будет выглядеть следующим образом:

 if table_exists:
  do_merge
else:
  df.write....
 

PS вот общая реализация этого шаблона

Ответ №2:

В конечном итоге с spark доступны две операции

  • Таблица сохранения: — создайте или замените таблицу, если она присутствует или отсутствует, текущим фреймом данных
  • insertInto: — Успешно, если таблица присутствует и выполняет операцию на основе режима («перезаписать» или «добавить»). для этого требуется, чтобы таблица была доступна в базе данных.

Таблица .saveAsTable («события») в основном переписывает таблицу каждый раз, когда вы ее вызываете. это означает, что, даже если у вас есть таблица, присутствующая ранее или нет, она заменит таблицу текущим значением фрейма данных. Вместо этого вы можете выполнить приведенную ниже операцию, чтобы быть в более безопасной стороне:

Шаг 1: Создайте таблицу, даже если она присутствует или нет. Если таковые имеются, удалите данные из таблицы и добавьте новые записи фрейма данных, в противном случае создайте таблицу и добавьте данные.

df.createOrReplaceTempView(‘df_table’)

spark.sql(«создать таблицу, ЕСЛИ НЕ СУЩЕСТВУЕТ имя_таблицы, используя delta select * из df_table, где 1= 2»)

df.write.format(«дельта»).mode(«добавить»).insertInto(«события»)

Таким образом, каждый раз, когда он будет проверять, доступна таблица или нет, в противном случае он создаст таблицу и перейдет к следующему шагу. В противном случае, если таблица доступна, добавьте данные в таблицу.