Приостановка двух потоков Python, пока третий выполняет что-то (с блокировками?)

#python #multithreading #locks

#python #многопоточность #блокировки

Вопрос:

Я новичок в параллельном программировании.

Я хотел бы выполнить три задачи повторно. Первые два должны выполняться постоянно, третий должен выполняться каждый час или около того. Первые две задачи могут выполняться параллельно, но я всегда хочу приостановить их, пока выполняется третья задача.

Вот скелет того, что я пробовал:

 import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
  

Я бы ожидал, что этот код будет печатать f и g каждую секунду, и h примерно каждые пять секунд. Однако, когда я запускаю его, требуется около 12 f и 12 g, прежде чем я начну видеть некоторые h. Похоже, что первые два потока постоянно освобождают и повторно получают свои блокировки, в то время как третий поток остается вне цикла.

  1. Почему это? Когда третий поток пытается получить удерживаемую в данный момент блокировку, и затем она освобождается, не должно ли получение немедленно завершиться успешно вместо того, чтобы первый / второй поток немедленно получал его снова? Вероятно, я что-то недопонимаю.
  2. Какой был бы хороший способ достичь того, чего я хочу?

Примечание: перемещение time.sleep(1) вызовов из блока with flock / glock работает для этого простого примера, но, по-видимому, не для моего реального приложения, где потоки тратят большую часть своего времени на выполнение реальных операций. Когда первые два потока переходят в режим ожидания на секунду после каждого выполнения тела цикла при снятой блокировке, третья задача по-прежнему никогда не выполняется.

Комментарии:

1. В вашем точном коде я вижу ‘h’ каждые 5-6 секунд как в Python2.7, так и в Python 3.2. Я даже экспериментировал с полным удалением спящих режимов и снятием с них блокировки. Какую ОС вы используете? (Я на Windows).

2. Я попробовал Linux 2.6.24-1-amd64 # 1 SMP x86_64 GNU / Linux с Python 2.7.1 и Linux 2.6.32-5-686 # 1 SMP i686 GNU / Linux с Python 2.6.6

Ответ №1:

Как насчет того, чтобы сделать это с обработкой потоков.Мероприятия:

 import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()
  

выдает

 [07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt
  

(В ответ на вопрос в комментариях) Этот код пытается измерить, сколько времени требуется h -потоку для получения каждой блокировки от других рабочих потоков.

Кажется, это показывает, что даже если h ожидает получения блокировки, другой рабочий поток может с довольно высокой вероятностью освободить и повторно получить блокировку. Приоритет не присваивается h только потому, что он ждал дольше.

Дэвид Бизли выступил на PyCon с докладом о проблемах, связанных с потоковой обработкой и GIL. Вот PDF-файл слайдов. Это увлекательное чтение, и оно также может помочь объяснить это.

 import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()
  

Комментарии:

1. Я все еще хотел бы понять, почему мой исходный код работает не так, как ожидалось. Действительно ли это так, что новый поток lock.acquire() может немедленно завершиться успешно, в то время как другой поток lock.acquire() уже блокируется?

2. Мое понимание потоков в основном эмпирическое. Я не думаю, что смогу объяснить это на глубоком уровне. Я публикую некоторый код, который, как мне кажется, может пролить немного света на происходящее. Чтобы h-поток получил, скажем, flock, h-поток должен получить GIL и в то время, когда f-поток выпустил flock. Механизм того, как это происходит, я не могу подробно объяснить. Но код (я опубликую выше) показывает, что это происходит не с желаемой частотой.

3. Продолжая размышлять над моим вопросом, да, конечно, это может произойти — в конце концов, нет никаких гарантий относительно порядка, в котором удовлетворяются запросы на получение блокировки. На самом деле я предполагал, что когда блокировка снимается в потоке T и некоторые запросы на получение блокировки в данный момент блокируются, по крайней мере, один из этих запросов будет удовлетворен до того, как что-либо еще произойдет в T. Но, по-видимому, это тоже не гарантировано.

Ответ №2:

Самый простой способ сделать это с помощью 3 процессов Python. Если вы делаете это в Linux, то ежечасный процесс может посылать сигнал, чтобы приостановить выполнение других задач, или вы могли бы даже прервать их, а затем перезапустить после завершения ежечасной задачи. Нет необходимости в потоках.

Однако, если вы полны решимости использовать потоки, тогда постарайтесь вообще не передавать данные между потоками, просто отправляйте сообщения туда и обратно (также известное как копирование данных, а не совместное использование данных). Потоковую обработку сложно выполнить правильно.

Но из-за множества процессов вы не должны делиться ничем, и поэтому это намного проще сделать правильно. Если вы используете библиотеку, подобную 0MQ http://www.zeromq.org чтобы выполнить передачу вашего сообщения, легко перейти от потоковой модели к модели с несколькими процессами.

Комментарии:

1. Я боюсь, что моим потокам действительно нужно обмениваться некоторыми данными, вот почему я решил использовать потоки, а не процессы в первую очередь.

2. Вы уверены, что не можете поделиться необходимыми в данный момент данными каким-либо другим способом? Один из способов — отправить копию в сообщении, но другой способ, который я использовал, — поместить общие данные в memcache. Это тот случай, когда реструктуризация программы упрощает написание надежного высокопроизводительного кода.

Ответ №3:

Использование связи для синхронизации:

 #!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue

def f(q, c):
    while True:
        try: q.get_nowait(); q.get() # get PAUSE signal      
        except Empty: pass  # no signal, do our thing
        else: q.get()       # block until RESUME signal
        print c,
        time.sleep(1)

def h(queues):
    while True:
        for q in queues:
            q.put_nowait(1); q.put(1) # block until PAUSE received
        print 'h'
        for q in queues:
            q.put(1) # put RESUME
        time.sleep(5)

queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()
  

Это может быть неоптимально с точки зрения производительности для вас, но я нахожу, что за этим гораздо легче следить.

Вывод

 f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
  

Комментарии:

1. Не угрожает ли это одновременному выполнению print c, и print 'h' ? Например, когда поток H помещает сигнал pause в очередь, а затем продолжает выполнять свою работу, два других потока могут все еще работать, верно? Это то, чего я хочу избежать.

2. @ke.: да. Это возможно. Вы сказали, что потоки ‘h’ запускаются раз в час. Я разрешил потокам перекрываться в течение одного цикла. (из-за этого Queue() не может быть maxsize=0 , это означает, что оно неограниченно. Итак, вам нужно put() дважды заблокировать Queue с maxsize=1 помощью. Я обновил ответ.

Ответ №4:

Как насчет семафора, инициализированного на 2? F и G ожидают и сигнализируют об одном блоке, H ожидает и сигнализирует о двух блоках.

Комментарии:

1. Вы имеете в виду, как в приведенном выше примере, но замените все ссылки на flock и glock ссылками на один объект semaphore, созданный с использованием многопоточности. Семафор (2)? Я боюсь, что результирующая программа демонстрирует ту же проблему.

2. Хм .. Я ожидал повышения производительности, потому что H ожидает любой релиз, а не только по одному за раз. Затем я бы попробовал понизить приоритет F, G или повысить H, чтобы, когда модуль становится свободным, H запускался и получал модуль вместо F или G. H должен ждать свои 2 модуля в двух циклах, т. Е. два вызова для одного модуля, а не один вызов для двух модулей.

3. В Python нет простого способа установить приоритеты потоков, не так ли?

4. Кроме того, я не уверен, что вы подразумеваете под «ожиданием его 2 единиц в двух циклах». Что я пробовал, так это: с помощью семафора: с помощью семафора: напечатать ‘h’, это то, что вы имеете в виду?

Ответ №5:

Как насчет этого подхода (хотя и спорного, потому что я знаю, что «глобальные» переменные предположительно являются большим запретом, когда дело доходит до потоковой обработки (новичок — значит, все еще учится)…

 import threading, time


import threading, time

def f():
    global BL
    while True:
        BL = 'f' if BL == -1 else BL
        if BL == 'f':
            print('f')
            BL = -1
            ss(0.1)

def g():
    global BL
    while True:
        BL = 'g' if BL == -1 else BL
        if BL == 'g':
            print('g')
            BL = -1
            ss(0.1)

def h():
    global BL
    while True:
        BL = 'h' if BL == -1 and (tt() - start) % delay_3rd <= 0.1 and (tt()-start) > 1 else BL
        if (BL == 'h'):
           print('h')
           print(f' seconds: {round(tt() - start,None)}!!'*100)
           BL = -1
           ss(0.1)


BL, delay_3rd, [ss], [tt]  = -1, 5, [time.sleep], [time.time]
start = tt()
  

третий будет выполняться каждую секунду (вы могли бы сделать delay_3rd = 3600 для
часовые интервалы; в то время как первые два выполняются всегда (в соответствии с вашими
запрос / намерение)

 threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
  

(вывод примерно через 4-5 секунд выполнения …)

f

ч секунд: 5!!

g

f

g

f

f

g

f

h

g

f

g

ч секунд: 6!!

f

g

f

g

f

g

f

g

f

g

f

g

f

h

секунд: 7!!

g

f

g

(обратите внимание, что h появляется только каждую секунду; f amp; g появляются периодически повсюду …)

Комментарии:

1. (т. Е. настолько близкий «код» к вашему исходному представлению — заменяет flock / glock глобальными переменными, и это все, что нужно на самом деле.. о да, и я включаю оператор ‘if’, т. е. BL = ‘f / g / h’, если BL == -1, иначе BL….