Не удается запустить конвейер python через airflow BeamRunPythonPipelineOperator

# #google-cloud-platform #airflow #google-cloud-dataflow #apache-beam #google-cloud-composer

Вопрос:

Я не могу запустить конвейер python через airflow BeamRunPythonPipelineOperator . Ниже приведен мой полный код:

ФАЙЛ DAG

 
    import os
    from datetime import datetime, timedelta
    from airflow.utils.dates import days_ago
    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
    
       
    
    default_args = {
        "owner": "<...>",
        "start_date": days_ago(1),
        'dataflow_default_options': {
        "project": "<...>",
        }
    }
    
    dag = DAG(
        dag_id="word_count",
        default_args=default_args,
        schedule_interval="@once"
    )
    
    start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
        task_id="start_python_pipeline_dataflow_runner",
        runner="DataflowRunner",
        py_file="gs://<...>/word_count.py",
        pipeline_options={
            'input':"gs://<...>/kinglear.txt",
            'output':"gs://<...>/output.txt",
            'temp_location':"gs://<...>/temp/",
            'staging_location':"gs://<...>/temp/",
        },
        py_options=[],
        py_requirements=['apache-beam[gcp]==2.26.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name='{{task.task_id}}', project_id="<...>", location="us-central1"
        ),
        dag=dag,
    )

 

Файл Python (word_count.py )

     """A word-counting workflow."""
    
    # pytype: skip-file
    
    import argparse
    import logging
    import re
    
    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    
    
    class WordExtractingDoFn(beam.DoFn):
      """Parse each line of input text into words."""
      def process(self, element):
        """Returns an iterator over the words of this element.
    
        The element is a line of text.  If the line is blank, note that, too.
    
        Args:
          element: the element being processed
    
        Returns:
          The processed element.
        """
        return re.findall(r'[w'] ', element, re.UNICODE)
    
    
    def run(argv=None, save_main_session=True):
      """Main entry point; defines and runs the wordcount pipeline."""
      parser = argparse.ArgumentParser()
      parser.add_argument(
          '--input',
          dest='input',
          default='gs://<...>/kinglear.txt',
          help='Input file to process.')
      parser.add_argument(
          '--output',
          dest='output',
          default='gs://<...>/output.txt',
          help='Output file to write results to.')
    
      argv = [
        '--project=<...>',
        '--region=us-central1',  
        '--runner=DataflowRunner',
        '--staging_location=gs://<...>/temp/',
        '--temp_location=gs://<...>/temp/',
        '--template_location=gs://<...>/templates/word_count_template'
      ]
      
      known_args, pipeline_args = parser.parse_known_args(argv)
    
      # We use the save_main_session option because one or more DoFn's in this
      # workflow rely on global context (e.g., a module imported at module level).
      pipeline_options = PipelineOptions(pipeline_args)
      pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    
      # The pipeline will be run on exiting the with block.
      with beam.Pipeline(argv=argv,options=pipeline_options) as p:
    
        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input)
    
        counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))
    
        # Format the counts into a PCollection of strings.
        def format_result(word, count):
          return '%s: %d' % (word, count)
    
        output = counts | 'Format' >> beam.MapTuple(format_result)
    
        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output | 'Write' >> WriteToText(known_args.output)
    
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
 

Ниже приведен скриншот композитора:

Ниже приведен скриншот композитора

Я не могу видеть задание потока данных в консоли, а также подсчитывать результат в корзине. Может ли кто-нибудь предложить мне правильный подход или какие-либо предложения по этому поводу?

Комментарии:

1. Во-первых, пожалуйста, удалите все PII в вопросе, пожалуйста. Не могли бы вы предоставить мне более подробную информацию о вашей архитектуре? Вы запускаете dag из консоли airflow или консоли GCP composer? Обрабатываете ли вы эти данные в экземпляре потока данных в том же проекте?

2. У меня есть скрипт на python, который содержит код потока данных для чтения файла из gcs и подсчета появления слов в файле, я пытаюсь запустить этот скрипт через airflow BeamRunPythonPipelineOperator, который предполагает создание задания потока данных для моей обработки файлов / данных

Ответ №1:

У вас DAG в порядке, проблема в файле Beam Python, при отправке аргументов потока argv данных в. Лучший подход extend pipeline_args . И задание не отправляется, потому что вы отправляете argv в beam.Pipeline .

Ниже приведен исправленный код:

word_count.py :

 """A word-counting workflow."""

# pytype: skip-file

import argparse
import logging
import re
import os

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""
  def process(self, element):
    """Returns an iterator over the words of this element.
    The element is a line of text.  If the line is blank, note that, too.
    Args:
      element: the element being processed
    Returns:
      The processed element.
    """
    return re.findall(r'[w'] ', element, re.UNICODE)


def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      default='gs://<bucket>/newoutput',
      help='Output file to write results to.')

  #argv = [
  #      '--project=<...>',
  #      '--region=us-central1',  
  #      '--runner=DataflowRunner',
  #      '--staging_location=gs://<...>/temp/',
  #      '--temp_location=gs://<...>/temp/',
  #      '--template_location=gs://<...>/templates/word_count_template'
  #    ]

  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_args.extend([

      '--runner=DataflowRunner',
      '--project=<project-name>',
      '--region=<region>',
      '--staging_location=gs://<bucket>/',
      '--temp_location=gs://<bucket>/temp',
      '--job_name=your-wordcount-job',
  ])

  
  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=pipeline_options) as p:

    lines = p | 'Read' >> ReadFromText(known_args.input)
    
    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))
    
        # Format the counts into a PCollection of strings.
    def format_result(word, count):
      return '%s: %d' % (word, count)
    
    output = counts | 'Format' >> beam.MapTuple(format_result)
    
    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | 'Write' >> WriteToText(known_args.output)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()