#python #airflow #snowflake-cloud-data-platform
Вопрос:
У меня есть запрос ( foreign_keys.sql
), который создает внешние ключи, выводом являются строки инструкций ALTER для добавления FK, но как я могу выполнить эти инструкции?
пример строки:
ALTER TABLE "EIV"."RTR"."LINEITEMS" ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);
Ниже показано, как бы я запустил это в Airflow, но как затем выполнить инструкции?
snp_create_foreign_keys = SnowflakeQueryOperator( task_id='create_foreign_keys', sql='queries/foreign_keys.sql', params={ 'schema': 'qtr' }, retries=0)
Вот как выглядит наш оператор SnowflakeQueryОператор:
class SnowflakeQueryOperator(BaseOperator): template_fields = ['sql', 'params'] template_ext = ['.sql'] @apply_defaults def __init__(self, sql, params=None, warehouse=Variable.get('default_snowflake_warehouse'), *args, **kwargs): super().__init__(*args, **kwargs) self.sql = sql self.params = params self.warehouse = warehouse def execute(self, context): sf_hook = SnowflakeHook(warehouse=self.warehouse) self.log.info(f'Running query:') sf_hook.execute_query(self.sql)
Комментарии:
1. что вы имеете в виду , ваш код должен запускать файл sql, какая бы команда в нем ни была , вы не показали, но вам нужно подключение snowflake к вашему snowflake в airflow
2. @eshirvana Я обновил, чтобы показать, как выглядит наш оператор SnowflakeQueryОператор, мне интересно, как в запросе я мог бы также не просто выполнить запрос (/foreign_keys.sql), который заполняет строки операторами alter, но как также выполнить цикл по каждой строке и выполнить?
3. каждую строку в файле вы имеете в виду? он будет запускать все в файле строка за строкой .
4. @eshirvana В файле есть только 1 оператор запроса, выводом этого оператора является несколько строк «alter table,… и т. Д.», Я хочу прочитать и выполнить каждую выходную строку
5. Он будет запускать весь код внутри этого файла, почему вы хотите. Прогнать их одного за другим ?
Ответ №1:
Насколько я понимаю, SnowflakeOperator в Airflow не возвращает результаты запроса select, он должен использоваться только для выполнения запросов к Snowflake (как и большинство операторов баз данных) и либо завершаться неудачно, либо завершаться успешно.
Для этого вам нужно будет написать свой собственный оператор.