#python-3.x #asynchronous #python-asyncio
#python-3.x #асинхронный #python-асинхронный
Вопрос:
Я пытаюсь извлечь некоторые данные из OpenSubtitles с помощью asyncio, а затем загрузить файл, информация о котором содержится в этих данных. Я хочу получить эти данные и одновременно загрузить файл с помощью asyncio.
Проблема в том, что я хочу дождаться завершения 1 задачи из списка tasks
, прежде чем начинать с остальных задач в списке или download_tasks
. Причина этого заключается в том, что in self._perform_query()
я записываю информацию в файл, а in self._download_and_save_file()
я считываю ту же информацию из этого файла. Другими словами, перед download_tasks
началом выполнения необходимо дождаться завершения хотя бы одной задачи tasks
.
Я обнаружил, что могу это сделать, asyncio.wait(return_when=FIRST_COMPLETED)
но по какой-то причине он не работает должным образом:
payloads = [create_payloads(entry) for entry in retreive(table_in_database)]
tasks = [asyncio.create_task(self._perform_query(payload, proxy)) for payload in payloads]
download_tasks = [asyncio.create_task(self._download_and_save_file(url, proxy) for url in url_list]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(done)
print(len(done))
print(pending)
print(len(pending))
await asyncio.wait(download_tasks)
Результат полностью отличается от ожидаемого. Похоже , что из 3 задач в списке tasks
все 3 выполняются , несмотря на то , что я сдал asyncio.FIRST_COMPLETED
экзамен . Почему это происходит?
{<Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:Users...subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:Users...subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:Users...subtitles.py:71> result=None>}
3
set()
0
Exiting
Насколько я могу судить, приведенный код self._perform_query()
не должен влиять на эту проблему. Здесь это в любом случае, просто чтобы убедиться:
async def _perform_query(self, payload, proxy):
try:
query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
except Fault as e:
raise "A fault has occurred:n{}".format(e)
except ProtocolError as e:
raise "A ProtocolError has occurred:n{}".format(e)
else:
if query_result["status"] == "200 OK":
with open("dl_links.json", "w") as dl_links_json:
result = query_result["data"][0]
subtitle_name = result["SubFileName"]
download_link = result["SubDownloadLink"]
download_data = {"download link": download_link,
"file name": subtitle_name}
json.dump(download_data, dl_links_json)
else:
print("Wrong status code: {}".format(query_result["status"]))
На данный момент я тестировал это без запуска download_tasks
, но я включил его здесь для контекста. Возможно, я решаю эту проблему совершенно неправильно. Если это так, я был бы очень признателен за ваш вклад!
Редактировать:
Проблема была очень простой, как указано ниже. _perform_query
не была ожидаемой функцией, вместо этого она выполнялась синхронно. Я изменил это, отредактировав часть записи файла _perform_query
, чтобы быть асинхронной с aiofiles
:
def _perform_query(self, payload, proxy):
query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
if query_result["status"] == "200 OK":
async with aiofiles.open("dl_links.json", mode="w") as dl_links_json:
result = query_result["data"][0]
download_link = result["SubDownloadLink"]
await dl_links_json.write(download_link)
Ответ №1:
return_when=FIRST_COMPLETED
не гарантирует, что будет выполнена только одна задача. Это гарантирует, что ожидание завершится, как только задача завершится, но вполне возможно, что другие задачи завершатся «одновременно», что для asyncio означает ту же итерацию цикла событий. Рассмотрим, например, следующий код:
async def noop():
pass
async def main():
done, pending = await asyncio.wait(
[noop(), noop(), noop()], return_when=asyncio.FIRST_COMPLETED)
print(len(done), len(pending))
asyncio.run(main())
Это печатается 3 0
так же, как ваш код. Почему?
asyncio.wait
выполняет две вещи: отправляет сопрограммы в цикл событий и настраивает обратные вызовы для уведомления о завершении любого из них. Однако noop
сопрограмма не содержит await
, поэтому ни один из вызовов noop()
не приостанавливается, каждый просто делает свое дело и немедленно возвращается. В результате все три экземпляра сопрограммы завершаются за один и тот же проход цикла событий. wait
затем сообщается, что все три сопрограммы завершены, о чем он послушно сообщает.
Если вы измените noop
режим ожидания случайного сна, например, измените pass
на await asyncio.sleep(0.1 * random.random())
, вы получите ожидаемое поведение. await
Сопрограммы больше не завершаются одновременно и wait
будут сообщать о первой, как только она ее обнаружит.
Это показывает истинную основную проблему с вашим кодом: _perform_query
не ожидает. Это указывает на то, что вы не используете базовую библиотеку async или используете ее неправильно. Вызов SearchSubtitles
, вероятно, просто блокирует цикл событий, который, по-видимому, работает в тривиальных тестах, но нарушает важные функции asyncio, такие как одновременное выполнение задач.
Комментарии:
1. Спасибо вам за замечательное разъяснение! Добавляя асинхронную запись в файл
_perform_query
, я получаю желаемый результат.2. @917k Рассмотрите также возможность использования ожидаемого for
SearchSubtitles
, что (я подозреваю) занимает большую часть времени. Запись файлов с использованиемaiofiles
, по-видимому, устраняет проблему, в то время как более насущная проблемаSearchSubtitles
блокировки сопрограммы (и цикла событий вместе с ней) остается, как вы обнаружите, если попытаетесь, например, загрузить несколько субтитров параллельно.3. Вы абсолютно правы. Но когда я пытаюсь создать задачу,
SearchSubtitles
а затем ожидаю ее, я получаюTypeError: a coroutine was expected, got {'status': '200 OK', ...
. Как я должен ожидать этот метод, если он является частью API OpenSubtitles?4. @917k Вам нужно найти асинхронный API для нужной вам функциональности — например, использовать
aiohttp
вместоrequests
. В вашем случае вам нужно будет найти или создать асинхронную версию API поиска субтитров. Ситуация улучшается, но доступность асинхронных интерфейсов является одной из проблем использования asyncio на данном этапе. Наконец, если вам необходимо использовать блокирующий интерфейс, вы можете использоватьrun_in_executor
его для вызова функции блокировки.5. Не могли бы вы подробнее рассказать о том, как я буду использовать
run_in_executor
? Что именно завершит вызов этой функции?