Правильный способ использования AsyncMachine и нескольких объектов в переходах

#python #python-asyncio #transition #pytransitions

#python #python-asyncio #переход #pytransitions

Вопрос:

я пытаюсь реализовать клиент-серверное приложение через websockets, и у меня есть несколько сомнений в том, как это сделать правильно, чтобы поддерживать состояние каждого подключенного клиента.

глобальная машина много объектов для каждого соединения?
машина объект — для каждого соединения?

итак, я начал с нескольких тестов, чтобы проверить, как это работает одновременно

базовая машина

 class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self, _):
        print("prepare_model", self.req_id)


    async def before_change(self, _):
        print("before_change", self.req_id)


    async def after_change(self, _):
        print("After change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before=["before_change"],
                  after="after_change")

  

и несколько запущенных типов

я хочу, чтобы все модели меняли свое состояние одновременно

 
async def main():
    tasks = []
    machine = AsyncMachine(model=None,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           send_event=True,
                           queued=True)
    for i in range(3):
        model = AsyncModel(id_=i)
        machine.add_model(model)

        tasks.append(model.start())

    await asyncio.gather(*tasks)

    for m in machine.models:
        machine.remove_model(m)

asyncio.run(main())

  

но результат:

 
prepare_model 0
before_change 0
After change 0
prepare_model 1
before_change 1
After change 1
prepare_model 2
before_change 2
After change 2

  

если я создам машину модель:

 
async def main():
    tasks = []

    for i in range(3):
        model = AsyncModel(id_=i)
        machine = AsyncMachine(model=model,
                               states=["Start", "Done"],
                               transitions=[transition],
                               initial='Start',
                               send_event=True,
                               queued=True)


        tasks.append(model.start())

    await asyncio.gather(*tasks)
  

вывод:

 prepare_model 0
prepare_model 1
prepare_model 2
before_change 0
before_change 1
before_change 2
After change 0
After change 1
After change 2
  

каков правильный способ?

Обновить

я хочу иметь доступный contextvar для каждой запущенной модели, чтобы иметь возможность корректно регистрировать все действия из других модулей, которые вызывает модель, чтобы не передавать явно некоторый идентификатор для каждого вызова внешней функции (класс модели outisde)
см. Какой-нибудь пример https://pastebin.com/qMfh0kNb , это работает не так, как ожидалось, assert срабатывает

Комментарии:

1. Добро пожаловать в StackOverflow! Это может быть неочевидно для вас, но ваш вопрос очень трудно понять. Неясно, что означает «машина», что делает код ( add_model например, вы никогда не показываете определение) или какой результат вы ожидаете для каждого фрагмента. Я предлагаю вам отредактировать ответ, чтобы: 1) обрезать ненужный код, но сделать примеры выполнимыми, 2) четко указать, какой результат вы ожидаете (и почему), и 3) задать четкий и ответственный вопрос. Ваш существующий вопрос: «Каков правильный путь?» является расплывчатым и неясным, если только вы уже не понимаете, что вы намереваетесь сделать.

Ответ №1:

Общий ответ на вопрос «Каков правильный путь?» — «Ну, это зависит …». Без четкого представления о том, чего вы хотите достичь, я могу ответить только на общие вопросы, которые я могу определить в вашем сообщении.

transitions Должен ли я использовать одну машину для КАЖДОЙ модели или ОДНУ машину для ВСЕЙ модели?

При использовании transitions это модель с отслеживанием состояния и содержит обратные вызовы перехода. Там машина действует как своего рода «свод правил». Таким образом, когда машины имеют идентичную конфигурацию, я бы рекомендовал использовать ОДНУ машину для ВСЕХ моделей в большинстве случаев использования. Использование нескольких машин с одинаковой конфигурацией в большинстве случаев просто увеличивает объем памяти и сложность кода. С моей точки зрения, я могу придумать один вариант использования, когда может быть полезно иметь несколько машин с одинаковыми конфигурациями. Но сначала вы можете задаться вопросом, почему обе версии ведут себя по-разному, хотя я только что сказал, что это не должно иметь никакого значения.

Почему обратные вызовы вызываются в другом порядке при использовании one AsyncMachine vs many AsyncMachines ?

Без пользовательских параметров использование одного AsyncMachine или нескольких AsyncMachines не имеет значения. Однако вы передали queued=True конструктор, который, согласно документации, делает это:

Если включена обработка в очереди, переход будет завершен до запуска следующего перехода

Вот почему ваша отдельная машина будет рассматривать один переход за раз, обрабатывая все обратные вызовы ОДНОЙ модели перед переходом к следующему событию / переходу. Поскольку каждая машина имеет свою собственную очередь событий / переходов, события будут обрабатываться мгновенно при использовании нескольких машин. Передача queued=True не имеет никакого эффекта в вашем примере с несколькими машинами. Вы можете получить такое же поведение для одной машины, не передавая queued параметр или передавая queued=False (значение по умолчанию). Я немного адаптировал ваш пример для иллюстрации:

 from transitions.extensions import AsyncMachine
import asyncio


class AsyncModel:
    def __init__(self, id_):
        self.req_id = id_

    async def prepare_model(self):
        print("prepare_model", self.req_id)

    async def before_change(self):
        print("before_change", self.req_id)

    async def after_change(self):
        print("after change", self.req_id)


transition = dict(trigger="start", source="Start", dest="Done",
                  prepare="prepare_model",
                  before="before_change",
                  after="after_change")

models = [AsyncModel(i) for i in range(3)]


async def main(queued):
    machine = AsyncMachine(model=models,
                           states=["Start", "Done"],
                           transitions=[transition],
                           initial='Start',
                           queued=queued)

    await asyncio.gather(*[model.start() for model in models])
    # alternatively you can dispatch an event to all models of a machine by name
    # await machine.dispatch("start")

print(">>> Queued=True")
asyncio.run(main(queued=False))
print(">>> Queued=False")
asyncio.run(main(queued=False))
  

Так что это зависит от того, что вам нужно. На ОДНОЙ машине вы можете иметь как последовательную обработку событий queued=True , так и параллельную обработку queued=False .

Вы упомянули, что есть один вариант использования, когда может потребоваться несколько машин…

В документации есть этот отрывок:

Вам следует рассмотреть возможность перехода queued=True к конструктору TimeoutMachine. Это гарантирует, что события обрабатываются последовательно, и позволяет избежать асинхронных условий гонок, которые могут возникнуть, когда время ожидания и событие происходят в непосредственной близости.

При использовании событий тайм-аута или других событий, которые происходят в тесной последовательности, могут возникнуть условия гонки, когда несколько переходов в одной и той же модели обрабатываются одновременно. Поэтому, когда эта проблема возникает в вашем случае использования и вам требуется параллельная обработка переходов на отдельных моделях, решением может быть наличие нескольких машин с одинаковыми конфигурациями.

Как работать с контекстами в AsyncMachine ?

Для меня это тонкий лед, и я могу ошибаться. Я могу попытаться дать краткое изложение моего текущего понимания того, почему вещи ведут себя определенным образом. Рассмотрим этот пример:

 from transitions.extensions import AsyncMachine
import asyncio
import contextvars

context_model = contextvars.ContextVar('model', default=None)
context_message = contextvars.ContextVar('message', default="unset")

def process():
    model = context_model.get()
    print(f"Processing {model.id} Request {model.count} => '{context_message.get()}'")


class Model:

    def __init__(self, id):
        self.id = id
        self.count = 0

    def request(self):
        self.count  = 1
        context_message.set(f"Super secret request from {self.id}")

    def nested(self):
        context_message.set(f"Not so secret message from {self.id}")
        process()


models = [Model(i) for i in range(3)]


async def model_loop(model):
    context_model.set(model)
    context_message.set(f"Hello from the model loop of {model.id}")
    while model.count < 3:
        await model.loop()


async def main():
    machine = AsyncMachine(model=models, initial='Start', transitions=[['loop', 'Start', '=']],
                           before_state_change='request',
                           after_state_change=[process, 'nested'])
    await asyncio.gather(*[model_loop(model) for model in models])

asyncio.run(main())
  

Вывод:

 # Processing 0 Request 1 => 'Hello from the model loop of 0'
# Processing 0 Request 1 => 'Not so secret message from 0'
# Processing 1 Request 1 => 'Hello from the model loop of 1'
# Processing 1 Request 1 => 'Not so secret message from 1'
# Processing 2 Request 1 => 'Hello from the model loop of 2'
# Processing 2 Request 1 => 'Not so secret message from 2'
# Processing 0 Request 2 => 'Hello from the model loop of 0'
# Processing 0 Request 2 => 'Not so secret message from 0'
# Processing 1 Request 2 => 'Hello from the model loop of 1'
# Processing 1 Request 2 => 'Not so secret message from 1'
# Processing 2 Request 2 => 'Hello from the model loop of 2'
# Processing 2 Request 2 => 'Not so secret message from 2'
# Processing 0 Request 3 => 'Hello from the model loop of 0'
# Processing 0 Request 3 => 'Not so secret message from 0'
# Processing 1 Request 3 => 'Hello from the model loop of 1'
# Processing 1 Request 3 => 'Not so secret message from 1'
# Processing 2 Request 3 => 'Hello from the model loop of 2'
# Processing 2 Request 3 => 'Not so secret message from 2'
  

Запускающие события были перенаправлены в циклы модели, которые устанавливают две контекстные переменные. Оба используются process глобальной функцией, которая использует контекстные переменные для обработки. Когда запускается переход, Model.request он вызывается прямо перед переходом и увеличивает Model.count . После Model.state изменения process Model.nested будет вызвана глобальная функция and .

process вызывается два раза: один раз в цикле модели и один раз в Model.nested обратном вызове. Измененный context_message from Model.request недоступен, но изменения в Model.nested доступны для process . Как это? Потому process что и Model.request совместно используют один и тот же родительский контекст ( Model может извлекать текущее значение context_message ), но при Model установке переменной она доступна только в ее текущем локальном контексте, который недоступен при последующем вызове (в другом обратном вызове) process . Если вы хотите, чтобы локальные изменения были доступны, process вам нужно будет запустить их ИЗ обратного вызова, как это сделано в Model.nested .

Короче говоря: обратные вызовы для AsyncMachine совместно используют один и тот же родительский контекст, но не могут получить доступ к локальному контексту друг друга, и поэтому изменение значений не имеет никакого эффекта. Однако, когда контекстная переменная является ссылкой (например context_model ), изменения в модели доступны в других обратных вызовах.

Работа с transitions очередями событий ( queued=True ) и использование contextvars требует некоторых дополнительных соображений, поскольку, как указано в документации, «при обработке событий в очереди вызов триггера всегда будет возвращать True, поскольку во время постановки в очередь невозможно определить, завершится ли переход, включающий вызовы в очереди, в конечном итоге успешно. Это верно даже тогда, когда обрабатывается только одно событие.«. Запущенное событие может быть добавлено только в очередь. Сразу после этого его контекст остается до обработки события. Если вам нужна обработка в очереди И contextvars, А также вы не можете вызывать функции из обратных вызовов модели, вам следует проверить asyncio.Заблокируйте и завершите вызов loop , но оставьте queued=False , чтобы вызовы функций не возвращались до их завершения.

Комментарии:

1. Спасибо за ответ! Прежде всего, моя идея состояла в том, чтобы использовать contextvars с AsyncMachine — чтобы каждая модель работала в каждом собственном контексте с собственными доступными для нее contextvars. Но я не смог этого достичь. Я пробовал разные способы создания моделей / машин / установки значений для contextvars — мне ничего не помогло. Я думаю, что может быть какой-то способ сделать это .. может быть, как LockedMachine делает?

2. @Eugene: Я добавил абзац об contextvar и AsyncMachine . AsyncMachine Я не могу сказать, подходит ли вам лучше async () или threads ( LockedMachine ). В обоих сценариях должна быть возможность совместно использовать контекстные переменные. Когда дело доходит до параллельной обработки — асинхронной или потоковой, — хорошей идеей является глубокое понимание того, что происходит. Использование кода для решения проблемы до тех пор, пока она не заработает (на данный момент), может привести к трудностям при отладке в будущем.

3. я добавил какой-то пример функциональности, которую я пытаюсь сделать pastebin.com/qMfh0kNb но это работает не так, как ожидалось. кстати, я также обновил основной вопрос — с основной целью

4. В документации говорится: «Важное примечание: при обработке событий в очереди вызов триггера всегда будет возвращать True , поскольку во время постановки в очередь невозможно определить, завершится ли переход, включающий вызовы в очереди, в конечном итоге успешно. Это верно даже тогда, когда обрабатывается только одно событие. » Когда вы вызываете цикл с queued=True помощью в другом контексте, триггер будет добавлен в очередь событий, и вызов возвращает / завершает контекст до его фактического выполнения. Другими словами: функция в очереди может не выполняться в контексте, в котором она была запущена.

5. у меня есть еще один вопрос об обработке состояния в очереди — можно ли создать отдельную очередь для каждой добавленной модели в конечном автомате? таким образом, состояния будут обрабатываться для каждой запущенной модели параллельно, а не последовательно для всех.