Я не могу использовать обновления ds и execution_date в Airflow

#airflow

Вопрос:

Я пытаюсь использовать датчик для каталога с форматом YYYYMMDD, это означает, что каждый день каталог будет меняться.

Например: Сегодня датчик 20211129 проверяет, существует ли каталог с таким именем, если он не создан, это означает, что файлы не поступили. Завтра, предположим, файлы поступят(и будет создан каталог 20211130), и пользователи должны иметь возможность изменить проверку даты на 20211130.

Я пытаюсь сделать это с помощью макросов, но это не работает должным образом. Просто для тестовых эффектов я настроил каждую минуту, чтобы посмотреть, изменилось ли время.

 dir_main_name = f"archive/files" templated_log_dir = f"{dir_main_name}/" '{{(execution_date).strftime("%Y-%m-%d-%M")}}' args = {  'start_date': datetime(2021, 11, 29), }   with DAG(dag_id='GCS_check', catchup=False, schedule_interval="*/1 * * * * *", default_args=args) as dag:   sensor_check = GoogleCloudStorageObjectSensor(  task_id='gcs_polling',  bucket=BUCKET_NAME,  object=f"{dir_main_name}/" '{{ds}}'   )   sensor_check   

Я тестировал с помощью «ds» и «execution_date», но ни один из них не работает.

Как видно из журнала, время проверок датчиков отличается от времени, которое я указал для имени каталога(просто в качестве примера), имя каталога не меняется:

 [**2021-11-29, 16:48:52** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00** [**2021-11-29, 16:48:52** UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [**2021-11-29, 16:49:52** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00** [**2021-11-29, 16:49:52** UTC] {credentials_provider.py:295} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook. [**2021-11-29, 16:50:53** UTC] {gcs.py:84} INFO - Sensor checks existence of : dev-datalake-archive-myralis-com, archive/files/**2021-11-29T16:48:47 00:00**  

Оставайтесь в том же часе(2021-11-29T16:48:47 00:00)

Есть ли способ, чтобы значение каталога совпадало с датой(и часом) и временем проверки датчиков? как и в последней строке, exec 2021-11-29, 16:50:53, а каталог должен быть 2021-11-29T16:50:53 00:00

Комментарии:

1. 1. Ваше расписание-это каждая секунда , а не минута. 2. ds является ли дата выполнения в формате ГГГГ-ММ-ДД, если вы хотите ГГГГМДД, используйте ds_nodash . 3. это журнал из кода выше? с ds , вы не получите 16:48:47 00:00 часть.

2. Привет, этот параметр, который я ввел в интервал, является просто примером для воспроизведения тестов. Проблема в том, что переменные не обновляются., датчик выполняется, но ds и execution_date остаются с той же датой начала. Например, датчик выполнил 16:50:53, а переменная остается на 16:48:47.(последняя строка журнала)

3. 16:50:53-это время ведения журнала и execution_date «логическая дата» dag (своего рода запланированное время), поэтому они обычно не совпадают друг с другом. Хотя я не знаю, откуда взялось «16:48:47». У вас есть {{ ds }} код, и ds у вас не будет времени. Вы выполняете вручную?

4. Привет, Эмма, спасибо за помощь! Проблема заключалась в том, что когда я запустил dag, Airflow создал 16 запусков dag(ограничение в файле airflow.cfg), и я не видел обновления, но на самом деле произошло то, что не было создано больше запусков dag. Я установил тайм-аут, и для предыдущих тегов было установлено значение «сбой», и создал новые с обновленными макросами. Это решит мою проблему!