#python #asynchronous #scheduled-tasks #python-asyncio
#питон #асинхронный #запланированные задачи #python-asyncio #python
Вопрос:
Я хочу иметь асинхронный планировщик для выполнения «действий», который удовлетворяет свойствам cerain:
- Действия являются одноразовыми и планируются с точными временными метками.
- Действия должны выполняться в строго последовательном порядке, т.е. планировщик не может запустить следующее действие до тех пор, пока предыдущее не завершит выполнение.
- Между выполнением действий, когда планировщик ожидает следующей временной метки, планировщик должен находиться в состоянии
asyncio.sleep()
, позволяющем другим сопрограммам выполнять свои очереди. - Когда запланировано новое действие, планировщик должен немедленно скорректировать время ожидания, чтобы планировщик всегда ожидал как можно более быстрого действия.
- Когда никакие действия не запланированы, планировщик должен находиться в постоянном состоянии
asyncio.sleep()
, пока не будет добавлено новое действие.
Моя попытка:
import asyncio
import time
class Action:
def __init__(self, timestamp):
self.timestamp = timestamp
async def do(self):
print("Doing action...")
class Scheduler:
def __init__(self):
self._actions = []
self._sleep_future = None
def add(self, action):
self._actions.append(action)
self._actions.sort(key=lambda x: x.timestamp)
if self._sleep_future:
self._sleep_future.cancel()
def pop(self):
return self._actions.pop(0)
async def start(self):
asyncio.create_task(self.loop())
async def loop(self):
while True:
now = time.time()
while self._actions:
action = self._actions[0]
if action.timestamp <= now:
action = self.pop()
await action.do()
else:
break
self._sleep_future = asyncio.ensure_future(
asyncio.sleep(self._actions[0].timestamp - now)
)
try:
await self._sleep_future
except asyncio.CancelledError:
continue
finally:
self._sleep_future = None
Эта реализация ненадежна и не учитывает условие (5), которое я ищу!
Не могли бы вы мне что-нибудь посоветовать?
Ответ №1:
Цикл событий asyncio уже содержит код, который вы пытались реализовать — упорядочивание таймаутов и ожидание отправки задач. Вам необходимо адаптировать интерфейс Scheduler
к базовой функциональности asyncio, например, так:
class Scheduler:
def __init__(self):
self._running = asyncio.Lock()
async def start(self):
pass # asyncio event loop will do the actual work
def add(self, action):
loop = asyncio.get_event_loop()
# Can't use `call_at()` because event loop time uses a
# different timer than time.time().
loop.call_later(
action.timestamp - time.time(),
loop.create_task, self._execute(action)
)
async def _execute(self, action):
# Use a lock to ensure that no two actions run at
# the same time.
async with self._running:
await action.do()
Комментарии:
1. Я думаю
loop.call_at()
, что принимает только вызываемые (функции), а не сопрограммы.2. @SergeyDylda Хороший момент. Это можно обойти, запланировав
create_task
вместо этого.