Как правильно откладывать использование таймеров луча apache и потока данных с помощью Python

#timer #apache-beam #dataflow

Вопрос:

как все поживают? я пытаюсь настроить боковой вход, который обновляется каждые 1 минуту или около того в потоке данных apache beam cloud с помощью python. Этот боковой ввод должен запросить базу данных SQL и передать результаты в «основной» конвейер, который использует его для извлечения дополнительной информации. Идея состоит в том, чтобы использовать таймеры, но я прочитал документы и обнаружил, что этого как бы не хватает, а также попытался найти несколько примеров, но безрезультатно.

На данный момент код, который у меня есть, выглядит так:

 class QuerySQL(beam.DoFn):
    def __init__(self, host, user, password, database):
        pass
        # set connection values

    def setup(self):
        # sql connection
        # query for the dictionary
        self.mapping = self._fetch_dictionary()

    def _update_dictionary(self):
        # updates dictionary
        self.mapping = self._fetch_dictionary()

    def _fetch_dictionary(self):
        # queries sql and converts it to a python dictionary
        return dictionary

    def process(self, element):
        yield str(self.mapping)


class AddSystemId(beam.DoFn):
    def process(self, message, mapping):
        mapping = eval(mapping)
        # adds the corresponding value of
        # the key present in mapping into the message

# Pipeline
with beam.Pipeline(options=options) as p:

    # side input
    mapping = (
        p
        | beam.Create([('', '')])
        | 'sql_device_mapping' >> beam.ParDo(QuerySQL(**sql_connection))
    )

    # main pipeline
    t1 = (
            p
            | 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=TOPIC_PATH)
            | 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
            | 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
            | 'add_system_id_to_telemetry' >> beam.ParDo(AddSystemId(), mapping=beam.pvalue.AsSingleton(mapping))
    )
 

Я запрашиваю SQL ровно один раз и передаю результат в основной канал, но я хотел бы, чтобы функция «update_dictionary» обновилась сама. Есть ли простой способ? Заранее спасибо!