# #python #google-cloud-dataflow #apache-beam
Вопрос:
Я немного занимался программированием на Python, но я не являюсь опытным разработчиком ни при каких обстоятельствах. У нас есть программа etl на Python, которая была настроена как облачная функция, но время ее работы истекает, так как требуется загрузить слишком много данных, и мы хотим переписать ее для работы в потоке данных.
Код в данный момент просто подключается к API, который возвращает JSON с разделителем новой строки, а затем данные загружаются в новую таблицу в BigQuery.
Мы впервые используем поток данных, и мы просто пытаемся разобраться в том, как он работает. Кажется, довольно легко получить данные в BigQuery. Камнем преткновения, с которым мы сталкиваемся, является то, как получить данные из API. Нам неясно, как мы можем заставить это работать, нужно ли нам идти по пути разработки нового разъема ввода-вывода в соответствии с [Разработать разъем ввода-вывода]? Или есть другой вариант, поскольку разработка нового соединителя кажется сложной?
Мы много гуглили, но не нашли ничего очевидного, что могло бы помочь.
Вот пример нашего кода, но мы не на 100% уверены, что он на правильном пути. Код не работает, и мы думаем, что изначально он должен быть a .io.read
, а не a .ParDo
, но мы не совсем уверены, что с этим делать. Некоторые рекомендации были бы очень признательны!
class callAPI(beam.DoFn):
def __init__(self, input_header, input_uri):
self.headers = input_header
self.remote_url = input_uri
def process(self):
try:
res = requests.get(self.remote_url, headers=self.headers)
res.raise_for_status()
except HTTPError as message:
logging.error(message)
return
return res.text
with beam.Pipeline() as p:
data = ( p
| 'Call API ' >> beam.ParDo(callAPI(HEADER, REMOTE_URI))
| beam.Map(print))
Заранее спасибо.
Ответ №1:
Вы на правильном пути, но есть пара вещей, которые нужно исправить.
Как вы указываете, корень конвейера должен быть каким-то считывателем. Операция ParDo обрабатывает набор элементов (в идеале параллельно), но для обработки требуется некоторый ввод. Вы могли бы сделать
p | beam.Create(['a', 'b', 'c']) | beam.ParDo(SomeDoFn())
в котором SomeDoFn
будет передано a
, b
, и c
в его process
метод. Существует специальная p | beam.Impulse()
операция, которая создаст один None
элемент, если нет разумных входных данных, и вы хотите убедиться, что ваш DoFn вызывается только один раз. Вы также можете считывать элементы из файла (или аналогичного). Обратите внимание, что ваш process
метод принимает и self
то, и другое, и обрабатываемый элемент, и возвращает итерационный (чтобы разрешить ноль или более выходов. Существует также beam.Map
и beam.FlatMap
который инкапсулирует более простой шаблон). Так что вы могли бы сделать что-то вроде
class CallAPI(beam.DoFn):
def __init__(self, input_header):
self.headers = input_header
def process(self, input_uri):
try:
res = requests.get(input_uri, headers=self.headers)
res.raise_for_status()
except HTTPError as message:
logging.error(message)
yield res.text
with beam.Pipeline() as p:
data = (
p
| beam.Create([REMOTE_URI])
| 'Call API ' >> beam.ParDo(CallAPI(HEADER))
| beam.Map(print))
что позволит вам считывать данные из более чем одного URI (параллельно) в одном конвейере.
Вы могли бы написать полный соединитель ввода-вывода, если ваш источник таков, что его можно разделить (в идеале динамически), а не читать только в одном огромном запросе.
Комментарии:
1. Большое спасибо, я рад, что мы были на правильном пути, и мы были не так уж далеко. Мне пришлось немного подправить код, так как он не нуждался в этой
self.remote_url
части, но теперь я могу подключиться к API.
Ответ №2:
Можете ли вы поделиться кодом из своей облачной функции? Является ли это запланированной задачей или вызвано событием? Если это запланированная задача, Apache Airflow может быть лучшим вариантом, вы можете использовать операторы Python потока данных и BigQueryОператоры для выполнения того, что вы ищете
- Воздушный поток Apache https://airflow.apache.org/
- Потокооператор данных https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/contrib/operators/dataflow_operator/index.html#airflow.contrib.operators.dataflow_operator.Потокооператор данных
- BigQueryОператор https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/bigquery_operator/index.html
Комментарии:
1. Мы присматриваемся к облачному композитору, к сожалению, в настоящее время затраты немного велики, хотя это определенно было бы полезно. В настоящее время мы используем облачный суб/Паб и облачный планировщик для запуска функции, к сожалению, ее запуск занимает всего 9 минут, поэтому необходимо изучить использование потока данных.