Snowflake — Выполнение вывода запроса в потоке воздуха

#python #airflow #snowflake-cloud-data-platform

Вопрос:

У меня есть запрос ( foreign_keys.sql ), который создает внешние ключи, выводом являются строки инструкций ALTER для добавления FK, но как я могу выполнить эти инструкции?

пример строки:

 ALTER TABLE "EIV"."RTR"."LINEITEMS"  ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);  

Ниже показано, как бы я запустил это в Airflow, но как затем выполнить инструкции?

 snp_create_foreign_keys = SnowflakeQueryOperator(  task_id='create_foreign_keys',  sql='queries/foreign_keys.sql',  params={  'schema': 'qtr'  },  retries=0)  

Вот как выглядит наш оператор SnowflakeQueryОператор:

 class SnowflakeQueryOperator(BaseOperator):  template_fields = ['sql', 'params']  template_ext = ['.sql']   @apply_defaults  def __init__(self,  sql,  params=None,  warehouse=Variable.get('default_snowflake_warehouse'),  *args,  **kwargs):  super().__init__(*args, **kwargs)  self.sql = sql  self.params = params  self.warehouse = warehouse   def execute(self, context):  sf_hook = SnowflakeHook(warehouse=self.warehouse)   self.log.info(f'Running query:')   sf_hook.execute_query(self.sql)  

Комментарии:

1. что вы имеете в виду , ваш код должен запускать файл sql, какая бы команда в нем ни была , вы не показали, но вам нужно подключение snowflake к вашему snowflake в airflow

2. @eshirvana Я обновил, чтобы показать, как выглядит наш оператор SnowflakeQueryОператор, мне интересно, как в запросе я мог бы также не просто выполнить запрос (/foreign_keys.sql), который заполняет строки операторами alter, но как также выполнить цикл по каждой строке и выполнить?

3. каждую строку в файле вы имеете в виду? он будет запускать все в файле строка за строкой .

4. @eshirvana В файле есть только 1 оператор запроса, выводом этого оператора является несколько строк «alter table,… и т. Д.», Я хочу прочитать и выполнить каждую выходную строку

5. Он будет запускать весь код внутри этого файла, почему вы хотите. Прогнать их одного за другим ?

Ответ №1:

Насколько я понимаю, SnowflakeOperator в Airflow не возвращает результаты запроса select, он должен использоваться только для выполнения запросов к Snowflake (как и большинство операторов баз данных) и либо завершаться неудачно, либо завершаться успешно.

Для этого вам нужно будет написать свой собственный оператор.