#python #networking #python-asyncio
#python #сеть #python-asyncio
Вопрос:
У меня есть пул потоков, который обрабатывает некоторые задачи одновременно. Теперь я хотел бы, чтобы задачи ( multiply_by_2
здесь) что-то печатали перед завершением.
Изначально я создал блокировку и передал блокировку каждому рабочему потоку. Если поток хочет что-то напечатать, он сначала получает блокировку, печатает свое сообщение в стандартный вывод, затем снимает блокировку.
Теперь я хочу иметь выделенный серверный поток, управляемый событиями, для обработки печати. Если поток хочет что-то напечатать, он просто отправляет свое сообщение на этот сервер через доменный сокет Unix ( AF_UNIX
). Я надеюсь, что таким образом время блокировки каждого потока может быть сокращено (не нужно ждать блокировки), и мне не нужно разделять блокировку между рабочими потоками. Серверный поток просто печатает все сообщения, которые он получил от клиентов (т. Е. рабочих потоков), по порядку.
Я некоторое время пытался использовать asyncio
модуль Python (требуется Python 3.7 ), но не смог разобраться. Как мне это сделать?
Этот очищенный шаблон:
# Python 3.7
import asyncio
import multiprocessing.dummy as mp # Threading wrapped using multiprocessing API.
import os
import socket
import sys
import threading
import time
server_address = './uds_socket' # UNIX domain socket
def run_multiple_clients_until_complete(input_list):
pool = mp.Pool(8)
result_list = pool.map(multiply_by_2, input_list)
return result_list
def multiply_by_2(n):
time.sleep(0.2) # Simulates some blocking call.
message_str = "client: n = %d" % n
# TODO send message_str.encode() to server
return n * 2
# Server's callback when it gets a client connection
# If you want to change it, please do..
def client_connected_cb(
stream_reader: asyncio.StreamReader,
stream_writer: asyncio.StreamWriter) -> None:
message_str = reader.read().decode()
print(message_str)
def create_server_thread():
pass # TODO
# Let the server finish handling all connections it got, then
# stop the server and join the thread
def stop_server_and_wait_thread(thread):
pass # TODO
def work(input_list):
thread = create_server_thread()
result_list = run_multiple_clients_until_complete(input_list)
stop_server_and_wait_thread(thread)
return result_list
def main():
input_list = list(range(20))
result_list = work(input_list)
print(result_list)
if __name__ == "__main__":
sys.exit(main())
Некоторые дополнительные требования:
- Не создавайте
async
:run_multiple_clients_until_complete()
,multiply_by_2()
,main()
. - Было бы лучше использовать
SOCK_DGRAM
протокол UDP вместоSOCK_STREAM
TCP, но в этом нет необходимости.
Комментарии:
1. Asyncio не подходит для управления блокирующим кодом. Кроме того, асинхронная программа обычно асинхронна с нуля, поэтому не разрешать
main()
быть асинхронной — это в некотором роде не начальное действие. (Есть способы обойти это, но они не простые, и не ясно, помогут ли они здесь.) Также неясно, что именно не так с текущим решением — если блокировка делает работу слишком медленной, трудно представить, что asyncio, который полностью однопоточный, ускорит работу.2. @user4815162342 Спасибо, не могли бы вы сформулировать это как ответ, чтобы я мог его принять? Вы правы, я искал способ обойти «асинхронная программа, как правило, асинхронна с нуля», и оказывается, что нет идиоматического решения.