#python #multithreading
#python #многопоточность
Вопрос:
Я просто возился с потоковой обработкой в python, написал эту базовую IM-штуковину [код внизу]
Я заметил, что когда я завершаю программу с помощью C-c, она не завершается, она просто зависает навсегда.
Я просто предполагаю, что он ожидает, пока каждый поток завершит то, что они делают, но поскольку это бесконечный цикл, этого никогда не произойдет.
Итак, я предполагаю, что мне нужно прерывать каждый поток вручную или завершать цикл при поступлении сигнала killsignal.
Как бы я это сделал?
#!/usr/bin/env python
import threading
import socket
class Listen(threading.Thread):
def run(self):
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.bind(('', 2727))
conn.listen(1)
while True:
channel, details = conn.accept()
print str(details) ": " channel.recv(250)
channel.send("got it")
channel.close()
class Shout(threading.Thread):
def run(self):
while True:
try:
address = raw_input("who u talking to? ")
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((address, 2727))
break
except:
print "can't connect to " str(address)
while True:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((address, 2727))
conn.send(raw_input())
conn.close()
listen = Listen().start()
shout = Shout().start()
Комментарии:
1. Вам нужно самостоятельно обработать сигнал (SIGINT) и остановить свои потоки. Я не специалист по python, поэтому другие должны быть в состоянии предоставить конкретный пример обработки сигналов на python. В то же время вы можете захотеть поискать в Google (обработка сигналов python)
2. Запускал ее на debian, но в Windows она завершается с исключением.
3. голосование за закрытие из-за того, что заголовок не является указательным и сбивающим с толку и не имеет значения для повторного использования: реальная проблема заключается в нескольких конкретных проблемах в конкретном коде, а не в том, «как выйти из многопоточной программы».
Ответ №1:
Я вижу несколько причин неправильного поведения в вашем коде.
- Ctrl C вызывает исключение «KeyboardInterrupt» в основном потоке. Итак, вы должны справиться с этим там.
- Ваш сокет находится в режиме блокировки. Это приводит к тому, что несколько функций сокета блокируют вызывающий поток до тех пор, пока функция не вернется. В этом состоянии поток не может реагировать ни на одно событие завершения.
- Как вы уже сказали: ваш бесконечный цикл в функции run() потока является … действительно бесконечно. Таким образом, выполнение потока никогда не заканчивается (по крайней мере, не без неожиданного исключения). Вы должны использовать какой-то объект синхронизации, например, потоковую обработку.Объект события, чтобы иметь возможность сообщать потоку извне, что он должен завершить себя.
- Я бы не рекомендовал использовать raw_input() вне основного потока. Представьте, что происходит, когда у вас более одного потока Shout.
- Почему вы всегда закрываете и повторно подключаете сокет, когда сообщение было передано в вашем классе Shout? Сетевые подключения следует восстанавливать только в особых случаях из-за затрат на настройку.
- Без фреймового протокола для обмена данными вы никогда не сможете ожидать получения всех данных, которые были отправлены другим хостом при возврате функции recv ().
- Функция start() объекта thread не возвращает значение или объект. Таким образом, сохранение возвращаемого значения (= None) не имеет особого смысла.
- Вы никогда не можете ожидать, что функция send () передаст все переданные данные. Поэтому вы должны проверить результат функции и соответствующим образом обработать ситуацию, когда на самом деле были переданы не все байты.
- Для изучения многопоточности, безусловно, есть задачи поважнее, чем сетевое взаимодействие, поскольку эта тема сама по себе действительно сложная.
Помимо всего этого, вот моя попытка найти решение. Все еще есть многое, что можно улучшить. Вам также следует рассмотреть ответ Марка Толонена, поскольку класс SocketServer, несомненно, предназначен для упрощения некоторых действий при обработке такого рода материалов. Но вы также должны продолжать изучать основы.
#!/usr/bin/env python
import threading
import socket
import time
import errno
class StoppableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()
def stop(self):
if self.isAlive() == True:
# set event to signal thread to terminate
self.stop_event.set()
# block calling thread until thread really has terminated
self.join()
class Accept(StoppableThread):
def __init__(self, port):
StoppableThread.__init__(self)
self.port = port
self.threads = []
def run(self):
# handle connection acception
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.bind(('', self.port ))
conn.listen(5)
# set socket timeout to ~10ms
conn.settimeout(0.01)
while self.stop_event.is_set() == False:
try:
csock, caddr = conn.accept()
# spawn a new thread to handle the client connection
listen_thread = Listen(csock, caddr)
self.threads.append(listen_thread)
listen_thread.start()
except socket.timeout:
# socket operation timeout
# clear all terminated threads from thread list
for thread in self.threads:
if thread.isAlive() == False:
self.threads.remove(thread)
self.stop_threads()
def stop_threads(self):
# stop all running threads
for listen_thread in self.threads:
if listen_thread.isAlive() == True:
listen_thread.stop()
self.threads = []
class Listen(StoppableThread):
def __init__(self, csock, caddr):
StoppableThread.__init__(self)
self.csock = csock
self.caddr = caddr
self.csock.setblocking(False)
def run(self):
while self.stop_event.is_set() == False:
try:
recv_data = self.csock.recv(250)
if len(recv_data) > 0:
print str(self.caddr) ": " recv_data
self.csock.send("got it")
else:
# connection was closed by foreign host
self.stop_event.set()
except socket.error as (sock_errno, sock_errstr):
if (sock_errno == errno.EWOULDBLOCK):
# socket would block - sleep sometime
time.sleep(0.1)
else:
# unexpected / unhandled error - terminate thread
self.stop_event.set()
channel.close()
class Shout(StoppableThread):
def __init__(self, sport):
StoppableThread.__init__(self)
self.sport = sport
def run(self):
while self.stop_event.is_set() == False:
try:
address = raw_input("who u talking to? ")
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((address, self.sport))
break
except socket.error:
# handle connection problems
print "can't connect to " str(address)
except:
# exit thread in case of an unexpected error
self.stop_event.set()
while self.stop_event.is_set() == False:
try:
# chat loop: send messages to remote host
print "what to send? :",
msg = raw_input()
# beware: send() function may block indefinitly here and it might not send all bytes as expected !!
conn.send(msg)
except:
# exit thread in case of an unexpected error
self.stop_event.set()
# close socket before thread terminates
conn.close()
def main():
do_exit = False
server_port = 2727
# start server socket thread
accept = Accept(server_port)
accept.start()
# start transmitting client socket thread
shout = Shout(server_port)
shout.start()
while do_exit == False:
try:
# sleep some time
time.sleep(0.1)
except KeyboardInterrupt:
# Ctrl C was hit - exit program
do_exit = True
# stop all running threads
shout.stop()
accept.stop()
# exit main program after all threads were terminated gracefully
if __name__ == "__main__":
main()
Ответ №2:
Посмотрите на исходный код библиотеки Python для SocketServer.py , в частности, реализация server_forever(), позволяющая увидеть, как сервер реализует завершение работы. Он использует select() для опроса сокета сервера на предмет новых подключений и проверяет флаг завершения. Вот взлом вашего исходного кода для использования SocketServer, и я добавил флаг выхода в Shout (). Он будет запускать потоки Shout и Listen в течение 5 секунд, а затем остановит их.
import socket
import SocketServer
import threading
import time
class Handler(SocketServer.StreamRequestHandler):
def handle(self):
print str(self.client_address) ": " self.request.recv(250)
self.request.send("got itn")
class Listen(threading.Thread):
def run(self):
self.server = SocketServer.TCPServer(('',2727),Handler)
self.server.serve_forever()
def stop(self):
self.server.shutdown()
class Shout(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.quit = False
def run(self):
while not self.quit:
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect(('localhost', 2727))
conn.send('sendingn')
print conn.recv(100)
conn.close()
def stop(self):
self.quit = True
listen = Listen()
listen.start()
shout = Shout()
shout.start()
time.sleep(5)
shout.stop()
listen.stop()