# #google-cloud-platform #airflow #google-cloud-dataflow #apache-beam #google-cloud-composer
Вопрос:
Я не могу запустить конвейер python через airflow BeamRunPythonPipelineOperator
. Ниже приведен мой полный код:
ФАЙЛ DAG
import os
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
default_args = {
"owner": "<...>",
"start_date": days_ago(1),
'dataflow_default_options': {
"project": "<...>",
}
}
dag = DAG(
dag_id="word_count",
default_args=default_args,
schedule_interval="@once"
)
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_dataflow_runner",
runner="DataflowRunner",
py_file="gs://<...>/word_count.py",
pipeline_options={
'input':"gs://<...>/kinglear.txt",
'output':"gs://<...>/output.txt",
'temp_location':"gs://<...>/temp/",
'staging_location':"gs://<...>/temp/",
},
py_options=[],
py_requirements=['apache-beam[gcp]==2.26.0'],
py_interpreter='python3',
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
job_name='{{task.task_id}}', project_id="<...>", location="us-central1"
),
dag=dag,
)
Файл Python (word_count.py )
"""A word-counting workflow."""
# pytype: skip-file
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def process(self, element):
"""Returns an iterator over the words of this element.
The element is a line of text. If the line is blank, note that, too.
Args:
element: the element being processed
Returns:
The processed element.
"""
return re.findall(r'[w'] ', element, re.UNICODE)
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://<...>/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--output',
dest='output',
default='gs://<...>/output.txt',
help='Output file to write results to.')
argv = [
'--project=<...>',
'--region=us-central1',
'--runner=DataflowRunner',
'--staging_location=gs://<...>/temp/',
'--temp_location=gs://<...>/temp/',
'--template_location=gs://<...>/templates/word_count_template'
]
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# The pipeline will be run on exiting the with block.
with beam.Pipeline(argv=argv,options=pipeline_options) as p:
# Read the text file[pattern] into a PCollection.
lines = p | 'Read' >> ReadFromText(known_args.input)
counts = (
lines
| 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
| 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
def format_result(word, count):
return '%s: %d' % (word, count)
output = counts | 'Format' >> beam.MapTuple(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Ниже приведен скриншот композитора:
Я не могу видеть задание потока данных в консоли, а также подсчитывать результат в корзине. Может ли кто-нибудь предложить мне правильный подход или какие-либо предложения по этому поводу?
Комментарии:
1. Во-первых, пожалуйста, удалите все PII в вопросе, пожалуйста. Не могли бы вы предоставить мне более подробную информацию о вашей архитектуре? Вы запускаете dag из консоли airflow или консоли GCP composer? Обрабатываете ли вы эти данные в экземпляре потока данных в том же проекте?
2. У меня есть скрипт на python, который содержит код потока данных для чтения файла из gcs и подсчета появления слов в файле, я пытаюсь запустить этот скрипт через airflow BeamRunPythonPipelineOperator, который предполагает создание задания потока данных для моей обработки файлов / данных
Ответ №1:
У вас DAG в порядке, проблема в файле Beam Python, при отправке аргументов потока argv
данных в. Лучший подход extend
pipeline_args
. И задание не отправляется, потому что вы отправляете argv
в beam.Pipeline
.
Ниже приведен исправленный код:
word_count.py :
"""A word-counting workflow."""
# pytype: skip-file
import argparse
import logging
import re
import os
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def process(self, element):
"""Returns an iterator over the words of this element.
The element is a line of text. If the line is blank, note that, too.
Args:
element: the element being processed
Returns:
The processed element.
"""
return re.findall(r'[w'] ', element, re.UNICODE)
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument(
'--output',
dest='output',
default='gs://<bucket>/newoutput',
help='Output file to write results to.')
#argv = [
# '--project=<...>',
# '--region=us-central1',
# '--runner=DataflowRunner',
# '--staging_location=gs://<...>/temp/',
# '--temp_location=gs://<...>/temp/',
# '--template_location=gs://<...>/templates/word_count_template'
# ]
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=<project-name>',
'--region=<region>',
'--staging_location=gs://<bucket>/',
'--temp_location=gs://<bucket>/temp',
'--job_name=your-wordcount-job',
])
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=pipeline_options) as p:
lines = p | 'Read' >> ReadFromText(known_args.input)
counts = (
lines
| 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
| 'PairWIthOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum))
# Format the counts into a PCollection of strings.
def format_result(word, count):
return '%s: %d' % (word, count)
output = counts | 'Format' >> beam.MapTuple(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'Write' >> WriteToText(known_args.output)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()