#streaming #google-cloud-dataflow
#потоковая передача #google-облако-поток данных
Вопрос:
В настоящее время я создаю потоковое задание потока данных, которое выполняет вычисления только тогда и только тогда, когда в столбце «Кольцо» моих данных есть приращение.
Мой код потока данных
Job= (p | "Read" >> beam.io.ReadFromPubSub(topic=topic)
| "Parse Json" >> beam.Map(json.loads)
| "ParDo Divisors" >> ParDo(UpdateDelayTable()))
Данные, поступающие из pubsub:
Ring [
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":1},
{...,"Ring":2}
...]
Я хочу, чтобы мой поток данных отслеживал текущий номер звонка и запускал функцию только тогда и только тогда, когда номер звонка увеличился. Как мне это сделать.
Ответ №1:
Pub / Sub
Нет никакой гарантии, что {"Ring": 2}
это обязательно будет получено / отправлено Pub / Sub после {"Ring": 1}
.
Похоже, вам нужно сначала включить прием сообщений по порядку для Pub / Sub. А также убедитесь, что служба Pub / Sub получает Ring
данные постепенно.
Поток данных
Затем, чтобы достичь этого с помощью потока данных, вы можете использовать обработку с отслеживанием состояния.
Но имейте в виду, что «состояние» «звонка» зависит от ключа (и для каждого окна). Чтобы делать то, что вы хотите, все элементы должны иметь один и тот же ключ и попадать в одно и то же окно (в данном случае глобальное окно). Это будет очень «горячая» клавиша.
Пример кода:
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.coders import coders
class RingFn(beam.DoFn):
RING_STATE = ReadModifyWriteStateSpec(
name='Ring', coder=coders.VarIntCoder())
def process(self, element, ring=beam.DoFn.StateParam(RING_STATE)):
current_ring = ring.read() or 0
if element['Ring'] > current_ring:
print('Carry out your computation here!')
ring.write(element['Ring'])
# Usage
pcoll | beam.ParDo(RingFn())
# Check your keys if you are not sure what they are.
pcoll | beam.Keys() | beam.Map(print)
Комментарии:
1. Привет! Спасибо за комментарий. Я уже настроил порядок сообщений в pubsub, так что это было бы меньшей проблемой для беспокойства. Данные передаются со скоростью примерно 1 сообщение в секунду, и смена колец обычно происходит только в среднем каждые 3 часа, что означает, что функция записи колец в big table будет выполняться только тогда. В настоящее время только 10 компьютеров передают данные в pub sub. Затем, читая из Pub sub, я могу различать эти 10 машин по ключу «machine». Могу ли я каким-то образом установить это в качестве ключа? Я новичок в потоковом и функциональном программировании, ваша помощь приветствуется!
2. Да, при публикации в Pub / Sub, если вы структурируете имя компьютера в сообщении, вы можете использовать эту информацию позже в конвейере Beam. Например: beam.apache.org/documentation/transforms/python/elementwise /. … Вы можете сопоставить свои элементы с кортежами (machine_name, ring_data). Также вы всегда можете
pcoll | beam.Keys() | beam.Map(print)
проверить, соответствует ли ключ тому, что вы намеревались. Вы также можете использовать записные книжки ( cloud.google.com/dataflow/docs/guides /… ) для создания прототипа вашего конвейера, пока вы не убедитесь, что он правильный.3. Привет, спасибо за помощь, теперь это работает, мне удалось назначить ключи для моей коллекции P с помощью WithKeys(лямбда x: x[«ключ»]. Еще раз спасибо