#python #amazon-web-services #amazon-s3 #aws-glue #aws-glue-data-catalog
Вопрос:
Я новичок в AWS glue и других материалах AWS. У меня есть требование создать платформу ETL для проекта. Это диаграмма высокого уровня. Я хочу понять, вместо того, чтобы создавать 400 клеевых конвейеров, могу ли я создать шаблон, основанный на справочных данных из postgres aurora/mysql. Я знаком с Python. У кого — нибудь есть какие-нибудь идеи по этому поводу? Любые ссылки, примеры кода.
Комментарии:
1. Ваш вопрос не ясен. Какой шаблон? Что такое справочные данные? Зачем вам понадобилось 400 работ по клею?
2. Шаблон означает общее задание склеивания, которое может выполняться с метаданными(исходные и целевые таблицы справочных данных). На самом деле мне не нужно 400 работ по клею. Я хотел знать, как я могу создать эту работу с клеем в качестве многоразовой работы для всех таблиц oracle.
3. Мы сделали аналогичную реализацию, в которой мы использовали таблицу в Aurora RDS mysql для хранения всех запросов на создание / выбор / вставку для различных исходных данных с соответствующим статусом. У нас есть только одно задание склеивания, и в зависимости от триггера события S3 считывайте имя входящего файла и получайте соответствующие имена запросов и целевых таблиц из таблицы mysql, выполняйте дальнейшую очистку/вставку/обновления в основную таблицу. Сейчас это находится в производстве.
4. @Yuva Отлично, у вас есть какие-нибудь примеры кода?
5. конечно, пожалуйста, проверьте мой ответ, так как его невозможно предоставить здесь в комментариях.
Ответ №1:
- У нас была главная таблица конфигурации в нашей базе данных mysql. Столбцы для удобства у нас было source_table_name в качестве идентификатора для извлечения соответствующих имен столбцов таблицы/запросов для СОЗДАНИЯ ТАБЛИЦЫ STG, ЗАГРУЗКИ ДАННЫХ В ТАБЛИЦУ STG, ВСТАВКИ/ОБНОВЛЕНИЯ В ЦЕЛЕВЫЕ таблицы и т. Д.
- Мы также разделили ВСТАВКУ/ОБНОВЛЕНИЕ на два разных столбца в мастере конфигурации, так как мы использовали ДУБЛИКАТ КЛЮЧА для обновления существующих записей.
- получите имя исходной таблицы, обработав лямбда-события, которые будут иметь имя целевого файла.
- Извлеките все необходимые данные из мастера конфигурации для имени исходной таблицы. Это было бы что-то вроде следующего:
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb) cur.execute(sql_query, (source_fname)) result = cur.fetchall() for row in result: stg_table_name = row[1] tgt_table_name = row[2] create_stg_table_qry = row[3] load_data_stg_table_qry = row[4] insert_tgt_table_qry = row[5] insert_tgt_table_qry_part_1 = row[6] insert_tgt_table_qry_part_2 = row[7] conn.commit() cur.close()
Передайте соответствующие параметры универсальным функциям, как показано ниже:
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
Общие функции будут примерно такими, как показано ниже, это для aurora RDS, пожалуйста, внесите необходимые изменения.
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()
Надеюсь, это даст представление.
Спасибо