#python #google-cloud-platform #google-cloud-dataflow #apache-beam
#python #google-cloud-platform #google-cloud-поток данных #apache-beam
Вопрос:
Я создаю конвейер с данными динамической конфигурации, которые обновляются при каждом запуске.
Существует 5 разделов PubSub, 1 раздел посвящен данным IoT, остальные 4 — конфигурациям, которые будут использоваться для преобразования данных IoT.
Конфигурация хранится в облачных документах Firestore. Когда каждый документ обновляется, облачная функция считывает и отправляет обновленную конфигурацию 1 из 4 подписок PubSub.
Когда конфигурация обновляется, новые входные данные на стороне смешиваются и на самом деле не заменяются новыми входными данными на стороне.И я ждал не менее 30 минут, старый боковой ввод все еще присутствует. Я использую Dataflow Runner V2
p = beam.Pipeline(options=options)
class Transform(beam.DoFn):
def process(self, configuration):
...
yield output
def run():
...
iot_data = (p
| 'ReadIotData' >> ReadFromPubSub(SUBSCRIPTION_IOT)
config_1 = (p
| 'ReadConfig' >> ReadFromPubSub(SUBSCRIPTION_1)
| 'WindowUserData' >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'LoadsUserData' >> beam.Map(lambda x: ('data', x.decode().replace('\','')))
config_2 = # same as config_1 with different PubSub subscription
config_3 = # same as config_1 with different PubSub subscription
config_4 = # same as config_1 with different PubSub subscription
output = (iot_data
| 'transform' >> beam.ParDo(Transform(),
pvalue.AsDict(config_1),
pvalue.AsDict(config_2),
pvalue.AsDict(config_3),
pvalue.AsDict(config_4))
| 'Output' >> WriteToPubSub(TOPIC_C)
Обновить
Добавление дополнительных рабочих элементов помогает повысить производительность бокового ввода. Это становится немного быстрее пропорционально добавленным рабочим. Но все еще не там, где я хочу.
Комментарии:
1. Давайте посмотрим, правильно ли я понимаю: выходные данные для config_1, например, содержат старые данные и не обновляются при отправке нового сообщения в SUBSCRIPTION_1?
2. @R.Esteves Когда на SUBSCRIPTION_1 отправляется новое сообщение, количество элементов, добавленных на диаграмме, увеличивается, поэтому я думаю, что оно входит. Но ParDo просто не захватывает и не использует новый config_1 мгновенно, он все еще обрабатывает старые данные config_1. Вывод в TOPIC_C содержит смесь старых и новых данных config_1.
3. Есть ли у вас какая-то конкретная причина использовать окно в конвейерах config_x? Кажется, что вы не выполняете никакой агрегации
4. Я почерпнул идею из другой темы stackoverflow о медленном изменении кэша поиска . Я также пробовал запускать конвейер без управления окнами, но данные обрабатываются не все вместе.