PySpark: запись в режиме «добавления» и перезапись, если совпадают определенные критерии

#dataframe #pyspark #append #amazon-redshift #write

#фрейм данных #pyspark #добавить #amazon-redshift

Вопрос:

Я добавляю следующий фрейм данных Spark в существующую базу данных Redshift. И я хочу использовать «месяц» и «состояние» в качестве критериев для проверки и замены данных в таблице красного смещения, если month = '2021-12' and state = 'ga'

фрейм данных, который мы хотим добавить: df

месяц состояние продукт
2021-12 ca ho0
2021-12 ca ho1
2021-12 ca ho2
2021-12 ca ho3
2021-12 ca ho4
2021-12 ga ho5
2021-12 ga ho6
2021-12 ga ho7
2021-12 ga ho8
2021-12 ga ho9

Я попробовал следующий скрипт, чтобы добавить его, похоже, он только df добавляет фрейм данных, не заменяя (перезаписывая) существующие записи за месяц «2021-12» и состояние «ga».

 df.write 
  .format("xxx") 
  .option("url", "xxx") 
  .option("dbtable", "table1") 
  .option("tempdir", "xxxx") 
  .option("aws_iam_role", "xxxx") 
  .mode("append") 
  .option("replaceWhere", "month == '2021-12' AND state == 'ga'") 
  .save()
 

Я думаю .option("replaceWhere", "month == '2021-12' AND state == 'ga'") , это не работает. Как я могу внести изменения? Спасибо!
(Я также попробовал следующую часть, похоже, что существующие записи исчезли и заменены на df )

   .mode("overwrite") 
  .option("replaceWhere", "month == '2021-12' AND state == 'ga'") 
 

Комментарии:

1. Поскольку я использую databricks, он не поддерживает такого рода частичное обновление / перезапись; нужно полагаться на Delta Lake, который в настоящее время пытается: docs.databricks.com/delta/delta-update.html#language-python

2. Не уверен, что ваша точка зрения верна

Ответ №1:

replaceWhere Этот параметр работает почти как раздел динамической перезаписи, в основном вы говорите Spark перезаписывать только те данные, которые находятся в этих разделах диапазона. Кроме того, данные будут сохранены только в том случае, если ваш фрейм данных соответствует условию replaceWhere , в противном случае, если одна строка не совпадает, replaceWhere будет выдано исключение, данные которого не совпадают. Вы ссылаетесь на ссылку.Если вы хотите обработать тот же сценарий внутри раздела, вы можете использовать MERGE INTO .

Комментарии:

1. Спасибо, думаю, я использовал эту функцию, но изначально не в Delta Lake, но это полезно, спасибо

Ответ №2:

Spark не может выполнить слияние данных с Redshift напрямую, поскольку мы используем Databricks, он предоставляет «Delta Lake» в качестве среды для выполнения обновления / слияния данных, затем мы записываем таблицу из Delta Lake в Redshift. https://docs.databricks.com/delta/delta-update.html#language-python

Ответ №3:

При работе с Redshift вам необходимо перейти по этой ссылке ниже. Немного больше, чем вам нужно, но применимо. Ссылка показывает имитацию UPSERT. Ссылка http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows /

Даже вставка в Redshift — это не искра, я писал об этом в прошлом. Стандартно используется КОПИРОВАНИЕ из s3.

Комментарии:

1. Спасибо, что поделились! Похоже, это альтернативный подход, помимо моего, использующий платформу Databricks.

2. извините, мы не хотим полагаться на SQL в данный момент, хотя это применимо, спасибо за ваше предложение, но не могу его принять, поскольку я ищу решение, связанное с Spark.

3. Ну, не многие это делают.

4. Верно, лол, но спасибо