Создайте список входных данных, которые будут обрабатываться потоком данных вместо луча.Создать(список)

# #python #google-cloud-dataflow #apache-beam

Вопрос:

У меня есть скрипт на Python для выполнения задания потока данных.

 def get_id():  # return a list of id e.g. ['1','2','3']  id_list = get_id()  p = beam.Pipeline() id = p | "Input ID" gt;gt; beam.Create(id_list) # run further process with beam  

Это прекрасно работает

Но если я захочу изменить его, чтобы выполнить get_id() задание в пределах луча. Какова может быть функция или способы обхода, которые могут помочь в этом? По сути, инициируйте конвейер луча с помощью функции для создания списка для дальнейшего использования.

Причина, по которой мне нужно запустить это в Beam, заключается в том, что я хочу, чтобы моя консоль входа в поток данных также регистрировала ввод, плюс на случай, если список ввода может вернуть список с нулем. Работа с лучом справится с этим.

Если бы я побежал

 id_list = get_id() if not id_list:  raise Exception("null list")  p = beam.Pipeline() ...  

Сначала это не позволит мне развернуть шаблон потока данных

Правка: Я сделал что-то вроде этого

 def get_id(_): # add _ input argument  # return a list of id e.g. ['1','2','3']   p = beam.Pipeline() p = p | "Init" gt;gt; beam.Create(['']) id = p | "get id" gt;gt; beam.Map(get_id) ...  

Это сработало, так как я могу обработать функцию get_id() с помощью луча и задания, показанного в консоли. Но не уверен, правильно ли это или нет?

Комментарии:

1. Так как вы ответили на свой собственный вопрос, вы должны опубликовать его как правильный ответ! Я думаю, что ваше решение абсолютно правильное. Вы можете просто измениться Create(['']) Create(None) , но это никоим образом не изменит ваше поведение.