Как отслеживать состояния между бегунами в задании потока данных?

#streaming #google-cloud-dataflow

#потоковая передача #google-облако-поток данных

Вопрос:

В настоящее время я создаю потоковое задание потока данных, которое выполняет вычисления только тогда и только тогда, когда в столбце «Кольцо» моих данных есть приращение.

Мой код потока данных

 Job=      (p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
         | "Parse Json" >> beam.Map(json.loads)
         | "ParDo Divisors" >> ParDo(UpdateDelayTable()))
 

Данные, поступающие из pubsub:

 Ring [
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":2}
...]
 

Я хочу, чтобы мой поток данных отслеживал текущий номер звонка и запускал функцию только тогда и только тогда, когда номер звонка увеличился. Как мне это сделать.

Ответ №1:

Pub / Sub

Нет никакой гарантии, что {"Ring": 2} это обязательно будет получено / отправлено Pub / Sub после {"Ring": 1} .

Похоже, вам нужно сначала включить прием сообщений по порядку для Pub / Sub. А также убедитесь, что служба Pub / Sub получает Ring данные постепенно.

Поток данных

Затем, чтобы достичь этого с помощью потока данных, вы можете использовать обработку с отслеживанием состояния.

Но имейте в виду, что «состояние» «звонка» зависит от ключа (и для каждого окна). Чтобы делать то, что вы хотите, все элементы должны иметь один и тот же ключ и попадать в одно и то же окно (в данном случае глобальное окно). Это будет очень «горячая» клавиша.

Пример кода:

 from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.coders import coders


class RingFn(beam.DoFn):
  RING_STATE = ReadModifyWriteStateSpec(
      name='Ring', coder=coders.VarIntCoder())

  def process(self, element, ring=beam.DoFn.StateParam(RING_STATE)):
    current_ring = ring.read() or 0
    if element['Ring'] > current_ring:
        print('Carry out your computation here!')
        ring.write(element['Ring'])


  # Usage
  pcoll | beam.ParDo(RingFn())

  # Check your keys if you are not sure what they are.
  pcoll | beam.Keys() | beam.Map(print)
 

Комментарии:

1. Привет! Спасибо за комментарий. Я уже настроил порядок сообщений в pubsub, так что это было бы меньшей проблемой для беспокойства. Данные передаются со скоростью примерно 1 сообщение в секунду, и смена колец обычно происходит только в среднем каждые 3 часа, что означает, что функция записи колец в big table будет выполняться только тогда. В настоящее время только 10 компьютеров передают данные в pub sub. Затем, читая из Pub sub, я могу различать эти 10 машин по ключу «machine». Могу ли я каким-то образом установить это в качестве ключа? Я новичок в потоковом и функциональном программировании, ваша помощь приветствуется!

2. Да, при публикации в Pub / Sub, если вы структурируете имя компьютера в сообщении, вы можете использовать эту информацию позже в конвейере Beam. Например: beam.apache.org/documentation/transforms/python/elementwise /. … Вы можете сопоставить свои элементы с кортежами (machine_name, ring_data). Также вы всегда можете pcoll | beam.Keys() | beam.Map(print) проверить, соответствует ли ключ тому, что вы намеревались. Вы также можете использовать записные книжки ( cloud.google.com/dataflow/docs/guides /… ) для создания прототипа вашего конвейера, пока вы не убедитесь, что он правильный.

3. Привет, спасибо за помощь, теперь это работает, мне удалось назначить ключи для моей коллекции P с помощью WithKeys(лямбда x: x[«ключ»]. Еще раз спасибо