#amazon-web-services #stored-procedures #continuous-integration #amazon-redshift #aws-cli
Вопрос:
У нас есть ряд хранимых процедур, которые были встроены в Redshift на AWS. Нам нужно загрузить и загрузить эти хранимые процедуры, чтобы их можно было хранить на GITHUB в качестве средства отслеживания изменений.
Эти процедуры в конечном итоге должны быть частью шаблона cloudformation, чтобы также можно было поддерживать инфраструктуру.
В идеале это можно было бы сделать с помощью интерфейса командной строки AWS, но, похоже, для этого нет команды.
Как осуществляется управление хранением AWS RedShift в среде автоматизации/ CICD?
Ответ №1:
У меня есть часть рабочего решения.
import json import psycopg2 import os def run_sql_commands(input_command, database="mydatabasename", host_port=1234 ,host_url="datawarehouse.redshift.amazonaws.com"): """ :param input_command: sql string to execute :param database: which database to run the query :return: """ results = None # db_user and db_pass will need to be set as environment variables db_user = os.environ["db_user"] db_pass = os.environ["db_pass"] db_host = host_url db_port = host_port db_name = database try: conn = psycopg2.connect( "dbname={} port={} user={} host={} password={}".format(db_name, db_port, db_user, db_host, db_pass)) cursor = conn.cursor() cursor.execute(input_command) results = cursor.fetchall() except Exception as e: return None return results def get_arg_type(oid, database): sql = f"SELECT typname FROM pg_catalog.pg_type WHERE oid={oid}" r = run_sql_commands(sql, database) return r[0][0] def download_all_procedures(database, file_location="./local-code-store"): get_all_stored_procedure_sql = """ SELECT n.nspname, b.usename, p.proname, p.proargnames, p.proargtypes, p.prosrc FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_proc p ON pronamespace = n.oid JOIN pg_user b ON b.usesysid = p.proowner WHERE nspname NOT IN ('information_schema', 'pg_catalog'); """ r = run_sql_commands(get_all_stored_procedure_sql, database) counted_items=[] for item in r: table = item[0] author = item[1] procedure_name = item[2] procedure_arguments = item[3] procedure_argtypes = item[4] procedure = item[5] t_list = [] for this_oid in procedure_argtypes.split(): t = get_arg_type(this_oid, database="mydatabasename") t_list.append(t) meta_data = {'table': table, 'author': author, 'arguments': procedure_arguments, 'argument_types': t_list} filename = f'{file_location}/{database}/Schemas/{table}/{procedure_name}.sql' os.makedirs(os.path.dirname(filename), exist_ok=True) f = open(filename, 'w') count = f.write(procedure.replace('rn', 'n')) f.close() filename = f'{file_location}/{database}/Schemas/{table}/{procedure_name}.json' os.makedirs(os.path.dirname(filename), exist_ok=True) f = open(filename, 'w') counted_items.append(f.write(json.dumps(meta_data))) f.close() return counted_items if __name__ == "__main__": print("Starting...") c = download_all_procedures("mydatabase") print("... finished")