TCP-сокет Python не закрывается?

#python #sockets #tcp

#python #сокеты #tcp

Вопрос:

Может быть, у кого-нибудь здесь будет ответ на эту вещь, которая просто сводит меня с ума.

Чтобы упростить задачу, я создаю что-то вроде прокси. Всякий раз, когда он что-то получает, он пересылает все на сервер и отправляет обратно ответ. Итак, есть один сокет, который всегда прослушивает порт 4557 для клиентов, и для каждого входящего соединения создается новый сокет на случайном порту для подключения к порту сервера 4556.

Клиенты <==> Прокси <==> Сервер

Кроме того, существует другой сокет, который создается и прослушивает запросы, поступающие с сервера, для пересылки соответствующему клиенту.

Вот пример:

  • Клиент A подключается к прокси через порт 4557
  • Прокси создает сокет для сервера на порту 4556
  • Наряду с этим создается сокет, прослушивающий порт 40100
  • Клиент отправляет данные, пересылаемые на сервер
  • Клиент отключается. Закройте клиентское соединение и сокет с сервером
  • Некоторое время спустя сервер отправляет данные на прокси-сервер на порт 40100
  • Все перенаправлено клиенту A (порт 40100, соответствующий клиенту A)
  • И так далее..

Пока что в моих тестах я использую простой скрипт на Python для отправки уникального tcp-пакета на прокси-сервер вместе с сервером дампа, отображающим полученные данные и отражающим их обратно.

Итак, проблема в том, что когда соединение с прокси закрывается, соединение с сервером также должно быть закрыто с помощью «sock.close()». Однако, похоже, это просто полностью игнорируется. Сокет остается УСТАНОВЛЕННЫМ.

Теперь о коде.

Несколько замечаний.

  • DTN и Node являются соответственно сервером и клиентами.
  • runCallback вызывается в цикле до тех пор, пока поток не завершится.
  • finalCallback вызывается, когда поток завершается.
  • Связи между удаленными хостами (клиентом), прокси-портами (для сервера) и прокси-серверами хранятся в словарях: TCPProxyHostRegister (RemoteHost => Прокси), TCPProxyPortRegister (порт => Прокси), TCPPortToHost (порт => Удаленный хост).

Первый класс — TCPListenerThread . Он просто прослушивает определенный порт и создает экземпляры прокси (по одному для каждой пары Клиент => Сервер и пары Сервер => Клиент) и пересылает им соединения.

 class TCPListenerThread(StoppableThread):
    def __init__(self, tcp_port):
        StoppableThread.__init__(self)

        self.tcp_port = tcp_port

        self.sock = socket.socket( socket.AF_INET, # Internet
                        socket.SOCK_STREAM ) # tcp
        self.sock.bind( (LOCAL_ADDRESS, self.tcp_port) )

        self.sock.listen(1)

    def runCallback(self):
        print "Listen on " str(self.tcp_port) ".."
        conn, addr = self.sock.accept()

        if isFromDTN(addr):
            tcpProxy = getProxyFromPort(tcp_port)
            if not tcpProxy:
                tcpProxy = TCPProxy(host, True)
        else:
            host = addr[0]
            tcpProxy = getProxyFromHost(host)
            if not tcpProxy:
                tcpProxy = TCPProxy(host, False)

        tcpProxy.handle(conn)

    def finalCallback(self):
        self.sock.close()
  

Now comes the TCP Proxy:
It associates a remote host (Client) with a port connecting to Server.
If it’s a connection coming from a new Client, it will create a new listener (see above) for the Server and create a socket ready to forward everything to Server.

 class TCPProxy():
    def __init__(self, remote, isFromDTN):
        #remote = port for Server or Remote host for Client
        self.isFromDTN = isFromDTN
        self.conn = None

        #add itself to proxy registries

        #If listening from a node
        if not isFromDTN:
            #Set node remote host
            self.remoteHost = remote
            TCPProxyHostRegister[self.remoteHost] = self

            #Set port to DTN interface   listener
            self.portToDTN = getNewTCPPort()
            TCPPortToHost[self.portToDTN] = self.remoteHost
            newTCPListenerThread(self.portToDTN)
        #Or from DTN
        else:
            self.portToDTN = remote
            TCPProxyPortRegister[self.portToDTN] = self

            self.remoteHost = getRemoteHostFromPortTCP(self.portToDTN)

    def handle(self, conn):
        print "New connection!"

        #shouldn't happen, but eh
        if self.conn != None:
            self.closeConnections()

        self.conn = conn

        #init socket with remote
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        #self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if self.isFromDTN:
            self.sock.connect((self.remoteHost, 4556)) #TODO: handle dynamic port..
        else:
            self.sock.connect((DTN_Address, DTN_TCPPort))

        #handle connection in a thread
        self.handlerThread = newTCPHandlerThread(self)
        #handle reply in a therad
        self.replyThread = newTCPReplyThread(self)

    def closeConnections(self):
        try:
            if self.conn != None:
                print "Close connections!"
                self.sock.close()
                self.conn.close()
                self.conn = None
                self.handlerThread.kill()
                self.replyThread.kill()
        except Exception, err:
            print str(err)
            #pass

    def forward(self, data):
        print "TCP forwarding data: " data
        self.sock.send(data)

    def forwardBack(self, data):
        print "TCP forwarding data back: " data
        self.conn.send(data)
  

In this proxy class, I instantiate two classes, TCPHandlerThread and TCPReplyThread. They are responsible for forwarding to Server, and forwarding back to Client, respectively.

 class TCPHandlerThread(StoppableThread):
    def __init__(self, proxy):
        StoppableThread.__init__(self)
        self.proxy = proxy

    def runCallback(self):
        test = False
        while 1:

            data = self.proxy.conn.recv(BUFFER_SIZE)    
            if test:
                self.proxy.sock.close()

            test = True
            if not data:
                break
            print "TCP received data:", data
            self.proxy.forward(data)
        self.kill()

    def finalCallback(self):
        self.proxy.closeConnections()



class TCPReplyThread(StoppableThread):
    def __init__(self, proxy):
        StoppableThread.__init__(self)
        self.proxy = proxy

    def runCallback(self):
        while 1:
            data = self.proxy.sock.recv(BUFFER_SIZE)
            if not data:            
                break
            print "TCP received back data: " data
            self.proxy.forwardBack(data)
        self.kill()

    def finalCallback(self):
        self.proxy.closeConnections()
  

Вы видите, что всякий раз, когда соединение закрывается, поток прерывается, а другое соединение (клиент / сервер с прокси-сервером или Прокси-сервер с сервером / клиентом) должно быть закрыто в Proxy.closeConnections()

Я заметил, что когда closeConnections() имеет значение «data = self.proxy.conn.recv(BUFFER_SIZE)», все идет хорошо, но когда оно вызывается даже сразу после последнего оператора, все идет не так.

Я подключил TCP, и прокси-сервер не отправляет никакого сигнала «пока». Состояние сокета не переходит в TIME_WAIT или что-то еще, оно просто остается УСТАНОВЛЕННЫМ.

Кроме того, я протестировал его на Windows и Ubuntu.

  • В Windows все происходит именно так, как я объяснил
  • В Ubuntu это работает нормально обычно (не всегда) для двух подключений, и в третий раз, когда я подключаюсь к тому же клиенту точно таким же образом к прокси, все снова идет не так, как описано.

Вот три файла, которые я использую, чтобы вы могли взглянуть на весь код. Прошу прощения, возможно, файл прокси не очень удобен для чтения. Предполагалось, что это будет быстрая разработка.

http://hognerud.net/stackoverflow/

Заранее спасибо.. Это, конечно, что-то глупое. Пожалуйста, не бейте меня слишком сильно, когда увидите это : (

Комментарии:

1. Не могли бы вы поделиться, где вы отслеживаете список открытых клиентских сокетов и как они связаны с прокси-сокетом для прокси-службы? Я мог бы потратить некоторое время на чтение кода, но такого рода базовая информация упростила бы задачу, чтобы все не проводили одинаковые исследования.

2. Почему вы публикуете код, который предположительно работает, а не код, который не работает, который обрабатывает открытие и закрытие проблемного серверного сокета?

3. Спасибо за комментарии. Отредактирован первоначальный пост.

Ответ №1:

Во-первых, я сожалею, что в настоящее время у меня нет времени на фактический запуск и тестирование вашего кода.

Но мне пришла в голову идея, что ваша проблема на самом деле может быть связана с использованием режима блокировки или неблокирующего режима в сокете. В этом случае вам следует проверить справку по модулю «socket» в документации python, особенно socket.setblocking().

Я предполагаю, что функция proxy.conn.recv() возвращает только тогда, когда фактически BUFFER_SIZE байты, полученные сокетом. Из-за этого поток блокируется до тех пор, пока не будет получено достаточное количество данных, и, следовательно, сокет не закрывается.

Как я уже сказал, в настоящее время это всего лишь предположение, поэтому, пожалуйста, не отвергайте меня, если это не решит проблему…

Комментарии:

1. Извините за поздний ответ. На самом деле это решило мою проблему. Спасибо!

2. нп — Я рад, что смог помочь.

3. Ваше предположение неверно. recv() возвращает, когда есть EOS, ошибка или был передан хотя бы один байт. В противном случае написание любого сетевого кода было бы практически невозможно.