Как выйти из многопоточной программы?

#python #multithreading

#python #многопоточность

Вопрос:

Я просто возился с потоковой обработкой в python, написал эту базовую IM-штуковину [код внизу]

Я заметил, что когда я завершаю программу с помощью C-c, она не завершается, она просто зависает навсегда.

Я просто предполагаю, что он ожидает, пока каждый поток завершит то, что они делают, но поскольку это бесконечный цикл, этого никогда не произойдет.
Итак, я предполагаю, что мне нужно прерывать каждый поток вручную или завершать цикл при поступлении сигнала killsignal.
Как бы я это сделал?

 #!/usr/bin/env python
import threading
import socket

class Listen(threading.Thread):

    def run(self):
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', 2727))
        conn.listen(1)
        while True:
            channel, details = conn.accept()
            print str(details) ": " channel.recv(250)
            channel.send("got it")
            channel.close()

class Shout(threading.Thread):

    def run(self):
        while True:
            try:    
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, 2727))
                break
            except:
                print "can't connect to "  str(address)
        while True:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect((address, 2727))
            conn.send(raw_input())
            conn.close()

listen = Listen().start()
shout = Shout().start()
  

Комментарии:

1. Вам нужно самостоятельно обработать сигнал (SIGINT) и остановить свои потоки. Я не специалист по python, поэтому другие должны быть в состоянии предоставить конкретный пример обработки сигналов на python. В то же время вы можете захотеть поискать в Google (обработка сигналов python)

2. Запускал ее на debian, но в Windows она завершается с исключением.

3. голосование за закрытие из-за того, что заголовок не является указательным и сбивающим с толку и не имеет значения для повторного использования: реальная проблема заключается в нескольких конкретных проблемах в конкретном коде, а не в том, «как выйти из многопоточной программы».

Ответ №1:

Я вижу несколько причин неправильного поведения в вашем коде.

  1. Ctrl C вызывает исключение «KeyboardInterrupt» в основном потоке. Итак, вы должны справиться с этим там.
  2. Ваш сокет находится в режиме блокировки. Это приводит к тому, что несколько функций сокета блокируют вызывающий поток до тех пор, пока функция не вернется. В этом состоянии поток не может реагировать ни на одно событие завершения.
  3. Как вы уже сказали: ваш бесконечный цикл в функции run() потока является … действительно бесконечно. Таким образом, выполнение потока никогда не заканчивается (по крайней мере, не без неожиданного исключения). Вы должны использовать какой-то объект синхронизации, например, потоковую обработку.Объект события, чтобы иметь возможность сообщать потоку извне, что он должен завершить себя.
  4. Я бы не рекомендовал использовать raw_input() вне основного потока. Представьте, что происходит, когда у вас более одного потока Shout.
  5. Почему вы всегда закрываете и повторно подключаете сокет, когда сообщение было передано в вашем классе Shout? Сетевые подключения следует восстанавливать только в особых случаях из-за затрат на настройку.
  6. Без фреймового протокола для обмена данными вы никогда не сможете ожидать получения всех данных, которые были отправлены другим хостом при возврате функции recv ().
  7. Функция start() объекта thread не возвращает значение или объект. Таким образом, сохранение возвращаемого значения (= None) не имеет особого смысла.
  8. Вы никогда не можете ожидать, что функция send () передаст все переданные данные. Поэтому вы должны проверить результат функции и соответствующим образом обработать ситуацию, когда на самом деле были переданы не все байты.
  9. Для изучения многопоточности, безусловно, есть задачи поважнее, чем сетевое взаимодействие, поскольку эта тема сама по себе действительно сложная.

Помимо всего этого, вот моя попытка найти решение. Все еще есть многое, что можно улучшить. Вам также следует рассмотреть ответ Марка Толонена, поскольку класс SocketServer, несомненно, предназначен для упрощения некоторых действий при обработке такого рода материалов. Но вы также должны продолжать изучать основы.

 #!/usr/bin/env python
import threading
import socket
import time
import errno

class StoppableThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()        

    def stop(self):
        if self.isAlive() == True:
            # set event to signal thread to terminate
            self.stop_event.set()
            # block calling thread until thread really has terminated
            self.join()

class Accept(StoppableThread):
    def __init__(self, port):
        StoppableThread.__init__(self)
        self.port = port
        self.threads = []

    def run(self):     
        # handle connection acception
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.bind(('', self.port ))
        conn.listen(5)
        # set socket timeout to ~10ms
        conn.settimeout(0.01)
        while self.stop_event.is_set() == False:
            try:
                csock, caddr = conn.accept()
                # spawn a new thread to handle the client connection
                listen_thread = Listen(csock, caddr)
                self.threads.append(listen_thread)
                listen_thread.start()
            except socket.timeout:
                # socket operation timeout
                # clear all terminated threads from thread list                
                for thread in self.threads:
                    if thread.isAlive() == False:
                        self.threads.remove(thread)

        self.stop_threads()

    def stop_threads(self):
        # stop all running threads
        for listen_thread in self.threads:
            if listen_thread.isAlive() == True:
                listen_thread.stop()
        self.threads = [] 

class Listen(StoppableThread):
    def __init__(self, csock, caddr):
        StoppableThread.__init__(self)
        self.csock = csock
        self.caddr = caddr
        self.csock.setblocking(False)

    def run(self):                
        while self.stop_event.is_set() == False:            
            try:                
                recv_data = self.csock.recv(250)
                if len(recv_data) > 0:       
                    print str(self.caddr) ": "   recv_data
                    self.csock.send("got it")                    
                else:
                    # connection was closed by foreign host
                    self.stop_event.set()
            except socket.error as (sock_errno, sock_errstr):
                if (sock_errno == errno.EWOULDBLOCK):
                    # socket would block - sleep sometime
                    time.sleep(0.1)                    
                else:
                    # unexpected / unhandled error - terminate thread
                    self.stop_event.set()
        channel.close()

class Shout(StoppableThread):
    def __init__(self, sport):
        StoppableThread.__init__(self)
        self.sport = sport

    def run(self):
        while self.stop_event.is_set() == False:
            try:    
                address = raw_input("who u talking to? ")
                conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                conn.connect((address, self.sport))
                break
            except socket.error:
                # handle connection problems
                print "can't connect to "  str(address)
            except: 
                # exit thread in case of an unexpected error
                self.stop_event.set()

        while self.stop_event.is_set() == False:
            try: 
                # chat loop: send messages to remote host            
                print "what to send? :",
                msg = raw_input()
                # beware: send() function may block indefinitly here and it might not send all bytes as expected !!
                conn.send(msg)
            except:
                # exit thread in case of an unexpected error
                self.stop_event.set()
        # close socket before thread terminates
        conn.close()

def main():
    do_exit = False
    server_port = 2727

    # start server socket thread
    accept = Accept(server_port)
    accept.start()

    # start transmitting client socket thread
    shout = Shout(server_port)
    shout.start()

    while do_exit == False:
        try:
            # sleep some time
            time.sleep(0.1)
        except KeyboardInterrupt:
            # Ctrl C was hit - exit program
            do_exit = True

    # stop all running threads
    shout.stop()
    accept.stop()

    # exit main program after all threads were terminated gracefully    

if __name__ == "__main__":
    main()
  

Ответ №2:

Посмотрите на исходный код библиотеки Python для SocketServer.py , в частности, реализация server_forever(), позволяющая увидеть, как сервер реализует завершение работы. Он использует select() для опроса сокета сервера на предмет новых подключений и проверяет флаг завершения. Вот взлом вашего исходного кода для использования SocketServer, и я добавил флаг выхода в Shout (). Он будет запускать потоки Shout и Listen в течение 5 секунд, а затем остановит их.

 import socket
import SocketServer
import threading
import time

class Handler(SocketServer.StreamRequestHandler):
    def handle(self):
        print str(self.client_address)   ": "   self.request.recv(250)
        self.request.send("got itn")

class Listen(threading.Thread):
    def run(self):
        self.server = SocketServer.TCPServer(('',2727),Handler)
        self.server.serve_forever()
    def stop(self):
        self.server.shutdown()

class Shout(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.quit = False
    def run(self):
        while not self.quit:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.connect(('localhost', 2727))
            conn.send('sendingn')
            print conn.recv(100)
            conn.close()
    def stop(self):
        self.quit = True

listen = Listen()
listen.start()
shout = Shout()
shout.start()

time.sleep(5)

shout.stop()
listen.stop()