Параллельное задание подзаголовка потока данных, организованное главной группой Dag Airflow, не заполняет таблицу SQL Server

# #airflow #google-cloud-dataflow #google-cloud-composer

Вопрос:

У меня есть главная группа Dag, которая организует набор заданий на прием потока данных API и задание на загрузку данных из gcs в sql server ( 7 api и 7 sql server). Мастер Dag просто запускает вспомогательные dag и имеет датчик Dags, который указывает на успешный запуск dag. Если это не удастся, следующий подзаголовок в строке не выполняется.

 Dagsensor(  dag=dag,  task_id='status_check_abcd',  external_dag_id='ABCD_DF',  poke interval=30,  timeout=4000,  task1_pipeline gt;gt; [trigger_dag_Abcd, status_check_abcd]  

Проблема заключается в том, что когда главная группа Dag запускает подзаголовок потока данных API, он выполняется нормально и создает файл csv в корзине GCS. Далее для следующего набора из 7 вложенных тегов задача состоит в том, чтобы извлечь файл и загрузить его в sql server. Задача для 7 заданий sql server потока данных задается параллельно в Composer.

Главная группа Dag успешно выполняет весь sql-сервер и подтверждается в потоке данных . Но иногда из 7 csv-файлов загружается только 2. Если я выполню задачу вручную для загрузки задания sql server, все будет в порядке.

Не уверен, почему задание потока данных api sql server выполняется параллельно в главной группе Dag.

Является ли причиной датчик Dag ?

Спасибо

Комментарии:

1. Здравствуйте, у вас есть доступ к пользовательскому интерфейсу airflow? вы пробовали оставлять отладочные сообщения о количестве загружаемых записей? Можете ли вы подробно описать свой код? мне кажется,что task1_pipeline-это ваша основная деятельность, и оба триггера, статус-это ваши дети… или это всего лишь один dag??. Кроме того, на стороне потока данных, когда вы сказали «подтверждено», отображается количество записей, которые вы передаете?. Что касается вашего ручного процесса, можете ли вы подробно описать шаги по вашему вопросу?

2. Привет, задание потока данных показывает, что файл csv сгенерирован, и я мог бы подтвердить это сообщением о состоянии json, сгенерированным кодом . Я использую Мастер-Dag для запуска вложенных тегов, которые являются отдельными заданиями потока данных. Локально я передаю те же параметры потока данных в код и выполняю его с помощью локального бегуна, который записывает данные в таблицу sql. Как уже упоминалось, когда я запускаю задание индивидуально из пользовательского интерфейса, оно успешно выполняется.

3. Тогда это означает, что проблема не в задании, а в параметрах, которые отправляются из вашей группы dag на задание. Возможно, вы отправляете одни и те же значения (т. е. dag1-gt; файл1.csv, dag2-gt;gt;файл1.csv, dag3-gt;gt;gt;файл1.csv) в задание потока данных отдельных лиц, и поэтому вы видите одни и те же значения только при загрузке. Я хотел бы знать больше деталей, чтобы воспроизвести вашу проблему на моей стороне, например, код, который вы используете для загрузки в поток данных. полная работа dag, если это возможно.

4. Привет @recyclinguy. Есть какие-нибудь новости?