#python #multithreading #locks
#python #многопоточность #блокировки
Вопрос:
Я новичок в параллельном программировании.
Я хотел бы выполнить три задачи повторно. Первые два должны выполняться постоянно, третий должен выполняться каждый час или около того. Первые две задачи могут выполняться параллельно, но я всегда хочу приостановить их, пока выполняется третья задача.
Вот скелет того, что я пробовал:
import threading
import time
flock = threading.Lock()
glock = threading.Lock()
def f():
while True:
with flock:
print 'f'
time.sleep(1)
def g():
while True:
with glock:
print 'g'
time.sleep(1)
def h():
while True:
with flock:
with glock:
print 'h'
time.sleep(5)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
Я бы ожидал, что этот код будет печатать f и g каждую секунду, и h примерно каждые пять секунд. Однако, когда я запускаю его, требуется около 12 f и 12 g, прежде чем я начну видеть некоторые h. Похоже, что первые два потока постоянно освобождают и повторно получают свои блокировки, в то время как третий поток остается вне цикла.
- Почему это? Когда третий поток пытается получить удерживаемую в данный момент блокировку, и затем она освобождается, не должно ли получение немедленно завершиться успешно вместо того, чтобы первый / второй поток немедленно получал его снова? Вероятно, я что-то недопонимаю.
- Какой был бы хороший способ достичь того, чего я хочу?
Примечание: перемещение time.sleep(1)
вызовов из блока with flock / glock работает для этого простого примера, но, по-видимому, не для моего реального приложения, где потоки тратят большую часть своего времени на выполнение реальных операций. Когда первые два потока переходят в режим ожидания на секунду после каждого выполнения тела цикла при снятой блокировке, третья задача по-прежнему никогда не выполняется.
Комментарии:
1. В вашем точном коде я вижу ‘h’ каждые 5-6 секунд как в Python2.7, так и в Python 3.2. Я даже экспериментировал с полным удалением спящих режимов и снятием с них блокировки. Какую ОС вы используете? (Я на Windows).
2. Я попробовал Linux 2.6.24-1-amd64 # 1 SMP x86_64 GNU / Linux с Python 2.7.1 и Linux 2.6.32-5-686 # 1 SMP i686 GNU / Linux с Python 2.6.6
Ответ №1:
Как насчет того, чтобы сделать это с обработкой потоков.Мероприятия:
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(resume,is_waiting,name):
while True:
if not resume.is_set():
is_waiting.set()
logger.debug('{n} pausing...'.format(n=name))
resume.wait()
is_waiting.clear()
logger.info(name)
time.sleep(1)
def h(resume,waiters):
while True:
logger.debug('halt')
resume.clear()
for i,w in enumerate(waiters):
logger.debug('{i}: wait for worker to pause'.format(i=i))
w.wait()
logger.info('h begin')
time.sleep(2)
logger.info('h end')
logger.debug('resume')
resume.set()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
# set means resume; clear means halt
resume = threading.Event()
resume.set()
waiters=[]
for name in 'fg':
is_waiting=threading.Event()
waiters.append(is_waiting)
threading.Thread(target=f,args=(resume,is_waiting,name)).start()
threading.Thread(target=h,args=(resume,waiters)).start()
выдает
[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt
(В ответ на вопрос в комментариях) Этот код пытается измерить, сколько времени требуется h
-потоку для получения каждой блокировки от других рабочих потоков.
Кажется, это показывает, что даже если h
ожидает получения блокировки, другой рабочий поток может с довольно высокой вероятностью освободить и повторно получить блокировку. Приоритет не присваивается h
только потому, что он ждал дольше.
Дэвид Бизли выступил на PyCon с докладом о проблемах, связанных с потоковой обработкой и GIL. Вот PDF-файл слайдов. Это увлекательное чтение, и оно также может помочь объяснить это.
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(lock,n):
while True:
with lock:
logger.info(n)
time.sleep(1)
def h(locks):
while True:
t=time.time()
for n,lock in enumerate(locks):
lock.acquire()
t2=time.time()
logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
t=t2
t2=time.time()
logger.info('h {d}'.format(d=t2-t))
t=t2
for lock in locks:
lock.release()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
locks=[]
N=5
for n in range(N):
lock=threading.Lock()
locks.append(lock)
t=threading.Thread(target=f,args=(lock,n))
t.start()
threading.Thread(target=h,args=(locks,)).start()
Комментарии:
1. Я все еще хотел бы понять, почему мой исходный код работает не так, как ожидалось. Действительно ли это так, что новый поток
lock.acquire()
может немедленно завершиться успешно, в то время как другой потокlock.acquire()
уже блокируется?2. Мое понимание потоков в основном эмпирическое. Я не думаю, что смогу объяснить это на глубоком уровне. Я публикую некоторый код, который, как мне кажется, может пролить немного света на происходящее. Чтобы h-поток получил, скажем, flock, h-поток должен получить GIL и в то время, когда f-поток выпустил flock. Механизм того, как это происходит, я не могу подробно объяснить. Но код (я опубликую выше) показывает, что это происходит не с желаемой частотой.
3. Продолжая размышлять над моим вопросом, да, конечно, это может произойти — в конце концов, нет никаких гарантий относительно порядка, в котором удовлетворяются запросы на получение блокировки. На самом деле я предполагал, что когда блокировка снимается в потоке T и некоторые запросы на получение блокировки в данный момент блокируются, по крайней мере, один из этих запросов будет удовлетворен до того, как что-либо еще произойдет в T. Но, по-видимому, это тоже не гарантировано.
Ответ №2:
Самый простой способ сделать это с помощью 3 процессов Python. Если вы делаете это в Linux, то ежечасный процесс может посылать сигнал, чтобы приостановить выполнение других задач, или вы могли бы даже прервать их, а затем перезапустить после завершения ежечасной задачи. Нет необходимости в потоках.
Однако, если вы полны решимости использовать потоки, тогда постарайтесь вообще не передавать данные между потоками, просто отправляйте сообщения туда и обратно (также известное как копирование данных, а не совместное использование данных). Потоковую обработку сложно выполнить правильно.
Но из-за множества процессов вы не должны делиться ничем, и поэтому это намного проще сделать правильно. Если вы используете библиотеку, подобную 0MQ http://www.zeromq.org чтобы выполнить передачу вашего сообщения, легко перейти от потоковой модели к модели с несколькими процессами.
Комментарии:
1. Я боюсь, что моим потокам действительно нужно обмениваться некоторыми данными, вот почему я решил использовать потоки, а не процессы в первую очередь.
2. Вы уверены, что не можете поделиться необходимыми в данный момент данными каким-либо другим способом? Один из способов — отправить копию в сообщении, но другой способ, который я использовал, — поместить общие данные в memcache. Это тот случай, когда реструктуризация программы упрощает написание надежного высокопроизводительного кода.
Ответ №3:
Использование связи для синхронизации:
#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue
def f(q, c):
while True:
try: q.get_nowait(); q.get() # get PAUSE signal
except Empty: pass # no signal, do our thing
else: q.get() # block until RESUME signal
print c,
time.sleep(1)
def h(queues):
while True:
for q in queues:
q.put_nowait(1); q.put(1) # block until PAUSE received
print 'h'
for q in queues:
q.put(1) # put RESUME
time.sleep(5)
queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()
Это может быть неоптимально с точки зрения производительности для вас, но я нахожу, что за этим гораздо легче следить.
Вывод
f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
Комментарии:
1. Не угрожает ли это одновременному выполнению
print c,
иprint 'h'
? Например, когда поток H помещает сигнал pause в очередь, а затем продолжает выполнять свою работу, два других потока могут все еще работать, верно? Это то, чего я хочу избежать.2. @ke.: да. Это возможно. Вы сказали, что потоки ‘h’ запускаются раз в час. Я разрешил потокам перекрываться в течение одного цикла. (из-за этого
Queue()
не может бытьmaxsize=0
, это означает, что оно неограниченно. Итак, вам нужноput()
дважды заблокироватьQueue
сmaxsize=1
помощью. Я обновил ответ.
Ответ №4:
Как насчет семафора, инициализированного на 2? F и G ожидают и сигнализируют об одном блоке, H ожидает и сигнализирует о двух блоках.
Комментарии:
1. Вы имеете в виду, как в приведенном выше примере, но замените все ссылки на flock и glock ссылками на один объект semaphore, созданный с использованием многопоточности. Семафор (2)? Я боюсь, что результирующая программа демонстрирует ту же проблему.
2. Хм .. Я ожидал повышения производительности, потому что H ожидает любой релиз, а не только по одному за раз. Затем я бы попробовал понизить приоритет F, G или повысить H, чтобы, когда модуль становится свободным, H запускался и получал модуль вместо F или G. H должен ждать свои 2 модуля в двух циклах, т. Е. два вызова для одного модуля, а не один вызов для двух модулей.
3. В Python нет простого способа установить приоритеты потоков, не так ли?
4. Кроме того, я не уверен, что вы подразумеваете под «ожиданием его 2 единиц в двух циклах». Что я пробовал, так это: с помощью семафора: с помощью семафора: напечатать ‘h’, это то, что вы имеете в виду?
Ответ №5:
Как насчет этого подхода (хотя и спорного, потому что я знаю, что «глобальные» переменные предположительно являются большим запретом, когда дело доходит до потоковой обработки (новичок — значит, все еще учится)…
import threading, time
import threading, time
def f():
global BL
while True:
BL = 'f' if BL == -1 else BL
if BL == 'f':
print('f')
BL = -1
ss(0.1)
def g():
global BL
while True:
BL = 'g' if BL == -1 else BL
if BL == 'g':
print('g')
BL = -1
ss(0.1)
def h():
global BL
while True:
BL = 'h' if BL == -1 and (tt() - start) % delay_3rd <= 0.1 and (tt()-start) > 1 else BL
if (BL == 'h'):
print('h')
print(f' seconds: {round(tt() - start,None)}!!'*100)
BL = -1
ss(0.1)
BL, delay_3rd, [ss], [tt] = -1, 5, [time.sleep], [time.time]
start = tt()
третий будет выполняться каждую секунду (вы могли бы сделать delay_3rd = 3600 для
часовые интервалы; в то время как первые два выполняются всегда (в соответствии с вашими
запрос / намерение)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
(вывод примерно через 4-5 секунд выполнения …)
f
ч секунд: 5!!
g
f
g
f
f
g
f
h
g
f
g
ч секунд: 6!!
f
g
f
g
f
g
f
g
f
g
f
g
f
h
секунд: 7!!
g
f
g
(обратите внимание, что h появляется только каждую секунду; f amp; g появляются периодически повсюду …)
Комментарии:
1. (т. Е. настолько близкий «код» к вашему исходному представлению — заменяет flock / glock глобальными переменными, и это все, что нужно на самом деле.. о да, и я включаю оператор ‘if’, т. е. BL = ‘f / g / h’, если BL == -1, иначе BL….