Как получить данные из API с помощью Apache Beam (поток данных)?

# #python #google-cloud-dataflow #apache-beam

Вопрос:

Я немного занимался программированием на Python, но я не являюсь опытным разработчиком ни при каких обстоятельствах. У нас есть программа etl на Python, которая была настроена как облачная функция, но время ее работы истекает, так как требуется загрузить слишком много данных, и мы хотим переписать ее для работы в потоке данных.

Код в данный момент просто подключается к API, который возвращает JSON с разделителем новой строки, а затем данные загружаются в новую таблицу в BigQuery.

Мы впервые используем поток данных, и мы просто пытаемся разобраться в том, как он работает. Кажется, довольно легко получить данные в BigQuery. Камнем преткновения, с которым мы сталкиваемся, является то, как получить данные из API. Нам неясно, как мы можем заставить это работать, нужно ли нам идти по пути разработки нового разъема ввода-вывода в соответствии с [Разработать разъем ввода-вывода]? Или есть другой вариант, поскольку разработка нового соединителя кажется сложной?

Мы много гуглили, но не нашли ничего очевидного, что могло бы помочь.

Вот пример нашего кода, но мы не на 100% уверены, что он на правильном пути. Код не работает, и мы думаем, что изначально он должен быть a .io.read , а не a .ParDo , но мы не совсем уверены, что с этим делать. Некоторые рекомендации были бы очень признательны!

 class callAPI(beam.DoFn):
   def __init__(self, input_header, input_uri):
       self.headers = input_header
       self.remote_url = input_uri

   def process(self):
       try:
           res = requests.get(self.remote_url, headers=self.headers) 
           res.raise_for_status()
       except HTTPError as message:
           logging.error(message)
           return
       return res.text

with beam.Pipeline() as p:

    data = ( p 
                | 'Call  API ' >> beam.ParDo(callAPI(HEADER, REMOTE_URI))
                | beam.Map(print))
 

Заранее спасибо.

Ответ №1:

Вы на правильном пути, но есть пара вещей, которые нужно исправить.

Как вы указываете, корень конвейера должен быть каким-то считывателем. Операция ParDo обрабатывает набор элементов (в идеале параллельно), но для обработки требуется некоторый ввод. Вы могли бы сделать

 p | beam.Create(['a', 'b', 'c']) | beam.ParDo(SomeDoFn())
 

в котором SomeDoFn будет передано a , b , и c в его process метод. Существует специальная p | beam.Impulse() операция, которая создаст один None элемент, если нет разумных входных данных, и вы хотите убедиться, что ваш DoFn вызывается только один раз. Вы также можете считывать элементы из файла (или аналогичного). Обратите внимание, что ваш process метод принимает и self то, и другое, и обрабатываемый элемент, и возвращает итерационный (чтобы разрешить ноль или более выходов. Существует также beam.Map и beam.FlatMap который инкапсулирует более простой шаблон). Так что вы могли бы сделать что-то вроде

 class CallAPI(beam.DoFn):
   def __init__(self, input_header):
       self.headers = input_header

   def process(self, input_uri):
       try:
           res = requests.get(input_uri, headers=self.headers) 
           res.raise_for_status()
       except HTTPError as message:
           logging.error(message)
       yield res.text

with beam.Pipeline() as p:

    data = (
        p
        | beam.Create([REMOTE_URI]) 
        | 'Call API ' >> beam.ParDo(CallAPI(HEADER))
        | beam.Map(print))
 

что позволит вам считывать данные из более чем одного URI (параллельно) в одном конвейере.

Вы могли бы написать полный соединитель ввода-вывода, если ваш источник таков, что его можно разделить (в идеале динамически), а не читать только в одном огромном запросе.

Комментарии:

1. Большое спасибо, я рад, что мы были на правильном пути, и мы были не так уж далеко. Мне пришлось немного подправить код, так как он не нуждался в этой self.remote_url части, но теперь я могу подключиться к API.

Ответ №2:

Можете ли вы поделиться кодом из своей облачной функции? Является ли это запланированной задачей или вызвано событием? Если это запланированная задача, Apache Airflow может быть лучшим вариантом, вы можете использовать операторы Python потока данных и BigQueryОператоры для выполнения того, что вы ищете

Комментарии:

1. Мы присматриваемся к облачному композитору, к сожалению, в настоящее время затраты немного велики, хотя это определенно было бы полезно. В настоящее время мы используем облачный суб/Паб и облачный планировщик для запуска функции, к сожалению, ее запуск занимает всего 9 минут, поэтому необходимо изучить использование потока данных.