Как управлять хранимой процедурой в AWS Redshift с помощью средства автоматизации или командной строки?

#amazon-web-services #stored-procedures #continuous-integration #amazon-redshift #aws-cli

Вопрос:

У нас есть ряд хранимых процедур, которые были встроены в Redshift на AWS. Нам нужно загрузить и загрузить эти хранимые процедуры, чтобы их можно было хранить на GITHUB в качестве средства отслеживания изменений.

Эти процедуры в конечном итоге должны быть частью шаблона cloudformation, чтобы также можно было поддерживать инфраструктуру.

В идеале это можно было бы сделать с помощью интерфейса командной строки AWS, но, похоже, для этого нет команды.

Как осуществляется управление хранением AWS RedShift в среде автоматизации/ CICD?

Ответ №1:

У меня есть часть рабочего решения.

 import json import psycopg2 import os   def run_sql_commands(input_command, database="mydatabasename", host_port=1234 ,host_url="datawarehouse.redshift.amazonaws.com"):  """  :param input_command: sql string to execute  :param database: which database to run the query  :return:  """  results = None  # db_user and db_pass will need to be set as environment variables  db_user = os.environ["db_user"]  db_pass = os.environ["db_pass"]   db_host = host_url  db_port = host_port  db_name = database   try:  conn = psycopg2.connect(  "dbname={} port={} user={} host={} password={}".format(db_name, db_port, db_user, db_host, db_pass))   cursor = conn.cursor()  cursor.execute(input_command)  results = cursor.fetchall()   except Exception as e:  return None   return results   def get_arg_type(oid, database):  sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid={oid}"  r = run_sql_commands(sql, database)  return r[0][0]   def download_all_procedures(database, file_location="./local-code-store"):  get_all_stored_procedure_sql = """  SELECT  n.nspname,  b.usename,  p.proname,  p.proargnames,  p.proargtypes,  p.prosrc  FROM  pg_catalog.pg_namespace n  JOIN pg_catalog.pg_proc p ON  pronamespace = n.oid  JOIN pg_user b ON  b.usesysid = p.proowner  WHERE  nspname NOT IN ('information_schema', 'pg_catalog');  """   r = run_sql_commands(get_all_stored_procedure_sql, database)  counted_items=[]  for item in r:  table = item[0]  author = item[1]  procedure_name = item[2]  procedure_arguments = item[3]  procedure_argtypes = item[4]  procedure = item[5]  t_list = []  for this_oid in procedure_argtypes.split():  t = get_arg_type(this_oid, database="mydatabasename")  t_list.append(t)  meta_data = {'table': table,  'author': author,  'arguments': procedure_arguments,  'argument_types': t_list}  filename = f'{file_location}/{database}/Schemas/{table}/{procedure_name}.sql'  os.makedirs(os.path.dirname(filename), exist_ok=True)  f = open(filename, 'w')  count = f.write(procedure.replace('rn', 'n'))  f.close()  filename = f'{file_location}/{database}/Schemas/{table}/{procedure_name}.json'  os.makedirs(os.path.dirname(filename), exist_ok=True)  f = open(filename, 'w')  counted_items.append(f.write(json.dumps(meta_data)))  f.close()   return counted_items   if __name__ == "__main__":  print("Starting...")  c = download_all_procedures("mydatabase")  print("... finished")