#airflow
Вопрос:
Я пытаюсь использовать датчик для каталога с форматом YYYYMMDD, это означает, что каждый день каталог будет меняться.
Например: Сегодня датчик 20211129 проверяет, существует ли каталог с таким именем, если он не создан, это означает, что файлы не поступили. Завтра, предположим, файлы поступят(и будет создан каталог 20211130), и пользователи должны иметь возможность изменить проверку даты на 20211130.
Я пытаюсь сделать это с помощью макросов, но это не работает должным образом. Просто для тестовых эффектов я настроил каждую минуту, чтобы посмотреть, изменилось ли время.
dir_main_name = f"archive/files" templated_log_dir = f"{dir_main_name}/" '{{(execution_date).strftime("%Y-%m-%d-%M")}}' args = { 'start_date': datetime(2021, 11, 29), } with DAG(dag_id='GCS_check', catchup=False, schedule_interval="*/1 * * * * *", default_args=args) as dag: sensor_check = GoogleCloudStorageObjectSensor( task_id='gcs_polling', bucket=BUCKET_NAME, object=f"{dir_main_name}/" '{{ds}}' ) sensor_check
Я тестировал с помощью «ds» и «execution_date», но ни один из них не работает.
Как видно из журнала, время проверок датчиков отличается от времени, которое я указал для имени каталога(просто в качестве примера), имя каталога не меняется:
[**2021-11-29, 16:48:52** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00** [**2021-11-29, 16:48:52** UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [**2021-11-29, 16:49:52** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00** [**2021-11-29, 16:49:52** UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [**2021-11-29, 16:50:53** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00**
Оставайтесь в том же часе(2021-11-29T16:48:47 00:00)
Есть ли способ, чтобы значение каталога совпадало с датой(и часом) и временем проверки датчиков? как и в последней строке, exec 2021-11-29, 16:50:53, а каталог должен быть 2021-11-29T16:50:53 00:00
Комментарии:
1. 1. Ваше расписание-это каждая секунда , а не минута. 2.
ds
является ли дата выполнения в формате ГГГГ-ММ-ДД, если вы хотите ГГГГМДД, используйтеds_nodash
. 3. это журнал из кода выше? сds
, вы не получите 16:48:47 00:00 часть.2. Привет, этот параметр, который я ввел в интервал, является просто примером для воспроизведения тестов. Проблема в том, что переменные не обновляются., датчик выполняется, но ds и execution_date остаются с той же датой начала. Например, датчик выполнил 16:50:53, а переменная остается на 16:48:47.(последняя строка журнала)
3. 16:50:53-это время ведения журнала и
execution_date
«логическая дата» dag (своего рода запланированное время), поэтому они обычно не совпадают друг с другом. Хотя я не знаю, откуда взялось «16:48:47». У вас есть{{ ds }}
код, иds
у вас не будет времени. Вы выполняете вручную?4. Привет, Эмма, спасибо за помощь! Проблема заключалась в том, что когда я запустил dag, Airflow создал 16 запусков dag(ограничение в файле airflow.cfg), и я не видел обновления, но на самом деле произошло то, что не было создано больше запусков dag. Я установил тайм-аут, и для предыдущих тегов было установлено значение «сбой», и создал новые с обновленными макросами. Это решит мою проблему!