Копирование контекстных символов.Контекст между задачами

#python #async-await #python-asyncio #python-contextvars

Вопрос:

У меня есть программа (сервер ASGI), которая структурирована примерно так:

 import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan():
    ctxvar.set("spam")


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(lifepsan())
    await task
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())
 

Поскольку событие / конечные точки продолжительности жизни выполняются в задачах, они не могут совместно использовать контекстные окна.
Это сделано специально: задачи копируют контекст перед выполнением, поэтому lifespan не могут быть установлены ctxvar должным образом.
Это желаемое поведение для конечных точек, но я бы хотел, чтобы выполнение выглядело так (с точки зрения пользователя):

 async def lifespan():
    ctxvar.set("spam")
    await endpoint()
 

Другими словами, конечные точки выполняются в их собственном независимом контексте, но в контексте продолжительности жизни.

Я попытался заставить это работать, используя contextlib.copy_context() :

 import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan():
    ctxvar.set("spam")
    print("set")


async def endpoint():
    print("get")
    assert ctxvar.get() == "spam"


async def main():
    ctx = contextvars.copy_context()
    task = ctx.run(asyncio.create_task, lifepsan())
    await task
    endpoint_ctx = ctx.copy()
    task = endpoint_ctx.run(asyncio.create_task, endpoint())
    await task

asyncio.run(main())
 

Так же как:

 async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(ctx.run(lifespan))
    await task
    endpoint_ctx = ctx.copy()
    task = asyncio.create_task(endpoint_ctx.run(endpoint))
    await task
 

Однако, похоже, contextvars.Context.run это не работает таким образом (я предполагаю, что контекст связан при создании сопрограммы, но не при ее выполнении).

Есть ли простой способ добиться желаемого поведения, не перестраивая способ создания задач или что-то в этом роде?

Комментарии:

1. Вот как должны работать контекстные переменные, если вы хотите, чтобы две сопрограммы имели одно и то же значение переменной, вы можете попытаться использовать какое-либо хранилище ключей:значение, например, обычный словарь.

2. При полном контроле над всем это имело бы смысл. Более широким контекстом для этой ситуации является библиотека, которую мы назовем B. B вызывается другой библиотекой A , которая выполняет планирование задач и тому подобное (поэтому я сказал «без изменения способа создания задач»). В свою очередь, B вызывает код пользователя. Пользователи могут захотеть использовать контекстные переменные или даже использовать библиотеку, которая, в свою очередь, использует контекстные переменные. Цель здесь состояла в том, чтобы обеспечить разумное поведение для пользователей, не требуя от пользователей A изменять свой код.

Ответ №1:

Вот что я придумал, вдохновленный PEP 555 и asgiref:

 from contextvars import Context, ContextVar, copy_context
from typing import Any


def _set_cvar(cvar: ContextVar, val: Any):
    cvar.set(val)


class CaptureContext:

    def __init__(self) -> None:
        self.context = Context()

    def __enter__(self) -> "CaptureContext":
        self._outer = copy_context()
        return self

    def sync(self):
        final = copy_context()
        for cvar in final:
            if cvar not in self._outer:
                # new contextvar set
                self.context.run(_set_cvar, cvar, final.get(cvar))
            else:
                final_val = final.get(cvar)
                if self._outer.get(cvar) != final_val:
                    # value changed
                    self.context.run(_set_cvar, cvar, final_val)

    def __exit__(self, *args: Any):
        self.sync()


def restore_context(context: Context) -> None:
    """Restore `context` to the current Context"""
    for cvar in context.keys():
        try:
            cvar.set(context.get(cvar))
        except LookupError:
            cvar.set(context.get(cvar))
 

Использование:

 import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan(cap: CaptureContext):
    with cap:
        ctxvar.set("spam")


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    cap = CaptureContext()
    await asyncio.create_task(lifepsan(cap))
    restore_context(cap.context)
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())
 

Этот sync() метод предоставляется на случай, если задача выполняется долго и вам необходимо захватить контекст до ее завершения. Несколько надуманный пример:

 import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")


async def lifepsan(cap: CaptureContext, event: asyncio.Event):
    with cap:
        ctxvar.set("spam")
        cap.sync()
        event.set()
        await asyncio.sleep(float("inf"))


async def endpoint():
    assert ctxvar.get() == "spam"


async def main():
    cap = CaptureContext()
    event = asyncio.Event()
    asyncio.create_task(lifepsan(cap, event))
    await event.wait()
    restore_context(cap.context)
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())
 

Я думаю, что все равно было бы намного приятнее, если contextvars.Context.run бы мы работали с сопрограммами.