Остановка выполнения потоков в Python. Поток блокируется перед методом объединения

#python #windows #multithreading #wxpython #stdout

#python #Windows #многопоточность #wxpython #стандартный вывод

Вопрос:

Я пишу программу, которая выполняет дочерние программы в новом процессе и считывает стандартный вывод этих программ в отдельном потоке и записывает его в StyledTextCtrl control. Я столкнулся с проблемой остановки выполнения потоков. У меня есть следующий код:

 import subprocess
from threading import Thread, Lock
import wx
import wx.stc


import logging
logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(threadName)-10s) %(module)10s:%(funcName)-15s %(message)s',
                    )

class ReadThread(Thread):
    def __init__(self, subp, fd, append_text_callback):
        Thread.__init__(self)
        self.killed = False
        self.subp = subp
        self.fd = fd
        self.append_text_callback = append_text_callback

    def run(self):
        chunk = None
        while chunk != '' and not self.killed:
            self.subp.poll()
            chunk = self.fd.readline()
            if chunk:
                self.append_text_callback('%d: ' % self.subp.pid   chunk)
            if chunk == "":
                break
        logging.debug('END')


class MyForm(wx.Frame):

    def __init__(self):
        wx.Frame.__init__(self, None, wx.ID_ANY, "Example1")
        panel = wx.Panel(self, wx.ID_ANY)
        self.log = wx.stc.StyledTextCtrl(panel, wx.ID_ANY, size=(300,100), style=wx.TE_MULTILINE|wx.TE_READONLY|wx.HSCROLL)
        run_button = wx.Button(panel, wx.ID_ANY, 'Run')
        self.Bind(wx.EVT_BUTTON, self.onRun, run_button)
        self.Bind(wx.EVT_CLOSE, self.OnCloseFrame)
        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.log, 1, wx.ALL|wx.EXPAND, 5)
        sizer.Add(run_button, 0, wx.ALL|wx.CENTER, 5)
        panel.SetSizer(sizer)
        self.read_thread_1 = None
        self.read_thread_2 = None
        self.log_lock = Lock()            

    def OnCloseFrame(self, event):
        logging.debug('CLOSE')
        if self.read_thread_1:
            self.read_thread_1.killed = True
        if self.read_thread_2:
            self.read_thread_2.killed = True
        if self.read_thread_1:
            self.read_thread_1.join(timeout=1)
            logging.debug('after 1 join')
        if self.read_thread_2:
            self.read_thread_2.join(timeout=1)
            logging.debug('after 2 join')
        self.read_thread_1 = None
        self.read_thread_2 = None
        event.Skip()

    def onRun(self, event):
        cmd1 = "E:/threading_block/stdout_emulator.exe 50 500"
        subp1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.read_thread_1 = ReadThread(subp1, subp1.stdout, self.AppendText)
        self.read_thread_1.start()
        cmd2 = "E:/threading_block/stdout_emulator.exe 21 400"
        sp2 = subprocess.Popen(cmd2, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.read_thread_2 = ReadThread(sp2, sp2.stdout, self.AppendText)
        self.read_thread_2.start()
        logging.debug('END')

    def AppendText(self, text):
        self.log_lock.acquire()
        logging.debug('%s' % text)
        self.log.AddText(text)
        self.log_lock.release()

if __name__ == "__main__":
    app = wx.PySimpleApp()
    frame = MyForm().Show()
    app.MainLoop()
  

Я останавливаю потоки в OnCloseFrame методе. Я устанавливаю killed атрибут в True значение, и в результате это приводит к завершению выполнения while цикла в run методе ReadThread класса.
Нет никаких проблем, когда я закрываю это приложение (под Windows) во время выполнения потоков. Но если я удалю timeout=1 из self.read_thread_1.join(timeout=1) и self.read_thread_2.join(timeout=1) произойдет блокировка программы.

Программа застряла self.read_thread_1.join() перед OnCloseFrame методом.

В чем проблема с этой блокировкой? Почему я должен использовать timeout for join ?

Код C std_emulator.exe заключается в следующем:

 #include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <time.h>

int main(int argc, char *argv[])
{
  int s;
  int n;
  if (argc > 1)
  {    
    s = atoi(argv[1]);
    n = atoi(argv[2]);
  }
  else
  {
    s = 1;
    n = 50;
  }

  int i;
  for (i = s; i < n;   i)
  {
    printf("This is %d linen", i);
    fflush(stdout);
    if (i % 2 == 0) 
    {       
      srand(time(NULL));
      int r = rand() % 7;
      printf("sleep_type_2 with %dn", r);
      Sleep(30 * r);
      fflush(stdout); 
    }     
      if (i % 3 == 0) 
      {     
        int r = rand() % 7;
        printf("sleep_type_3 with %dn", r);
        Sleep(40 * r);
        fflush(stdout); 
      }
      if (i % 5 == 0) 
      {
        int r = rand() % 9;
        printf("sleep_type_5 with %dn", r);
        Sleep(50);
        fflush(stdout); 
      }   
    }
}
  

Ответ №1:

В вашем случае вы фактически не уничтожаете потоки при использовании join(timeout=1) . Вместо join этого операция ожидает одну секунду, чтобы увидеть, завершается ли поток, а когда этого не происходит, он просто сдается. join Не будет работать, потому что ваш поток, скорее всего, блокируется в этой строке:

 chunk = self.fd.readline()
  

Поэтому он никогда не зацикливается, чтобы проверить, установлено ли killed значение True . Я думаю, что лучший способ для вас полностью завершить потоки — это завершить запускаемый подпроцесс в ReadThread OnCloseFrame :

 if self.read_thread_1:
    self.read_thread_1.subp.terminate()
    self.read_thread_1.join()
if self.read_thread_2:
    self.read_thread_2.subp.terminate()
    self.read_thread_2.join()
  

Завершение подпроцесса разблокирует readline вызов и позволит вам чисто завершить поток.