Как запустить apache beam локально?

#python-3.x #pandas #google-cloud-platform #apache-beam

#python-3.x #pandas #google-облачная платформа #apache-beam

Вопрос:

Я пытаюсь запустить скрипт Python apache beam на своем локальном компьютере, чтобы выполнить некоторое моделирование. Я добавил ‘DirectRunner’ в свои параметры. Однако p.run() выдает мне ошибку «TypeError: Receiver() не принимает аргументов»

Есть идеи, почему это может произойти? Я использую Spyder в качестве IDE.

РЕДАКТИРОВАТЬ: Вот пример кода, он принимает список сообщений в виде:

 { "Val_1": 1, "Val_2": 56, "date": "2019-04-01T15:00:04.340778" }
  

разделите его и поместите в виде

 (1, 56, 2019-04-01T15:00:04.340778)
  

затем сохраните его в текстовый файл.

 p = beam.Pipeline('DirectRunner')
(p | 'ReadMessage' >>  beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
                    | 'Processing' >> beam.ParDo(Split())
                    | 'Write' >> beam.io.WriteToText('input/results.txt'))
p.run().wait_until_finish() 
  

Ошибка:

 "TypeError: Receiver() takes no arguments"
  

Комментарии:

1. Вы прошли через быстрый запуск Beam Python… beam.apache.org/get-started/quickstart-py Сработало ли это?

2. можете ли вы опубликовать какой-нибудь пример кода?

3. Я отредактировал свой вопрос, добавив некоторый код, пожалуйста, проверьте его

Ответ №1:

Вам не нужно указывать ‘DirectRunner’ в качестве аргумента, если вы не укажете никакого runner, т. Е. оставите его пустым, по умолчанию выполняется с использованием DirectRunner. Это должно работать нормально.

     p = beam.Pipeline()
    (p | 'ReadMessage' >>  beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
                        | 'Processing' >> beam.ParDo(Split())
                        | 'Write' >> beam.io.WriteToText('input/results.txt'))
    result = p.run()
    result.wait_until_finish()

if __name__ == "__main__":
    run()
  

Ответ №2:

Вы выполняете свой файл Python Beam точно так же, как любой обычный файл, предполагая, что вы указали свой конвейер в качестве DirectRunner, что вы и сделали с

 p = beam.Pipeline('DirectRunner')
  

Apache Beam в настоящее время имеет ограниченную поддержку Python 3.x . Если вы попытаетесь запустить пример подсчета слов, это выдаст ту же ошибку. Это будет исправлено в будущем, поскольку в настоящее время они работают над полной поддержкой Python 3.

Если вы хотите развернуть свой код Python Beam с облачной платформой Google, я настоятельно рекомендую переключиться на Python 2.7.

Вы можете отслеживать проблемы здесь

Однако я не могу сказать, что именно делает ваша функция разделения, поэтому я предоставляю вам минимальный рабочий пример, чтобы вы могли протестировать свою установку Beam.

 import apache_beam as beam
import ast

# The DoFn to perform on each element in the input PCollection.
class Split(beam.DoFn):
    def process(self, element):
        val = ast.literal_eval(element[1])
        output ='(' ','.join(map(str, val.values()))   ')'
        return [output]

def run():
    p = beam.Pipeline('DirectRunner')
    (p | 'ReadMessage' >>  beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
                        | 'Processing' >> beam.ParDo(Split())
                        | 'Write' >> beam.io.WriteToText('input/results.txt'))
    result = p.run()
    result.wait_until_finish()

if __name__ == "__main__":
    run()