Как извлечь данные из базы данных Oracle с помощью AWS Glue и других сервисов AWS

#python #amazon-web-services #amazon-s3 #aws-glue #aws-glue-data-catalog

Вопрос:

Я новичок в AWS glue и других материалах AWS. У меня есть требование создать платформу ETL для проекта. Это диаграмма высокого уровня. Я хочу понять, вместо того, чтобы создавать 400 клеевых конвейеров, могу ли я создать шаблон, основанный на справочных данных из postgres aurora/mysql. Я знаком с Python. У кого — нибудь есть какие-нибудь идеи по этому поводу? Любые ссылки, примеры кода.

введите описание изображения здесь

Комментарии:

1. Ваш вопрос не ясен. Какой шаблон? Что такое справочные данные? Зачем вам понадобилось 400 работ по клею?

2. Шаблон означает общее задание склеивания, которое может выполняться с метаданными(исходные и целевые таблицы справочных данных). На самом деле мне не нужно 400 работ по клею. Я хотел знать, как я могу создать эту работу с клеем в качестве многоразовой работы для всех таблиц oracle.

3. Мы сделали аналогичную реализацию, в которой мы использовали таблицу в Aurora RDS mysql для хранения всех запросов на создание / выбор / вставку для различных исходных данных с соответствующим статусом. У нас есть только одно задание склеивания, и в зависимости от триггера события S3 считывайте имя входящего файла и получайте соответствующие имена запросов и целевых таблиц из таблицы mysql, выполняйте дальнейшую очистку/вставку/обновления в основную таблицу. Сейчас это находится в производстве.

4. @Yuva Отлично, у вас есть какие-нибудь примеры кода?

5. конечно, пожалуйста, проверьте мой ответ, так как его невозможно предоставить здесь в комментариях.

Ответ №1:

  1. У нас была главная таблица конфигурации в нашей базе данных mysql. Столбцы для удобства у нас было source_table_name в качестве идентификатора для извлечения соответствующих имен столбцов таблицы/запросов для СОЗДАНИЯ ТАБЛИЦЫ STG, ЗАГРУЗКИ ДАННЫХ В ТАБЛИЦУ STG, ВСТАВКИ/ОБНОВЛЕНИЯ В ЦЕЛЕВЫЕ таблицы и т. Д.
  2. Мы также разделили ВСТАВКУ/ОБНОВЛЕНИЕ на два разных столбца в мастере конфигурации, так как мы использовали ДУБЛИКАТ КЛЮЧА для обновления существующих записей.
  3. получите имя исходной таблицы, обработав лямбда-события, которые будут иметь имя целевого файла.
  4. Извлеките все необходимые данные из мастера конфигурации для имени исходной таблицы. Это было бы что-то вроде следующего:
 sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb)
cur.execute(sql_query, (source_fname))
result = cur.fetchall()
for row in result:
stg_table_name = row[1]
tgt_table_name = row[2]
create_stg_table_qry = row[3]
load_data_stg_table_qry = row[4]
insert_tgt_table_qry = row[5]
insert_tgt_table_qry_part_1 = row[6]
insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()
 

Передайте соответствующие параметры универсальным функциям, как показано ниже:

 create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
 

Общие функции будут примерно такими, как показано ниже, это для aurora RDS, пожалуйста, внесите необходимые изменения.

 def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()

def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()
 

Надеюсь, это даст представление.

Спасибо