Python: Как реализовать асинхронный планировщик с последовательным выполнением?

#python #asynchronous #scheduled-tasks #python-asyncio

#питон #асинхронный #запланированные задачи #python-asyncio #python

Вопрос:

Я хочу иметь асинхронный планировщик для выполнения «действий», который удовлетворяет свойствам cerain:

  1. Действия являются одноразовыми и планируются с точными временными метками.
  2. Действия должны выполняться в строго последовательном порядке, т.е. планировщик не может запустить следующее действие до тех пор, пока предыдущее не завершит выполнение.
  3. Между выполнением действий, когда планировщик ожидает следующей временной метки, планировщик должен находиться в состоянии asyncio.sleep() , позволяющем другим сопрограммам выполнять свои очереди.
  4. Когда запланировано новое действие, планировщик должен немедленно скорректировать время ожидания, чтобы планировщик всегда ожидал как можно более быстрого действия.
  5. Когда никакие действия не запланированы, планировщик должен находиться в постоянном состоянии asyncio.sleep() , пока не будет добавлено новое действие.

Моя попытка:

 
import asyncio
import time

class Action:

    def __init__(self, timestamp):
        self.timestamp = timestamp

    async def do(self):
        print("Doing action...")

class Scheduler:

    def __init__(self):
        self._actions = []
        self._sleep_future = None

    def add(self, action):
        self._actions.append(action)
        self._actions.sort(key=lambda x: x.timestamp)

        if self._sleep_future:
            self._sleep_future.cancel()

    def pop(self):
        return self._actions.pop(0)

    async def start(self):
        asyncio.create_task(self.loop())

    async def loop(self):
        while True:
            now = time.time()            
            while self._actions:
                action = self._actions[0]
                if action.timestamp <= now:
                    action = self.pop()                
                    await action.do()                   
                else:
                    break

            self._sleep_future = asyncio.ensure_future(
                asyncio.sleep(self._actions[0].timestamp - now)
            )

            try:
                await self._sleep_future
            except asyncio.CancelledError:
                continue
            finally:
                self._sleep_future = None


  

Эта реализация ненадежна и не учитывает условие (5), которое я ищу!

Не могли бы вы мне что-нибудь посоветовать?

Ответ №1:

Цикл событий asyncio уже содержит код, который вы пытались реализовать — упорядочивание таймаутов и ожидание отправки задач. Вам необходимо адаптировать интерфейс Scheduler к базовой функциональности asyncio, например, так:

 class Scheduler:
    def __init__(self):
        self._running = asyncio.Lock()

    async def start(self):
        pass  # asyncio event loop will do the actual work

    def add(self, action):
        loop = asyncio.get_event_loop()
        # Can't use `call_at()` because event loop time uses a
        # different timer than time.time().
        loop.call_later(
            action.timestamp - time.time(),
            loop.create_task, self._execute(action)
        )

    async def _execute(self, action):
        # Use a lock to ensure that no two actions run at
        # the same time.
        async with self._running:
            await action.do()
  

Комментарии:

1. Я думаю loop.call_at() , что принимает только вызываемые (функции), а не сопрограммы.

2. @SergeyDylda Хороший момент. Это можно обойти, запланировав create_task вместо этого.