Как передать параметры из Google composer в шаблон потока данных

#airflow #dataflow #google-cloud-composer

#воздушный поток #поток данных #google-cloud-composer

Вопрос:

Я пытаюсь передать параметр из Google Composer в шаблон потока данных следующим образом, но это не работает.

 # composer code
trigger_dataflow = DataflowTemplateOperator(
     task_id="trigger_dataflow",
     template="gs://mybucket/my_template",
     dag=dag,
     job_name='appsflyer_events_daily',
     parameters={
         "input": f'gs://my_bucket/'   "{{ ds }}"   "/*.gz"
     }
)

# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input',
        default='gs://my_bucket/*.gz',
        help='path of input file')

def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions) 
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )
 

Комментарии:

1. Какую версию Airflow вы используете?

2. @vdolez Я использую версию: 1.10.10 composer

3. Кроме того, вы можете предоставить некоторые журналы ошибок? Возможно, шаблон был создан неправильно, или, возможно, Composer не хватает прав доступа.

4. Не так много журналов @vdolez. Конвейер просто не соответствует никаким файлам и завершается без каких-либо ошибок. Если я жестко закодирую шаблон файла в шаблон, все будет работать нормально.

5. Можете ли вы запустить шаблон без Composer? Например, из пользовательского интерфейса? Вы выполнили шаги, описанные в документации ? Загруженный файл метаданных и т. Д

Ответ №1:

Вы можете передать, как показано ниже.

 DataflowTemplateOperator(,
         task_id="task1",
         template=get_variable_value("template"),
         on_failure_callback=update_job_message,
         parameters={
             "fileBucket": get_variable_value("file_bucket"),
             "basePath": get_variable_value("path_input"),         
             "Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
         },
    )
 

Мы используем Java, и в заданиях потока данных у нас есть класс опций get и set, как показано ниже

 public interface MyOptions extends CommonOptions {
 
    @Description("The output bucket")
    @Validation.Required
    ValueProvider<String> getFileBucket();
    void setFileBucket(ValueProvider<String> value);
    
}
 

Нам нужно создать шаблон для этого задания потока данных, и этот шаблон будет запущен с помощью базы данных composer.

Ответ №2:

Переход от классического шаблона потока данных к гибкому шаблону исправил проблему.