#google-cloud-platform #apache-beam #google-apis-explorer
# #google-cloud-platform #apache-beam #google-api-explorer
Вопрос:
Я хочу запустить поток данных с помощью облачной функции в облачной платформе Google.
В облачной функции я вызываю API следующим образом с помощью Python:
from googleapiclient.discovery import build
service = build('dataflow', 'v1b3', cache_discovery=False)
request = service.projects().locations().templates().launch(
projectId=projectId, gcsPath=srcDataFlowTemplate, location='us', body={
'jobName': jobName,
'parameters': parameters,
'environment':environment
}
)
response = request.execute()
и это канал в шаблоне потока данных, который я запускаю:
(p
| 'Create PCollection' >> beam.Create(inputFile)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
| 'Write to BigQuery' >> beam.io.Write(beam.io.WriteToBigQuery (table,
project=projectId,
dataset=dataset,
schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
)
)
В шаблоне потока данных — возможно ли извлечь этот «входной файл», который хранится в теле запроса в разделе parameters property и ProjectID из вызова API?
Комментарии:
1. Можете ли вы указать нужную переменную? и почему? Я могу легко получить некоторые, другие сложнее, и я хочу быть уверенным, прежде чем копаться!
2. Я обновил свой вопрос. Я новичок в этой среде, поэтому не знаю, возможно ли это. Я в основном хочу извлечь этот {ProjectID} в шаблоне потока данных после того, как я сделаю этот вызов: https: // dataflow.googleapis.com/v1b3/projects /{ProjectID}/шаблоны:запуск
Ответ №1:
Когда вы запускаете задание для потока данных с помощью Beam framework, вы используете DataflowRunner. Когда вы создаете конвейер, вы создаете его с помощью объекта «параметры».
p = beam.Pipeline(options=options)
Этот объект содержит базовые параметры, такие как ProjectID, и вы можете расширить его, чтобы добавить свои собственные параметры конвейера.