Входные данные на стороне обновляются очень поздно — Python Apache Beam

#python #google-cloud-platform #google-cloud-dataflow #apache-beam

#python #google-cloud-platform #google-cloud-поток данных #apache-beam

Вопрос:

Я создаю конвейер с данными динамической конфигурации, которые обновляются при каждом запуске.

Существует 5 разделов PubSub, 1 раздел посвящен данным IoT, остальные 4 — конфигурациям, которые будут использоваться для преобразования данных IoT.

Конфигурация хранится в облачных документах Firestore. Когда каждый документ обновляется, облачная функция считывает и отправляет обновленную конфигурацию 1 из 4 подписок PubSub.

Когда конфигурация обновляется, новые входные данные на стороне смешиваются и на самом деле не заменяются новыми входными данными на стороне.И я ждал не менее 30 минут, старый боковой ввод все еще присутствует. Я использую Dataflow Runner V2

 p = beam.Pipeline(options=options)

class Transform(beam.DoFn):
    def process(self, configuration):
        ...
        yield output

def run():
    ...
    iot_data = (p
        | 'ReadIotData' >> ReadFromPubSub(SUBSCRIPTION_IOT)

    config_1 = (p
        | 'ReadConfig' >> ReadFromPubSub(SUBSCRIPTION_1)
        | 'WindowUserData' >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'LoadsUserData' >> beam.Map(lambda x: ('data', x.decode().replace('\','')))

    config_2 = # same as config_1 with different PubSub subscription
    config_3 = # same as config_1 with different PubSub subscription
    config_4 = # same as config_1 with different PubSub subscription

    output = (iot_data
        | 'transform' >> beam.ParDo(Transform(),
            pvalue.AsDict(config_1),
            pvalue.AsDict(config_2),
            pvalue.AsDict(config_3),
            pvalue.AsDict(config_4))
        | 'Output' >> WriteToPubSub(TOPIC_C)
  

Обновить

Добавление дополнительных рабочих элементов помогает повысить производительность бокового ввода. Это становится немного быстрее пропорционально добавленным рабочим. Но все еще не там, где я хочу.

Комментарии:

1. Давайте посмотрим, правильно ли я понимаю: выходные данные для config_1, например, содержат старые данные и не обновляются при отправке нового сообщения в SUBSCRIPTION_1?

2. @R.Esteves Когда на SUBSCRIPTION_1 отправляется новое сообщение, количество элементов, добавленных на диаграмме, увеличивается, поэтому я думаю, что оно входит. Но ParDo просто не захватывает и не использует новый config_1 мгновенно, он все еще обрабатывает старые данные config_1. Вывод в TOPIC_C содержит смесь старых и новых данных config_1.

3. Есть ли у вас какая-то конкретная причина использовать окно в конвейерах config_x? Кажется, что вы не выполняете никакой агрегации

4. Я почерпнул идею из другой темы stackoverflow о медленном изменении кэша поиска . Я также пробовал запускать конвейер без управления окнами, но данные обрабатываются не все вместе.