Сбой блокировки потока в dead-простой пример

#python #python-3.x #thread-safety

#python #python-3.x #безопасность потоков

Вопрос:

Это самый простой игрушечный пример. Я знаю о concurrent.futures и коде более высокого уровня; Я выбираю игрушечный пример, потому что я его преподаю (как часть того же материала с материалом высокого уровня).

Он переходит на счетчик из разных потоков, и я получаю… ну, здесь это еще более странно. Обычно я получаю счетчик меньше, чем должен (например, 5 м), обычно намного меньше, чем 20 тыс. Но по мере того, как я уменьшаю количество циклов, при некотором числе, например, 1000, оно неизменно правильное. Затем при некотором промежуточном числе я получаю почти правильное, иногда правильное, но время от времени немного больше, чем произведение nthread x nloop. Я запускаю его повторно в ячейке Jupyter, но первая строка действительно должна сбрасывать счетчик на ноль, а не сохранять какой-либо старый итог.

 lock = threading.Lock()
counter, nthread, nloop = 0, 100, 50_000 

def increment(n, lock):
    global counter
    for _ in range(n):
        lock.acquire()
        counter  = 1
        lock.release()

for _ in range(nthread):
    t = Thread(target=increment, args=(nloop, lock))
    t.start()
    
print(f"{nloop:,} loops X {nthread:,} threads -> counter is {counter:,}")
  

Если я добавлю .join() изменения в поведении, но по-прежнему неверно. Например, в версии, которая не пытается заблокировать:

 counter, nthread, nloop = 0, 100, 50_000 

def increment(n):
    global counter
    for _ in range(n):
        counter  = 1

for _ in range(nthread):
    t = Thread(target=increment, args=(nloop,))
    t.start()
    t.join()
    
print(f"{nloop:,} loops X {nthread:,} threads -> counter is {counter:,}")
# --> 50,000 loops X 100 threads -> counter is 5,022,510
  

Точное превышение варьируется, но я вижу что-то подобное неоднократно.

Я действительно не хочу .join() этого делать в примере блокировки, потому что я хочу проиллюстрировать идею фонового задания. Но я могу дождаться оживления потока (спасибо, Фрэнк Йеллин!), И это исправляет случай блокировки. Однако меня все еще беспокоит перерасчет.

Ответ №1:

Вы не ждете, пока все ваши потоки будут выполнены, прежде чем смотреть counter . Именно поэтому вы так быстро получаете результат.

     threads = []
    for _ in range(nthread):
        t = threading.Thread(target=increment, args=(nloop, lock))
        t.start()
        threads.append(t)

    for thread in threads:
        thread.join()

    print(f"{nloop:,} loops X {nthread:,} threads -> counter is {counter:,}")
  

выводит ожидаемый результат.

 50,000 loops X 100 threads -> counter is 5,000,000
  

Обновлено. Я настоятельно рекомендую вместо этого использовать ThreadPoolExecutor(), который заботится об отслеживании потоков для вас.

     with ThreadPoolExecutor() as executor:
        for _ in range(nthread):
            executor.submit(increment, nloop, lock)
    print(...)
  

даст вам ответ, который вы хотите, и позаботится об ожидании потоков.

Комментарии:

1. Извините. Можете ли вы отредактировать свою исходную публикацию. Это вышло в основном нечитаемым. Является ли t.join() внутри цикла или вне его? Вам нужно дождаться всех ваших потоков.

2. Тьфу… как мне получить форматированный блок в комментарии? Ох… Я знаю о ThreadPoolExecutor и concurrent.futures. Я преподаю материал, поэтому хочу представить простой материал. Это не то, что я бы написал в своем реальном коде.

3. Вероятно, не может. Вот почему я рекомендовал обновить вашу исходную публикацию. Но находится ли ваш t.join() вне цикла? Вы ожидаете только последнего созданного вами потока, а не всех из них. Нет гарантии, что они завершатся в том порядке, в котором они были созданы.

4. Спасибо, Фрэнк… вы были очень полезны, и я изменил вопрос с помощью форматированного кода и более подробной информации.

5. Ваша версия с t.join() внутренним циклом теперь по сути является однопоточной программой. Вы запускаете поток, а затем ждете его завершения, прежде чем запускать новый. Таким образом, ваш результат имеет еще меньше смысла, поскольку вам не нужна блокировка. Вы перезапустили python? Возможно, некоторые потоки все еще выполняются из предыдущих тестов, поскольку вы не дождались их завершения. Я не уверен, что еще вам сказать.