#python #multiprocessing #paramiko
#питон #многопроцессорная обработка #парамико
Вопрос:
Я использую paramiko
для создания ssh-клиентов и хочу, чтобы они выполняли команды одновременно.
import paramiko from multiprocessing import Queue, Process from paramiko import SSHClient, BadHostKeyException, AuthenticationException, SSHException class ConnectionWorker(): def __init__(self, commands: Queue, results: Queue, config, logger): self.config = config self.commands = commands self.results = results def _init_runner(self): self.client = SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect( self._config['host'], (int)(self._config['port']), self._config['username'], self._config['password']) self.conn = self.client.get_transport().open_session() self.conn.get_pty() self.conn.invoke_shell() def main(self): self._init_runner() while True: value = self.commands.get() if value == None: self.results.put(None) break result = self.run(value) if (result != None): self.results.put(result) def run(self, cmd: dict): result = self.conn.send(cmd["command"]) return result if __name__ == "__main__": config = [ """ Servers' config """ ] processes = [] for server in config: commands = Queue() results = Queue() instance = ConnectionWorker(commands, results, server) processes.append(Process(target=instance.main)) """ Put commands to queues respectively """ for process in processes: process.start() for process in processes: process.join()
Он отлично работает при выполнении команд. Но эти процессы никогда не объединятся, когда они закончат свою работу. Я должен позвонить KeyboardInterrupt
:
^CProcess Process-1: Traceback (most recent call last): File "/home/lymo/Projects/DAAC/daac_test/src/RerunAll.py", line 20, in lt;modulegt; rerun_mgr.run() File "/home/lymo/Projects/DAAC/daac_test/src/WorkerMgr.py", line 99, in run process.join() File "/usr/lib/python3.6/multiprocessing/process.py", line 124, in join res = self._popen.wait(timeout) File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 50, in wait return self.poll(os.WNOHANG if timeout == 0.0 else 0) File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll pid, sts = os.waitpid(self.pid, flag) Traceback (most recent call last): KeyboardInterrupt File "/usr/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap util._exit_function() File "/usr/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function _run_finalizers() File "/usr/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers finalizer() File "/usr/lib/python3.6/multiprocessing/util.py", line 186, in __call__ res = self._callback(*self._args, **self._kwargs) File "/usr/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join thread.join() File "/usr/lib/python3.6/threading.py", line 1056, in join self._wait_for_tstate_lock() File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock elif lock.acquire(block, timeout): KeyboardInterrupt
Это немного странно, когда я создаю только один процесс, и он отлично работает.
Любая помощь будет очень признательна. Заранее спасибо.
Комментарии:
1. Теперь я думаю, что это в основном потому, что кэш в очереди результатов никогда не очищается. После того, как я временно удаляю очереди результатов, это работает нормально.