#python #airflow
#python #воздушный поток
Вопрос:
Я использую appache airflow в своем проекте. В этом пользователь может подключить свою базу данных к нашему проекту и скопировать свою таблицу в нашу базу данных.
Итак, я могу установить соединение, используя следующие строки
import json
from airflow.models.connection import Connection
c = Connection(
conn_id='some_conn',
conn_type='mysql',
description='connection description',
host='myhost.com',
login='myname',
schema = 'myschema'
password='mypassword',
extra=json.dumps(dict(this_param='some val', that_param='other val*')),
)
print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
hook = MySqlHook(c.conn_id)
result = hook.get_records(f'SELECT table_name FROM information_schema.tables WHERE table_schema = {c.schema};')
Теперь я могу получить имена таблиц, связанные с подключенной базой данных….
Как скопировать данные из этой подключенной базы данных в нашу базу данных…. Пожалуйста, помогите мне с некоторыми советами по этому поводу
Ответ №1:
Это зависит от того, между какими базами данных вы хотите копировать данные.
Простой подход может быть изложен в виде следующих шагов.
- Извлеките записи из базы данных A.
- Вставьте записи в базу данных B.
Вы должны создать пользовательский оператор, который будет выполнять эти шаги по порядку. Возможно, даже уже созданы операторы, которые выполняют эти функции. Я бы посоветовал вам сначала заглянуть в Airflow Github.
Пожалуйста, обратите внимание, что этот подход не подходит для больших наборов данных, поскольку данные сохраняются в памяти во время выполнения задачи. Вы также можете записывать на диск, но тогда этот маршрут зависит от машины, на которой работает работник Airflow.
Если база данных находится в том же кластере / сервере, тогда будет работать простой SQL-скрипт. Например, HiveOperator будет достаточно для перемещения данных с помощью некоторых INSERT INTO
команд sql.