Многопоточность Python.Поток может быть остановлен только с помощью частного метода self .__Thread_stop()

#python #multithreading #queue

#python #многопоточность #очередь

Вопрос:

У меня есть функция, которая принимает большой массив пар x, y в качестве входных данных, которая выполняет некоторую сложную подгонку кривой с использованием numpy и scipy, а затем возвращает одно значение. Чтобы попытаться ускорить процесс, я пытаюсь создать два потока, в которые я передаю данные с помощью очереди.Очередь . Как только данные будут готовы. Я пытаюсь завершить потоки, а затем завершить вызывающий процесс и вернуть управление оболочке.

Я пытаюсь понять, почему я должен прибегать к частному методу в потоковой передаче.Поток, чтобы остановить мои потоки и вернуть управление в командную строку.

Self.join() не завершает программу. Единственным способом вернуть контроль было использовать метод private stop .

         def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()
  

Вот пример моего кода:

     class CalcThread(threading.Thread):
        def __init__(self,in_queue,out_queue,function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(paramsforfunction)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    #modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window   1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id,window,data_arrays):
            # do some analysis to calculate tm
            return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue,out_queue,big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
              for i in well_ids:
                  in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"
        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method

        for aworker in workers:
            aworker.stop()
  

Комментарии:

1. sys.exit() (убивает только поток)

2. self.daemon = True работает, только если вызывается set перед start() , в противном случае возникает ошибка выполнения

3. sys.exit() не прерывает поток, но вызывает исключение SystemExit в текущем потоке.

Ответ №1:

В общем, убивать поток, который изменяет общий ресурс, — плохая идея.

Задачи с интенсивным использованием процессора в нескольких потоках хуже, чем бесполезны в Python, если вы не выпускаете GIL во время выполнения вычислений. Многие numpy функции освобождают GIL.

Пример ThreadPoolExecutor из документации

 import concurrent.futures # on Python 2.x: pip install futures 

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                           for args in calc_args)

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    #modify params and resubmit
                    args["window"]  = 1
                    future_to_args[executor.submit(big_calculation, args)] = args

            else:
                print('f%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")
  

Вы могли бы заменить ThreadPoolExecutor на ProcessPoolExecutor , если нет общего состояния. Поместите код в свою main() функцию.

Комментарии:

1. ВАУ, это ОГРОМНОЕ открытие. Большое спасибо, что познакомили меня с concurrent.futures. И это очень хорошо работает с python 2.7 и numpy и scipy. Ни один из потоков потока. Многопоточность и все преимущества одновременного выполнения

Ответ №2:

Чтобы уточнить мой комментарий — если единственной целью ваших потоков является получение значений из очереди и выполнение над ними функции, вам определенно лучше сделать что-то вроде этого, ИМХО:

 q = Queue()
results = []

def worker():
  while True:
    x, y = q.get()
    results.append(x ** y)
    q.task_done()

for _ in range(workerCount):
  t = Thread(target = worker)
  t.daemon = True
  t.start()

for tup in listOfXYs:
  q.put(tup)

q.join()

# Some more code here with the results list.
  

q.join() будет блокироваться до тех пор, пока он снова не станет пустым. Рабочие потоки будут продолжать пытаться получить значения, но не найдут ни одного, поэтому они будут ждать бесконечно долго, как только очередь опустеет. Когда ваш скрипт завершит свое выполнение позже, рабочие потоки умрут, потому что они помечены как потоки демонов.

Комментарии:

1. Вместо того, чтобы использовать демоны для этого материала (imo не очень хороший дизайн для этой ситуации, YMMV), вы могли бы использовать сторожевые значения. Т.е. После завершения всех заданий поместите nrThreads сторожевые значения в очередь, а затем снова присоединитесь к очереди или потокам. Потоки просто проверяют, get() возвращен ли sentinel (обычно None не является хорошим выбором), и в этом случае останавливаются. Также упрощает включение кода в более крупный дизайн.

2. @Voo: рабочие потоки сами вводят новые значения in_queue . Если основной поток вводит sentinels in_queue , они могут преждевременно сигнализировать о завершении. Как бы вы справились с этой ситуацией?

3. @unutbu — Лично я не вижу преимущества в сторожевых значениях, но вы могли бы (теоретически) решить эту проблему, используя LifoQueue вместо стандартной очереди и предварительно заполнить ее сторожевым значением для каждого рабочего потока. Это действительно несет в себе потенциал (по крайней мере, в случае op), что некоторые из ваших рабочих отмирают раньше, но что конечный рабочий, который повторно добавляет в in_queue несколько раз, в конечном итоге работает значительно дольше. Поток демона в заблокированном queue.get() режиме практически не потребляет ресурсов и, по моему опыту, не снижает производительность.

4. @g.d.d.c: Мне нравится ваша идея использования LifoQueue . Я думаю, что это может быть выполнимо. Но есть еще одна проблема: как узнать, когда out_queue пусто. Я не думаю, что тестирование qsize безопасно — поток может переходить к put новому элементу во out_queue время тестирования основного потока qsize , когда он временно равен нулю.

5. Я, очевидно, недостаточно хорошо объяснил концепцию, но да, g.d.d.c понял это правильно. подождите, поставьте стражей в очередь, снова подождите (хотя вы могли бы подождать потоки / процессы / пул потоков / что угодно и во второй раз; не совсем та же семантика, но достаточно близко). Это довольно полезный шаблон для такого рода проблем — может усложниться с несколькими очередями ввода, очередями вывода и т. Д. Но такова природа зверя, и мы можем обобщить решение и для этого.

Ответ №3:

Я попробовал метод g.d.d.c, и он дал интересный результат. Я мог бы заставить его точное вычисление x ** y работать просто отлично распределено между потоками.

Когда я вызвал свою функцию внутри рабочего цикла while True . Я мог бы выполнять вычисления между несколькими потоками, только если бы я поместил time.sleep(1) в цикл for, который вызывает метод threads start() .

Итак, в моем коде. Без time.sleep(1) программа выдала мне либо чистый выход без вывода, либо в некоторых случаях

«Исключение в потоке Thread-2 (скорее всего, возникает при завершении работы интерпретатора): Исключение в потоке Thread-1 (скорее всего, возникает при завершении работы интерпретатора):»

Как только я добавил time.sleep(), все прошло нормально.

 for aworker in range(5):
    t = Thread(target = worker)
    t.daemon = True
    t.start()
    # This sleep was essential or results for my specific function were None
    time.sleep(1)
    print "Started"