Многопроцессорная обработка Python.Очередь никогда не присоединяется при использовании paramiko для создания нескольких ssh-клиентов

#python #multiprocessing #paramiko

#питон #многопроцессорная обработка #парамико

Вопрос:

Я использую paramiko для создания ssh-клиентов и хочу, чтобы они выполняли команды одновременно.

 import paramiko from multiprocessing import Queue, Process from paramiko import SSHClient, BadHostKeyException, AuthenticationException, SSHException  class ConnectionWorker():  def __init__(self, commands: Queue, results: Queue, config, logger):  self.config = config  self.commands = commands  self.results = results   def _init_runner(self):  self.client = SSHClient()  self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  self.client.connect(  self._config['host'], (int)(self._config['port']), self._config['username'], self._config['password'])  self.conn = self.client.get_transport().open_session()  self.conn.get_pty()  self.conn.invoke_shell()    def main(self):  self._init_runner()   while True:  value = self.commands.get()  if value == None:  self.results.put(None)  break  result = self.run(value)  if (result != None):  self.results.put(result)   def run(self, cmd: dict):  result = self.conn.send(cmd["command"])  return result  if __name__ == "__main__":  config = [  """  Servers' config  """  ]  processes = []  for server in config:  commands = Queue()  results = Queue()  instance = ConnectionWorker(commands, results, server)  processes.append(Process(target=instance.main))    """  Put commands to queues respectively  """   for process in processes:  process.start()   for process in processes:  process.join()    

Он отлично работает при выполнении команд. Но эти процессы никогда не объединятся, когда они закончат свою работу. Я должен позвонить KeyboardInterrupt :

 ^CProcess Process-1: Traceback (most recent call last):  File "/home/lymo/Projects/DAAC/daac_test/src/RerunAll.py", line 20, in lt;modulegt;  rerun_mgr.run()  File "/home/lymo/Projects/DAAC/daac_test/src/WorkerMgr.py", line 99, in run  process.join()  File "/usr/lib/python3.6/multiprocessing/process.py", line 124, in join  res = self._popen.wait(timeout)  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 50, in wait  return self.poll(os.WNOHANG if timeout == 0.0 else 0)  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll  pid, sts = os.waitpid(self.pid, flag) Traceback (most recent call last): KeyboardInterrupt  File "/usr/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap  util._exit_function()  File "/usr/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function  _run_finalizers()  File "/usr/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers  finalizer()  File "/usr/lib/python3.6/multiprocessing/util.py", line 186, in __call__  res = self._callback(*self._args, **self._kwargs)  File "/usr/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join  thread.join()  File "/usr/lib/python3.6/threading.py", line 1056, in join  self._wait_for_tstate_lock()  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock  elif lock.acquire(block, timeout): KeyboardInterrupt  

Это немного странно, когда я создаю только один процесс, и он отлично работает.

Любая помощь будет очень признательна. Заранее спасибо.

Комментарии:

1. Теперь я думаю, что это в основном потому, что кэш в очереди результатов никогда не очищается. После того, как я временно удаляю очереди результатов, это работает нормально.