# #python #google-cloud-dataflow #apache-beam
Вопрос:
У меня есть скрипт на Python для выполнения задания потока данных.
def get_id(): # return a list of id e.g. ['1','2','3'] id_list = get_id() p = beam.Pipeline() id = p | "Input ID" gt;gt; beam.Create(id_list) # run further process with beam
Это прекрасно работает
Но если я захочу изменить его, чтобы выполнить get_id()
задание в пределах луча. Какова может быть функция или способы обхода, которые могут помочь в этом? По сути, инициируйте конвейер луча с помощью функции для создания списка для дальнейшего использования.
Причина, по которой мне нужно запустить это в Beam, заключается в том, что я хочу, чтобы моя консоль входа в поток данных также регистрировала ввод, плюс на случай, если список ввода может вернуть список с нулем. Работа с лучом справится с этим.
Если бы я побежал
id_list = get_id() if not id_list: raise Exception("null list") p = beam.Pipeline() ...
Сначала это не позволит мне развернуть шаблон потока данных
Правка: Я сделал что-то вроде этого
def get_id(_): # add _ input argument # return a list of id e.g. ['1','2','3'] p = beam.Pipeline() p = p | "Init" gt;gt; beam.Create(['']) id = p | "get id" gt;gt; beam.Map(get_id) ...
Это сработало, так как я могу обработать функцию get_id() с помощью луча и задания, показанного в консоли. Но не уверен, правильно ли это или нет?
Комментарии:
1. Так как вы ответили на свой собственный вопрос, вы должны опубликовать его как правильный ответ! Я думаю, что ваше решение абсолютно правильное. Вы можете просто измениться
Create([''])
Create(None)
, но это никоим образом не изменит ваше поведение.