Как скопировать данные в airflow

#python #airflow

#python #воздушный поток

Вопрос:

Я использую appache airflow в своем проекте. В этом пользователь может подключить свою базу данных к нашему проекту и скопировать свою таблицу в нашу базу данных.

Итак, я могу установить соединение, используя следующие строки

     import json 
    
    from airflow.models.connection import Connection 
    
    c = Connection(
         conn_id='some_conn',
         conn_type='mysql',
         description='connection description',
         host='myhost.com',
         login='myname',
         schema = 'myschema'
         password='mypassword',
         extra=json.dumps(dict(this_param='some val', that_param='other val*')),
     )
    print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
    
    hook = MySqlHook(c.conn_id)

   result = hook.get_records(f'SELECT table_name FROM information_schema.tables WHERE table_schema = {c.schema};')
 

Теперь я могу получить имена таблиц, связанные с подключенной базой данных….

Как скопировать данные из этой подключенной базы данных в нашу базу данных…. Пожалуйста, помогите мне с некоторыми советами по этому поводу

Ответ №1:

Это зависит от того, между какими базами данных вы хотите копировать данные.

Простой подход может быть изложен в виде следующих шагов.

  1. Извлеките записи из базы данных A.
  2. Вставьте записи в базу данных B.

Вы должны создать пользовательский оператор, который будет выполнять эти шаги по порядку. Возможно, даже уже созданы операторы, которые выполняют эти функции. Я бы посоветовал вам сначала заглянуть в Airflow Github.

Пожалуйста, обратите внимание, что этот подход не подходит для больших наборов данных, поскольку данные сохраняются в памяти во время выполнения задачи. Вы также можете записывать на диск, но тогда этот маршрут зависит от машины, на которой работает работник Airflow.

Если база данных находится в том же кластере / сервере, тогда будет работать простой SQL-скрипт. Например, HiveOperator будет достаточно для перемещения данных с помощью некоторых INSERT INTO команд sql.