Многопроцессорная обработка Python и вмешательство в клавиатуру во время фазы возрождения

#exception #multiprocessing #python-3.4 #keyboardinterrupt

Вопрос:

Моя задача состоит в том, чтобы обрабатывать прерывания клавиатуры / CTRL Cво время фазы появления многопроцессорных процессов.

Я написал программу для обнаружения хостов в сети, одновременно пропингуя их, чтобы они были быстрыми. Я быстро столкнулся с проблемами с многопроцессорной обработкой во время прерывания программы нажатием CTRL Cкнопки .

Я уже улучшил программу, используя свою собственную очередь задач вместо функции многопроцессорной обработки карты, чтобы получить больше контроля во время планирования задач и включить обработку исключений, связанных с клавиатурой.

К сожалению, если я попытаюсь прервать выполнение нажатием CTRL Cво время фазы появления рабочих процессов

  • Я получаю исключение «Необработанное вмешательство в клавиатуру», даже если рабочие функции обрабатывают исключения
  • Я получаю другие необработанные исключения

Так что я получаю следы вместо приятного отключения.

Поэтому я предполагаю, что я неправильно понимаю все этапы жизненного цикла многопроцессорного процесса и то, как это влияет на обработку исключений.

Может быть, кто-нибудь может дать мне несколько советов и рассказать, как реализовать блокировку клавиатуры / обработку сигналов таким образом, чтобы я мог контролировать процесс завершения работы программы после нажатия CTRL C.

Пожалуйста, имейте в виду, что я использую Windows и по соображениям совместимости Python 3.4.

 import time
import re
import getopt
import sys

import ipaddress
import socket
import subprocess
import multiprocessing


PROCESSLIMIT = 10

# Configure subprocess to hide the console window
info = subprocess.STARTUPINFO()
info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
info.wShowWindow = subprocess.SW_HIDE

#########################################################
# Section: ping sweep
#########################################################

def pingHost(task_queue, result_queue): 
    retry = 3
    waitTimeMS = 1000

    try:
        # loop while there are tasks in the task_queue
        for ip in iter(task_queue.get, None):
            
            processOutput = subprocess.Popen(['ping', '-n', str(retry), '-w', str(waitTimeMS), ip], stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=info).communicate()[0]
            processOutput = "".join(map(chr, processOutput))
            
            # code logic if we have/don't have good response
            if "unreachable" in processOutput:
                continue
            elif "Reply" in processOutput or "Antwort" in processOutput or "Respuesta" in processOutput:
                dnsName = None
                try:
                    dnsName = socket.gethostbyaddr(ip)[0]
                except:
                    dnsName = None
                result_queue.put([ip,dnsName])
            else:
                continue
                
    except KeyboardInterrupt:
        print("KeyboardInterrupt on pingHost")
        return
    except Exception as e:
        return
        
    return


def performPingSweep(ipList):
    processList = []
    
    try:
        tempHosts = []
        dnsHostKV = {}

        with multiprocessing.Manager() as manager:
            task_queue = manager.Queue()
            result_queue = manager.Queue()
            
            for ip in ipList:
                task_queue.put(ip)
                
            for i in range(PROCESSLIMIT):
                task_queue.put(None)
            
            # Start worker processes
            for i in range(PROCESSLIMIT):
                processList.append(multiprocessing.Process(target=pingHost, args=(task_queue, result_queue)))
            
            for process in processList:
                process.start()
                
            for process in processList:
                process.join()
            
            result_queue.put(None)
            
            for pingResult in iter(result_queue.get,None):
                tempHosts.append([pingResult[0],pingResult[1]])

        return tempHosts
        
    except KeyboardInterrupt:
        print("KeyboardInterrupt on performPingSweep")
        try:
            for process in processList:
                process.join()
        except:
            pass
        return None
        
    except:
        print("Exception in performPingSweep")
        try:
            for process in processList:
                process.join()
        except:
            pass
        return None


def main(argv):
    
    result = []
    net_addr = ""
    all_hosts = []
    oldtime = 0
    
    ##########################
    # Evaluate comandline args
    try:
        opts, args = getopt.getopt(argv,"n:")
    except getopt.GetoptError:
        sys.exit(2)
    
    for opt, arg in opts:
        if opt == '-n':
            net_addr = arg


    try:
        ###########################################    
        # Create the ip list for the given networks
        
        # split -n command line argument to a list of networks
        net_addr_arr = net_addr.split(",")

        # for any network get ip list and add to all_hosts list
        try:
            for entry in net_addr_arr:
                ip_net = ipaddress.ip_network(entry)
                thisList = list(ip_net.hosts())
                all_hosts.extend(thisList)
                all_hosts = [str(item) for item in all_hosts]
                # special case for single host given
                if len(all_hosts) == 0:
                    temp = re.findall(r'(?:d{1,3}.) (?:d{1,3})', entry)
                    for thisIp in temp:
                        all_hosts.append(thisIp)
        except:
            print("no valid ip network given. Abort.")
            outputUsageInfo()
            sys.exit(1)
        
        print("Starting scan for network(s) {}".format(net_addr))
        
        #####################
        # perform ping Sweep
        result = performPingSweep(all_hosts)
        
        if result == None:
            print("No addresses to ping or user abort")
            sys.exit(1)
        
        print(result)
        
        # output execution time
        runtime = float("%0.2f" % (time.time() - startTime))
        print("Run Time: ", runtime, "seconds")

    except KeyboardInterrupt:
        print("Keyboard Interrupt raised in MAIN")
        return
        
    return

if __name__ == "__main__":
    if sys.platform.startswith('win'):
        # On Windows calling this function is necessary.
        multiprocessing.freeze_support()

    main(sys.argv[1:])
 

Трассировка после нажатия CTRL C во время фазы нереста:

 Starting scan for network(s) x.x.x.x/24
Failed to import the site module
Traceback (most recent call last):
Failed to import the site module
Fatal Python error: Py_Initialize: can't initialize sys standard streams
Fatal Python error: Py_Initialize: can't initialize sys standard streams
Traceback (most recent call last):
Failed to import the site module
  File "<string>", line 1, in <module>
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "c:Program Files (x86)Python34libsite.py", line 564, in <module>
Traceback (most recent call last):
  File "c:Program Files (x86)Python34libsite.py", line 564, in <module>
  File "c:Program Files (x86)Python34libmultiprocessing__init__.py", line 16, in <module>
  File "c:Program Files (x86)Python34libio.py", line 52, in <module>
  File "c:Program Files (x86)Python34libio.py", line 72, in <module>
    main()
  File "c:Program Files (x86)Python34libsite.py", line 72, in <module>
    from . import context
    main()
  File "c:Program Files (x86)Python34libsite.py", line 547, in main
  File "c:Program Files (x86)Python34libmultiprocessingcontext.py", line 3, in <module>
  File "c:Program Files (x86)Python34libsite.py", line 545, in main
    import os
    import threading
  File "c:Program Files (x86)Python34libthreading.py", line 10, in <module>
  File "c:Program Files (x86)Python34libos.py", line 618, in <module>
    from traceback import format_exc as _format_exc
    known_paths = venv(known_paths)
  File "c:Program Files (x86)Python34libsite.py", line 471, in venv
  File "c:Program Files (x86)Python34libtraceback.py", line 3, in <module>
    from _collections_abc import MutableMapping
    import linecache
    import re
Fatal Python error: Py_Initialize: can't initialize sys standard streams
    abs_paths()
  File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
  File "c:Program Files (x86)Python34liblinecache.py", line 10, in <module>
    import abc
  File "<frozen importlib._bootstrap>", line 2226, in _find_and_load_unlocked
  File "c:Program Files (x86)Python34libsite.py", line 109, in abs_paths
  File "c:Program Files (x86)Python34libre.py", line 336, in <module>
Traceback (most recent call last):
  File "c:Program Files (x86)Python34libabc.py", line 6, in <module>
    import tokenize
  File "<frozen importlib._bootstrap>", line 1200, in _load_unlocked
    import copyreg
  File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
  File "c:Program Files (x86)Python34libtokenize.py", line 29, in <module>
    m.__cached__ = os.path.abspath(m.__cached__)
    import collections
  File "<frozen importlib._bootstrap>", line 1129, in _exec
  File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
      File "c:Program Files (x86)Python34libntpath.py", line 554, in abspath
  File "c:Program Files (x86)Python34libcollections__init__.py", line 14, in <module>
  File "<frozen importlib._bootstrap>", line 1467, in exec_module
  File "<frozen importlib._bootstrap>", line 1570, in get_code
from _weakrefset import WeakSet  File "<frozen importlib._bootstrap>", line 2230, in _find_and_load_unlocked
    import heapq as _heapq

  File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
  File "<frozen importlib._bootstrap>", line 2237, in _find_and_load
  File "<frozen importlib._bootstrap>", line 2226, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 2226, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 2226, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 1200, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1193, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1129, in _exec
  File "<frozen importlib._bootstrap>", line 1118, in create
  File "<frozen importlib._bootstrap>", line 1467, in exec_module
  File "<frozen importlib._bootstrap>", line 130, in _new_module
  File "<frozen importlib._bootstrap>", line 1555, in get_code
KeyboardInterrupt
  File "<frozen importlib._bootstrap>", line 1624, in get_data
KeyboardInterrupt
  File "<frozen importlib._bootstrap>", line 656, in _compile_bytecode
KeyboardInterrupt
    return normpath(path)
  File "c:Program Files (x86)Python34libntpath.py", line 505, in normpath
    comps = path.split(sep)
KeyboardInterrupt
KeyboardInterrupt
  File "<frozen importlib._bootstrap>", line 1200, in _load_unlocked
  File "<frozen importlib._bootstrap>", line 1129, in _exec
Fatal Python error: Py_Initialize: can't initialize sys standard streams
      File "<frozen importlib._bootstrap>", line 1467, in exec_module
Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 1570, in get_code
  File "<frozen importlib._bootstrap>", line 656, in _compile_bytecode
KeyboardInterrupt
  File "c:Program Files (x86)Python34libcodecs.py", line 183, in __init__
class IOBase(_io._IOBase, metaclass=abc.ABCMeta):
  File "c:Program Files (x86)Python34libabc.py", line 143, in __new__
    cls.__abstractmethods__ = frozenset(abstracts)
KeyboardInterrupt
    def __init__(self, errors='strict'):
KeyboardInterrupt
KeyboardInterrupt on performPingSweep
No addresses to ping or user abort