Python: как создать сервер для управления пулом потоков?

#python #networking #python-asyncio

#python #сеть #python-asyncio

Вопрос:

У меня есть пул потоков, который обрабатывает некоторые задачи одновременно. Теперь я хотел бы, чтобы задачи ( multiply_by_2 здесь) что-то печатали перед завершением.

Изначально я создал блокировку и передал блокировку каждому рабочему потоку. Если поток хочет что-то напечатать, он сначала получает блокировку, печатает свое сообщение в стандартный вывод, затем снимает блокировку.

Теперь я хочу иметь выделенный серверный поток, управляемый событиями, для обработки печати. Если поток хочет что-то напечатать, он просто отправляет свое сообщение на этот сервер через доменный сокет Unix ( AF_UNIX ). Я надеюсь, что таким образом время блокировки каждого потока может быть сокращено (не нужно ждать блокировки), и мне не нужно разделять блокировку между рабочими потоками. Серверный поток просто печатает все сообщения, которые он получил от клиентов (т. Е. рабочих потоков), по порядку.

Я некоторое время пытался использовать asyncio модуль Python (требуется Python 3.7 ), но не смог разобраться. Как мне это сделать?

Этот очищенный шаблон:

 # Python 3.7 
import asyncio
import multiprocessing.dummy as mp # Threading wrapped using multiprocessing API.
import os
import socket
import sys
import threading
import time

server_address = './uds_socket' # UNIX domain socket

def run_multiple_clients_until_complete(input_list):
    pool = mp.Pool(8)
    result_list = pool.map(multiply_by_2, input_list)
    return result_list

def multiply_by_2(n):
    time.sleep(0.2) # Simulates some blocking call.
    message_str = "client: n = %d" % n
    
    # TODO send message_str.encode() to server

    return n * 2

# Server's callback when it gets a client connection
# If you want to change it, please do..
def client_connected_cb(
    stream_reader: asyncio.StreamReader,
    stream_writer: asyncio.StreamWriter) -> None:
    message_str = reader.read().decode()
    print(message_str)

def create_server_thread():
    pass # TODO

# Let the server finish handling all connections it got, then
# stop the server and join the thread
def stop_server_and_wait_thread(thread):
    pass # TODO

def work(input_list):
    thread = create_server_thread()
    result_list = run_multiple_clients_until_complete(input_list)
    stop_server_and_wait_thread(thread)
    return result_list

def main():
    input_list = list(range(20))
    result_list = work(input_list)
    print(result_list)

if __name__ == "__main__":
    sys.exit(main())
  

Некоторые дополнительные требования:

  • Не создавайте async : run_multiple_clients_until_complete() , multiply_by_2() , main() .
  • Было бы лучше использовать SOCK_DGRAM протокол UDP вместо SOCK_STREAM TCP, но в этом нет необходимости.

Комментарии:

1. Asyncio не подходит для управления блокирующим кодом. Кроме того, асинхронная программа обычно асинхронна с нуля, поэтому не разрешать main() быть асинхронной — это в некотором роде не начальное действие. (Есть способы обойти это, но они не простые, и не ясно, помогут ли они здесь.) Также неясно, что именно не так с текущим решением — если блокировка делает работу слишком медленной, трудно представить, что asyncio, который полностью однопоточный, ускорит работу.

2. @user4815162342 Спасибо, не могли бы вы сформулировать это как ответ, чтобы я мог его принять? Вы правы, я искал способ обойти «асинхронная программа, как правило, асинхронна с нуля», и оказывается, что нет идиоматического решения.