#timer #apache-beam #dataflow
Вопрос:
как все поживают? я пытаюсь настроить боковой вход, который обновляется каждые 1 минуту или около того в потоке данных apache beam cloud с помощью python. Этот боковой ввод должен запросить базу данных SQL и передать результаты в «основной» конвейер, который использует его для извлечения дополнительной информации. Идея состоит в том, чтобы использовать таймеры, но я прочитал документы и обнаружил, что этого как бы не хватает, а также попытался найти несколько примеров, но безрезультатно.
На данный момент код, который у меня есть, выглядит так:
class QuerySQL(beam.DoFn):
def __init__(self, host, user, password, database):
pass
# set connection values
def setup(self):
# sql connection
# query for the dictionary
self.mapping = self._fetch_dictionary()
def _update_dictionary(self):
# updates dictionary
self.mapping = self._fetch_dictionary()
def _fetch_dictionary(self):
# queries sql and converts it to a python dictionary
return dictionary
def process(self, element):
yield str(self.mapping)
class AddSystemId(beam.DoFn):
def process(self, message, mapping):
mapping = eval(mapping)
# adds the corresponding value of
# the key present in mapping into the message
# Pipeline
with beam.Pipeline(options=options) as p:
# side input
mapping = (
p
| beam.Create([('', '')])
| 'sql_device_mapping' >> beam.ParDo(QuerySQL(**sql_connection))
)
# main pipeline
t1 = (
p
| 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=TOPIC_PATH)
| 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
| 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
| 'add_system_id_to_telemetry' >> beam.ParDo(AddSystemId(), mapping=beam.pvalue.AsSingleton(mapping))
)
Я запрашиваю SQL ровно один раз и передаю результат в основной канал, но я хотел бы, чтобы функция «update_dictionary» обновилась сама. Есть ли простой способ? Заранее спасибо!