Поток Python иногда не возвращает

#python #multithreading

#python #многопоточность

Вопрос:

Я работаю над многопоточным скриптом Python, который принимает список имен файлов и помещает их в очередь. Большую часть времени это работает, однако я иногда обнаруживаю, что он застрял, и ‘ps -efL’ покажет два потока, открытых для скрипта python. Я последовал за ним с помощью strace, и 5 из 6 потоков вернулись, но один просто зависает в ожидании futex вечно.

Вот блок кода, о котором идет речь.

 threads = 6 

for fileName in fileNames:
  queue.put(fileName)

for i in range(threads):
  t = threading.Thread(target=get_backup_list, args=(queue,dbCreds,arguments.verbose,arguments.vault))
  activeThreads.append(t)
  t.start()

for activeThread in activeThreads:
  activeThread.join()


def get_backup_list(queue,dbCreds,verbosity,vault):
  backupFiles = []

  while True:
    if queue.empty() == True:
      return 
    fileName = queue.get()
    try:
      fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
      if not fileInfo:
        start = time.time()
        attributes = get_attributes(fileName,verbosity)
        end = time.time() - start
        if verbosity: print("finished in ")   str(end)   (" seconds")
        insert_file(attributes,dbCreds,vault)
        fileInfo = lookup_file_by_path(fileName,dbCreds,vault)

    except Exception, e:
      print("error on "   fileName   " "   str(e))

  return

def lookup_file_by_path(path,dbCreds,vault):
  attributes = {}
  conn = mdb.connect(dbCreds['server'] , dbCreds['user'], dbCreds['password'], dbCreds['database'], cursorclass=MySQLdb.cursors.DictCursor);
  c = conn.cursor()
  c.execute('''SELECT * FROM {} where path = "%s" '''.format(vault) % ( path ) )
  data = c.fetchone()
  if data:
    for key in data.keys():
      attributes[key] = data[key]
  conn.close
  return attributes
  

Я делаю здесь что-то принципиально неправильное, что вызывает состояние гонки? Или есть что-то еще, чего мне не хватает.

Спасибо, Thomas C

Комментарии:

1. Ну, conn.close не хватает пары круглых скобок, и вызывать queue.empty вместо того, чтобы просто перейти прямо к get (возможно, с block=False помощью), как правило, плохая идея.

Ответ №1:

В вашем коде есть условие гонки:

 while True:
     if queue.empty() == True:
        return 
     fileName = queue.get()
  

Сначала потоки проверяют, пуста ли очередь. Если это не так, он пытается заблокировать get . Однако за время между вызовом queue.empty() и queue.get другой поток мог использовать последний элемент из очереди, что означает, что get вызов будет заблокирован навсегда. Вы должны сделать это вместо:

 try:
    fileName = queue.get_nowait()
except Queue.Empty:
    return
  

Если это не решает проблему, вы можете просто добавить несколько print инструкций в метод threaded, чтобы точно определить, где он застрял, и перейти оттуда. Однако у меня нет других проблем с параллелизмом.

Редактировать:

Кроме того, то, что вы здесь делаете, могло бы быть более четко реализовано как ThreadPool или multiprocessing.Pool :

 from multiprocessing.pool import ThreadPool
from functools import partial

def get_backup_list(dbCreds, verbosity, vault, fileName):
  backupFiles = []
  fileInfo = lookup_file_by_path(fileName,dbCreds,vault)
  ...

if __name__ == "__main__":
    pool = ThreadPool(6) # You could use a multiprocessing.Pool, too
    func = partial(get_backup_list, dbCreds, arguments.verbose, arguments.vault)
    pool.map(func, fileNames)
    pool.close()
    pool.join()
  

В зависимости от того, сколько работы выполняет каждый вызов get_backup_list , вы можете обнаружить, что он работает лучше как multiprocessing.Pool , потому что он способен обойти глобальную блокировку интерпретатора (GIL), которая предотвращает одновременное выполнение потоков Python на всех ядрах процессора. Похоже, что ваш код, вероятно, привязан к вводу-выводу, так что ThreadPool может сработать просто отлично.

Комментарии:

1. Бьюсь об заклад, что это то, что происходит, позвольте мне протестировать это несколько раз.

2. Это было исправлено, спасибо! Я смотрел на ThreadPool, когда запускал это, но я не был уверен, какой путь лучше всего использовать. Я попробую еще раз.