Scapy sniff() не запускает аргументы prn/stop_sniff во время работы в потоке?

#python #python-multithreading #scapy

Вопрос:

Я пишу некоторый код, который будет искать пакет LLDP с помощью scapy, а затем сохранит пакет(ы) в виде файла pcap для анализа с помощью pyshark.

Функция sniff() должна выполняться в потоке, чтобы избежать зависания программы, однако мои операторы печати/отладки в обоих аргументах, вызываемых через prn и stop_sniffing соответственно, не выводятся, что указывает на то, что sniff() не принимает никаких пакетов?

Я одновременно использую wireshark, чтобы убедиться, что пакеты принимаются в интерфейсе.

 from PyQt6.QtWidgets import (
QMainWindow, QPushButton, QGridLayout, QWidget, QLabel
)
from PyQt6.QtCore import QTimer
from scapy.all import *
import threading
import PacketAnalyser  # .py file to get port info from packets

class PortScanner(QMainWindow):

    def __init__(self):
        super().__init__()

        self.portInfo = []

        self.initUI()

    def initUI(self):

        widget = QWidget()
        layout = QGridLayout(widget)

        # --- Port Scanner Widgets

        # --- Title label init ---#
        self.titleLabel = QLabel("Potter's Port Scanner")
        self.titleLabel.setStyleSheet("""
            font-family: "Palatino Linotype", "Book Antiqua", Palatino, serif;
        font-size: 30px;
        letter-spacing: 2px;
        word-spacing: 2px;
        color: #ba181b;
        font-variant: small-caps;
        """)

        # --- Port information labels init --- #
        if self.portInfo:
            self.switchName = QLabel(self.portInfo[0])
            self.switchPort = QLabel(self.portInfo[1])
            self.vlanID = QLabel(self.portInfo[2])

            self.labelStyle(self.switchName)
            self.labelStyle(self.switchPort)
            self.labelStyle(self.vlanID)

            layout.addWidget(self.switchName, 2, 0, 1, 2)
            layout.addWidget(self.switchPort, 3, 0, 1, 2)
            layout.addWidget(self.vlanID, 4, 0, 1, 2)

        # --- Start and Stop button init --- #
        self.startButton = QPushButton('Start search')
        self.startButton.setEnabled(True)
        self.buttonStyle(self.startButton)

        self.stopButton = QPushButton('Stop search')
        self.stopButton.setEnabled(False)
        self.buttonStyle(self.stopButton)

        self.secondsCounter = 0
        self.timerLabel = QLabel()
        self.timerLabel.setStyleSheet("""
        font-size: 18px;
        color: #ba181b;
        """)

        self.timer = QTimer()
        self.timer.timeout.connect(self.showTime)
        layout.addWidget(self.titleLabel, 0, 1, 1, 2)
        layout.addWidget(self.timerLabel, 1, 1, 1, 2)
        layout.addWidget(self.startButton, 5, 2, 1, 1)
        layout.addWidget(self.stopButton, 5, 1, 1, 1)

        self.startButton.clicked.connect(self.startTime)
        self.startButton.clicked.connect(self.start_button)
        self.stopButton.clicked.connect(self.stopTime)
        self.stopButton.clicked.connect(self.stop_sniffing)

        self.setStyleSheet("""
        background-color: #161a1d;
        """)
        self.setLayout(layout)
        self.setCentralWidget(widget)
        self.resize(550, 350)
        self.center()
        self.setWindowTitle("Port Scanner")


    # --- Func for button stylesheet --- #
    def buttonStyle(self, button):
        button.setStyleSheet("""
        background-color: #ba181b;
        text-style: italic;
        color: #161a1d;
        font-size: 14px;
        """)

    def labelStyle(self, label):
        label.setStyleSheet("""
        background-color: #ba181b;
        text-style: bold;
        color: #161a1d;
        font-size: 18px;
        """)

    # --- Updates timer every 1000ms --- #
    def showTime(self):
        self.secondsCounter  = 1
        self.timerLabel.setText(str("Scanner running for {} second(s)".format(self.secondsCounter)))

    # --- Starts time with 1000ms interval when start button pressed --- #
    def startTime(self):
        self.timer.start(1000)

        # --- Greys/de-greys stop/start button respectively ---#
        self.startButton.setEnabled(False)
        self.stopButton.setEnabled(True)

    def stopTime(self):
        self.timer.stop()
        self.timerLabel.setText('Action stopped')

        # --- Greys/de-greys start/stop button respectively --- #
        self.stopButton.setEnabled(False)
        self.startButton.setEnabled(True)

        # --- Resets timer for if the search is started again --- #
        self.secondsCounter = 0

    # --- Save sniff as .cap file in tmp dir for pyshark, as LLDP packets can't be parsed with native scapy
    def savePktForEdit(self, pkt):
        self.stop_sniffing()
        print("Arrived")
        print("Got to savePkt func: pkt = {}".format(pkt))
        wrpcap("/tmp/temp_sniffed.cap", pkt)
        self.portInfo = PacketAnalyser.PcapStripper.findPacketInfo("/tmp/temp_sniffed.cap")
        print("Packet found: {}".format(pkt))

    def stop_sniffing(self, x):
        global switch
        print("STOPTINGFS")
        return switch

    # ---

    def start_button(self):
        global switch
        global thread

        if (thread is None) or (not thread.is_alive()):
            switch = False
            thread = threading.Thread(target=self.getPacketCaps)
            thread.start()
            time.sleep(1)
        else:
            print('DEBUG: already running')

    @staticmethod
    def stop_button():
        global switch
        global thread

        print('DEBUG: stopping')

        switch = True
        thread = None

    def checkPkt(self, pkt):

        print("FUNC CALLED WIN ")


    def getPacketCaps(self):

        print("DEBUG: before sniff")
        sniff(filter="ether proto 0x88cc", prn=self.checkPkt, stop_filter=self.stop_sniffing)
        sniff(count=1, filter="ether proto 0x88cc", prn=self.savePktForEdit, stop_filter=self.stop_sniffing)

    def center(self):
        qr = self.frameGeometry()
        cp = self.screen().availableGeometry().center()

        qr.moveCenter(cp)
        self.move(qr.topLeft())


thread = None
switch = False
 

Соответствующий код начинается с метода showTime (), но я подумал, что лучше всего включить все это.
Метод stop_sniffing вызывается только тогда, когда я нажимаю кнопку «Стоп», и вообще не вызывается функцией sniff ().