Создайте управляемую секционированную таблицу hive с помощью фрейма данных pyspark и добавляйте данные для каждой строки

#pyspark #hive

Вопрос:

У меня есть фрейм данных spark, на основе которого я пытаюсь создать секционированную таблицу в hive.

У меня есть флаг, чтобы сказать, существует таблица или нет. При первом запуске должна быть создана таблица, а со второго запуска данные должны быть вставлены в таблицу без перезаписи существующих данных.

Мой вопрос заключается в том, как создать секционированную таблицу и вставить ее в уже существующую секционированную таблицу без переопределения существующих данных.

Таблица разделена столбцом под названием дата.

То, что я пробовал до сих пор.(без разделов)

 df.createOrReplaceTempView("df_view")
if table_exists:
   spark.sql("insert into mytable select * from df_view")
else:
   spark.sql("create table if not exists mytable as select * from df_view")
 

Но я должен сделать то же самое с разделенной датой столбца.

На одну и ту же дату может быть несколько запусков. Так можно ли добавить данные в тот же раздел вместо их перезаписи.

Ожидаемый результат: После 1-го запуска: таблица должна быть создана со столбцом раздела в качестве даты.

 Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01
 

После второго запуска в ту же дату:(данные должны быть добавлены в тот же раздел)

 Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01
A.   2021-09-16  2021-09-16 12:20:01
B.   2021-09-16  2021-09-16 12:20:01
 

Третий запуск на следующую дату: (новый раздел должен быть создан с сохранением всех существующих данных)

 Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01
A.   2021-09-16  2021-09-16 12:20:01
B.   2021-09-16  2021-09-16 12:20:01
A.   2021-09-17  2021-09-17 12:20:01
B.   2021-09-17  2021-09-17 12:20:01
 

Как этого добиться в Пыспарке.

Комментарии:

1. Почему бы вам не использовать write append режим ?

2. Ок.. как добавить при создании таблицы-улья с разделом

3. Могу ли я ответить, используя pyspark без spark.sql() ?

4. ДА. Пожалуйста. Пока это писпарк, для меня все в порядке

Ответ №1:

Следуя документации, ваш код может быть примерно таким:

 df.write.saveAsTable('[table_name_here]', 
                     format='[format_here]', 
                     mode='append', 
                     partitionBy='date')
 

В этом коде нет необходимости проверять, существует ли таблица, append автоматически создается, если она не существует.

Ответ №2:

вы можете запустить parts = spark.sql('show partitions mytable') , даже преобразовать его в список Python или Панд и проверить, существует ли раздел

Комментарии:

1. Спасибо. Но мой вопрос был больше связан с созданием раздела и добавлением данных в раздел