#pyspark #hive
Вопрос:
У меня есть фрейм данных spark, на основе которого я пытаюсь создать секционированную таблицу в hive.
У меня есть флаг, чтобы сказать, существует таблица или нет. При первом запуске должна быть создана таблица, а со второго запуска данные должны быть вставлены в таблицу без перезаписи существующих данных.
Мой вопрос заключается в том, как создать секционированную таблицу и вставить ее в уже существующую секционированную таблицу без переопределения существующих данных.
Таблица разделена столбцом под названием дата.
То, что я пробовал до сих пор.(без разделов)
df.createOrReplaceTempView("df_view")
if table_exists:
spark.sql("insert into mytable select * from df_view")
else:
spark.sql("create table if not exists mytable as select * from df_view")
Но я должен сделать то же самое с разделенной датой столбца.
На одну и ту же дату может быть несколько запусков. Так можно ли добавить данные в тот же раздел вместо их перезаписи.
Ожидаемый результат: После 1-го запуска: таблица должна быть создана со столбцом раздела в качестве даты.
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
После второго запуска в ту же дату:(данные должны быть добавлены в тот же раздел)
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
A. 2021-09-16 2021-09-16 12:20:01
B. 2021-09-16 2021-09-16 12:20:01
Третий запуск на следующую дату: (новый раздел должен быть создан с сохранением всех существующих данных)
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
A. 2021-09-16 2021-09-16 12:20:01
B. 2021-09-16 2021-09-16 12:20:01
A. 2021-09-17 2021-09-17 12:20:01
B. 2021-09-17 2021-09-17 12:20:01
Как этого добиться в Пыспарке.
Комментарии:
1. Почему бы вам не использовать
write
append
режим ?2. Ок.. как добавить при создании таблицы-улья с разделом
3. Могу ли я ответить, используя pyspark без
spark.sql()
?4. ДА. Пожалуйста. Пока это писпарк, для меня все в порядке
Ответ №1:
Следуя документации, ваш код может быть примерно таким:
df.write.saveAsTable('[table_name_here]',
format='[format_here]',
mode='append',
partitionBy='date')
В этом коде нет необходимости проверять, существует ли таблица, append
автоматически создается, если она не существует.
Ответ №2:
вы можете запустить parts = spark.sql('show partitions mytable')
, даже преобразовать его в список Python или Панд и проверить, существует ли раздел
Комментарии:
1. Спасибо. Но мой вопрос был больше связан с созданием раздела и добавлением данных в раздел