#python-3.x #pandas #google-cloud-platform #apache-beam
#python-3.x #pandas #google-облачная платформа #apache-beam
Вопрос:
Я пытаюсь запустить скрипт Python apache beam на своем локальном компьютере, чтобы выполнить некоторое моделирование. Я добавил ‘DirectRunner’ в свои параметры. Однако p.run() выдает мне ошибку «TypeError: Receiver() не принимает аргументов»
Есть идеи, почему это может произойти? Я использую Spyder в качестве IDE.
РЕДАКТИРОВАТЬ: Вот пример кода, он принимает список сообщений в виде:
{ "Val_1": 1, "Val_2": 56, "date": "2019-04-01T15:00:04.340778" }
разделите его и поместите в виде
(1, 56, 2019-04-01T15:00:04.340778)
затем сохраните его в текстовый файл.
p = beam.Pipeline('DirectRunner')
(p | 'ReadMessage' >> beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
| 'Processing' >> beam.ParDo(Split())
| 'Write' >> beam.io.WriteToText('input/results.txt'))
p.run().wait_until_finish()
Ошибка:
"TypeError: Receiver() takes no arguments"
Комментарии:
1. Вы прошли через быстрый запуск Beam Python… beam.apache.org/get-started/quickstart-py Сработало ли это?
2. можете ли вы опубликовать какой-нибудь пример кода?
3. Я отредактировал свой вопрос, добавив некоторый код, пожалуйста, проверьте его
Ответ №1:
Вам не нужно указывать ‘DirectRunner’ в качестве аргумента, если вы не укажете никакого runner, т. Е. оставите его пустым, по умолчанию выполняется с использованием DirectRunner. Это должно работать нормально.
p = beam.Pipeline()
(p | 'ReadMessage' >> beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
| 'Processing' >> beam.ParDo(Split())
| 'Write' >> beam.io.WriteToText('input/results.txt'))
result = p.run()
result.wait_until_finish()
if __name__ == "__main__":
run()
Ответ №2:
Вы выполняете свой файл Python Beam точно так же, как любой обычный файл, предполагая, что вы указали свой конвейер в качестве DirectRunner, что вы и сделали с
p = beam.Pipeline('DirectRunner')
Apache Beam в настоящее время имеет ограниченную поддержку Python 3.x . Если вы попытаетесь запустить пример подсчета слов, это выдаст ту же ошибку. Это будет исправлено в будущем, поскольку в настоящее время они работают над полной поддержкой Python 3.
Если вы хотите развернуть свой код Python Beam с облачной платформой Google, я настоятельно рекомендую переключиться на Python 2.7.
Вы можете отслеживать проблемы здесь
Однако я не могу сказать, что именно делает ваша функция разделения, поэтому я предоставляю вам минимальный рабочий пример, чтобы вы могли протестировать свою установку Beam.
import apache_beam as beam
import ast
# The DoFn to perform on each element in the input PCollection.
class Split(beam.DoFn):
def process(self, element):
val = ast.literal_eval(element[1])
output ='(' ','.join(map(str, val.values())) ')'
return [output]
def run():
p = beam.Pipeline('DirectRunner')
(p | 'ReadMessage' >> beam.io.textio.ReadFromTextWithFilename('input/inputs.json')
| 'Processing' >> beam.ParDo(Split())
| 'Write' >> beam.io.WriteToText('input/results.txt'))
result = p.run()
result.wait_until_finish()
if __name__ == "__main__":
run()