#python #async-await #python-asyncio #python-contextvars
Вопрос:
У меня есть программа (сервер ASGI), которая структурирована примерно так:
import asyncio
import contextvars
ctxvar = contextvars.ContextVar("ctx")
async def lifepsan():
ctxvar.set("spam")
async def endpoint():
assert ctxvar.get() == "spam"
async def main():
ctx = contextvars.copy_context()
task = asyncio.create_task(lifepsan())
await task
task = asyncio.create_task(endpoint())
await task
asyncio.run(main())
Поскольку событие / конечные точки продолжительности жизни выполняются в задачах, они не могут совместно использовать контекстные окна.
Это сделано специально: задачи копируют контекст перед выполнением, поэтому lifespan
не могут быть установлены ctxvar
должным образом.
Это желаемое поведение для конечных точек, но я бы хотел, чтобы выполнение выглядело так (с точки зрения пользователя):
async def lifespan():
ctxvar.set("spam")
await endpoint()
Другими словами, конечные точки выполняются в их собственном независимом контексте, но в контексте продолжительности жизни.
Я попытался заставить это работать, используя contextlib.copy_context()
:
import asyncio
import contextvars
ctxvar = contextvars.ContextVar("ctx")
async def lifepsan():
ctxvar.set("spam")
print("set")
async def endpoint():
print("get")
assert ctxvar.get() == "spam"
async def main():
ctx = contextvars.copy_context()
task = ctx.run(asyncio.create_task, lifepsan())
await task
endpoint_ctx = ctx.copy()
task = endpoint_ctx.run(asyncio.create_task, endpoint())
await task
asyncio.run(main())
Так же как:
async def main():
ctx = contextvars.copy_context()
task = asyncio.create_task(ctx.run(lifespan))
await task
endpoint_ctx = ctx.copy()
task = asyncio.create_task(endpoint_ctx.run(endpoint))
await task
Однако, похоже, contextvars.Context.run
это не работает таким образом (я предполагаю, что контекст связан при создании сопрограммы, но не при ее выполнении).
Есть ли простой способ добиться желаемого поведения, не перестраивая способ создания задач или что-то в этом роде?
Комментарии:
1. Вот как должны работать контекстные переменные, если вы хотите, чтобы две сопрограммы имели одно и то же значение переменной, вы можете попытаться использовать какое-либо хранилище ключей:значение, например, обычный словарь.
2. При полном контроле над всем это имело бы смысл. Более широким контекстом для этой ситуации является библиотека, которую мы назовем B. B вызывается другой библиотекой A , которая выполняет планирование задач и тому подобное (поэтому я сказал «без изменения способа создания задач»). В свою очередь, B вызывает код пользователя. Пользователи могут захотеть использовать контекстные переменные или даже использовать библиотеку, которая, в свою очередь, использует контекстные переменные. Цель здесь состояла в том, чтобы обеспечить разумное поведение для пользователей, не требуя от пользователей A изменять свой код.
Ответ №1:
Вот что я придумал, вдохновленный PEP 555 и asgiref:
from contextvars import Context, ContextVar, copy_context
from typing import Any
def _set_cvar(cvar: ContextVar, val: Any):
cvar.set(val)
class CaptureContext:
def __init__(self) -> None:
self.context = Context()
def __enter__(self) -> "CaptureContext":
self._outer = copy_context()
return self
def sync(self):
final = copy_context()
for cvar in final:
if cvar not in self._outer:
# new contextvar set
self.context.run(_set_cvar, cvar, final.get(cvar))
else:
final_val = final.get(cvar)
if self._outer.get(cvar) != final_val:
# value changed
self.context.run(_set_cvar, cvar, final_val)
def __exit__(self, *args: Any):
self.sync()
def restore_context(context: Context) -> None:
"""Restore `context` to the current Context"""
for cvar in context.keys():
try:
cvar.set(context.get(cvar))
except LookupError:
cvar.set(context.get(cvar))
Использование:
import asyncio
import contextvars
ctxvar = contextvars.ContextVar("ctx")
async def lifepsan(cap: CaptureContext):
with cap:
ctxvar.set("spam")
async def endpoint():
assert ctxvar.get() == "spam"
async def main():
cap = CaptureContext()
await asyncio.create_task(lifepsan(cap))
restore_context(cap.context)
task = asyncio.create_task(endpoint())
await task
asyncio.run(main())
Этот sync()
метод предоставляется на случай, если задача выполняется долго и вам необходимо захватить контекст до ее завершения. Несколько надуманный пример:
import asyncio
import contextvars
ctxvar = contextvars.ContextVar("ctx")
async def lifepsan(cap: CaptureContext, event: asyncio.Event):
with cap:
ctxvar.set("spam")
cap.sync()
event.set()
await asyncio.sleep(float("inf"))
async def endpoint():
assert ctxvar.get() == "spam"
async def main():
cap = CaptureContext()
event = asyncio.Event()
asyncio.create_task(lifepsan(cap, event))
await event.wait()
restore_context(cap.context)
task = asyncio.create_task(endpoint())
await task
asyncio.run(main())
Я думаю, что все равно было бы намного приятнее, если contextvars.Context.run
бы мы работали с сопрограммами.