#python #django #async-await #fastapi
#python #django #асинхронное ожидание #fastapi
Вопрос:
Я пишу тесты для fastAPI на django с сервером ASGI (адаптировал это руководство). Моя часть теста fastAPI продолжает возвращать ошибки, и я тщетно пытаюсь это исправить.
Мне нужно создать пользователя для тестирования API.
@sync_to_async
def _create_user(self, username, email):
try:
return User.objects.create(username=username, email=email)
except:
return None
async def setUp(self):
task = asyncio.create_task(self._create_user(username="user", email="email@email.com"))
self.user = await task
Запуск этого теста, оказывается, что self.user
это сопрограмма, и невозможно получить доступ к атрибутам, которые я ожидаю.
Как это решить?
Обновить :
Удалена асинхронность для _create_user(self, username, email)
.
Комментарии:
1.Почему вы обертываете
async
функцию сsync_to_async
помощью? Почему_create_user
async
в первую очередь, видя, что она ничего не делает асинхронно, кроме ошибочной попыткиawait print(..)
? Наконец, почемуsetUp
async
? Похоже, что для этого вообще не нужна какая-либо асинхронность. Вы по своей сути оборачиваете синхронное действиеUser.objects.create
в несколько пустых слоевasync
только для того, чтобы (неправильно) развернуть пустые слои, чтобы затем запустить синхронный код.2. Спасибо за вашу помощь. Есть вещи, которыми я не очень владею. Обертывание asyn с помощью sync_to_async было предназначено для отладки, и я не удалял его перед отправкой моего вопроса. Я обновлю свой вопрос.
Ответ №1:
Согласно документам https://docs.djangoproject.com/en/3.1/topics/async/#asgiref.sync.sync_to_async декоратор sync_to_async должен украшать функции синхронизации. (смотрите пример)
Комментарии:
1. Звучит неплохо. Но я получаю это предупреждение
RuntimeWarning: coroutine 'SyncToAsync.__call__' was never awaited self.setUp()
Ответ №2:
У меня есть ответ,…………………..
Просто создайте обычную функцию синхронизации и вызовите с помощью функции sync_to_async()
Эта функция используется для получения user_object
def get_user_object(mobile_no):
user_object = CustomUser.objects.get(mobile_no=mobile_no)
return user_object
Этот метод выполняется в асинхронном режиме, поэтому нам нужно вызвать функцию синхронизации в async, чтобы избавиться от ошибки……
from channels.db import database_sync_to_async
async def connect(self):
username = await database_sync_to_async(get_user_object)(mobile_no=" 9999999999")
print(user_object)