#python #multithreading
#python #многопоточность
Вопрос:
Я работаю над многопоточным скриптом Python, который принимает список имен файлов и помещает их в очередь. Большую часть времени это работает, однако я иногда обнаруживаю, что он застрял, и ‘ps -efL’ покажет два потока, открытых для скрипта python. Я последовал за ним с помощью strace, и 5 из 6 потоков вернулись, но один просто зависает в ожидании futex вечно.
Вот блок кода, о котором идет речь.
threads = 6
for fileName in fileNames:
queue.put(fileName)
for i in range(threads):
t = threading.Thread(target=get_backup_list, args=(queue,dbCreds,arguments.verbose,arguments.vault))
activeThreads.append(t)
t.start()
for activeThread in activeThreads:
activeThread.join()
def get_backup_list(queue,dbCreds,verbosity,vault):
backupFiles = []
while True:
if queue.empty() == True:
return
fileName = queue.get()
try:
fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
if not fileInfo:
start = time.time()
attributes = get_attributes(fileName,verbosity)
end = time.time() - start
if verbosity: print("finished in ") str(end) (" seconds")
insert_file(attributes,dbCreds,vault)
fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
except Exception, e:
print("error on " fileName " " str(e))
return
def lookup_file_by_path(path,dbCreds,vault):
attributes = {}
conn = mdb.connect(dbCreds['server'] , dbCreds['user'], dbCreds['password'], dbCreds['database'], cursorclass=MySQLdb.cursors.DictCursor);
c = conn.cursor()
c.execute('''SELECT * FROM {} where path = "%s" '''.format(vault) % ( path ) )
data = c.fetchone()
if data:
for key in data.keys():
attributes[key] = data[key]
conn.close
return attributes
Я делаю здесь что-то принципиально неправильное, что вызывает состояние гонки? Или есть что-то еще, чего мне не хватает.
Спасибо, Thomas C
Комментарии:
1. Ну,
conn.close
не хватает пары круглых скобок, и вызыватьqueue.empty
вместо того, чтобы просто перейти прямо кget
(возможно, сblock=False
помощью), как правило, плохая идея.
Ответ №1:
В вашем коде есть условие гонки:
while True:
if queue.empty() == True:
return
fileName = queue.get()
Сначала потоки проверяют, пуста ли очередь. Если это не так, он пытается заблокировать get
. Однако за время между вызовом queue.empty()
и queue.get
другой поток мог использовать последний элемент из очереди, что означает, что get
вызов будет заблокирован навсегда. Вы должны сделать это вместо:
try:
fileName = queue.get_nowait()
except Queue.Empty:
return
Если это не решает проблему, вы можете просто добавить несколько print
инструкций в метод threaded, чтобы точно определить, где он застрял, и перейти оттуда. Однако у меня нет других проблем с параллелизмом.
Редактировать:
Кроме того, то, что вы здесь делаете, могло бы быть более четко реализовано как ThreadPool
или multiprocessing.Pool
:
from multiprocessing.pool import ThreadPool
from functools import partial
def get_backup_list(dbCreds, verbosity, vault, fileName):
backupFiles = []
fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
...
if __name__ == "__main__":
pool = ThreadPool(6) # You could use a multiprocessing.Pool, too
func = partial(get_backup_list, dbCreds, arguments.verbose, arguments.vault)
pool.map(func, fileNames)
pool.close()
pool.join()
В зависимости от того, сколько работы выполняет каждый вызов get_backup_list
, вы можете обнаружить, что он работает лучше как multiprocessing.Pool
, потому что он способен обойти глобальную блокировку интерпретатора (GIL), которая предотвращает одновременное выполнение потоков Python на всех ядрах процессора. Похоже, что ваш код, вероятно, привязан к вводу-выводу, так что ThreadPool
может сработать просто отлично.
Комментарии:
1. Бьюсь об заклад, что это то, что происходит, позвольте мне протестировать это несколько раз.
2. Это было исправлено, спасибо! Я смотрел на ThreadPool, когда запускал это, но я не был уверен, какой путь лучше всего использовать. Я попробую еще раз.