#python #python-multithreading #scapy
Вопрос:
Я пишу некоторый код, который будет искать пакет LLDP с помощью scapy, а затем сохранит пакет(ы) в виде файла pcap для анализа с помощью pyshark.
Функция sniff() должна выполняться в потоке, чтобы избежать зависания программы, однако мои операторы печати/отладки в обоих аргументах, вызываемых через prn и stop_sniffing соответственно, не выводятся, что указывает на то, что sniff() не принимает никаких пакетов?
Я одновременно использую wireshark, чтобы убедиться, что пакеты принимаются в интерфейсе.
from PyQt6.QtWidgets import (
QMainWindow, QPushButton, QGridLayout, QWidget, QLabel
)
from PyQt6.QtCore import QTimer
from scapy.all import *
import threading
import PacketAnalyser # .py file to get port info from packets
class PortScanner(QMainWindow):
def __init__(self):
super().__init__()
self.portInfo = []
self.initUI()
def initUI(self):
widget = QWidget()
layout = QGridLayout(widget)
# --- Port Scanner Widgets
# --- Title label init ---#
self.titleLabel = QLabel("Potter's Port Scanner")
self.titleLabel.setStyleSheet("""
font-family: "Palatino Linotype", "Book Antiqua", Palatino, serif;
font-size: 30px;
letter-spacing: 2px;
word-spacing: 2px;
color: #ba181b;
font-variant: small-caps;
""")
# --- Port information labels init --- #
if self.portInfo:
self.switchName = QLabel(self.portInfo[0])
self.switchPort = QLabel(self.portInfo[1])
self.vlanID = QLabel(self.portInfo[2])
self.labelStyle(self.switchName)
self.labelStyle(self.switchPort)
self.labelStyle(self.vlanID)
layout.addWidget(self.switchName, 2, 0, 1, 2)
layout.addWidget(self.switchPort, 3, 0, 1, 2)
layout.addWidget(self.vlanID, 4, 0, 1, 2)
# --- Start and Stop button init --- #
self.startButton = QPushButton('Start search')
self.startButton.setEnabled(True)
self.buttonStyle(self.startButton)
self.stopButton = QPushButton('Stop search')
self.stopButton.setEnabled(False)
self.buttonStyle(self.stopButton)
self.secondsCounter = 0
self.timerLabel = QLabel()
self.timerLabel.setStyleSheet("""
font-size: 18px;
color: #ba181b;
""")
self.timer = QTimer()
self.timer.timeout.connect(self.showTime)
layout.addWidget(self.titleLabel, 0, 1, 1, 2)
layout.addWidget(self.timerLabel, 1, 1, 1, 2)
layout.addWidget(self.startButton, 5, 2, 1, 1)
layout.addWidget(self.stopButton, 5, 1, 1, 1)
self.startButton.clicked.connect(self.startTime)
self.startButton.clicked.connect(self.start_button)
self.stopButton.clicked.connect(self.stopTime)
self.stopButton.clicked.connect(self.stop_sniffing)
self.setStyleSheet("""
background-color: #161a1d;
""")
self.setLayout(layout)
self.setCentralWidget(widget)
self.resize(550, 350)
self.center()
self.setWindowTitle("Port Scanner")
# --- Func for button stylesheet --- #
def buttonStyle(self, button):
button.setStyleSheet("""
background-color: #ba181b;
text-style: italic;
color: #161a1d;
font-size: 14px;
""")
def labelStyle(self, label):
label.setStyleSheet("""
background-color: #ba181b;
text-style: bold;
color: #161a1d;
font-size: 18px;
""")
# --- Updates timer every 1000ms --- #
def showTime(self):
self.secondsCounter = 1
self.timerLabel.setText(str("Scanner running for {} second(s)".format(self.secondsCounter)))
# --- Starts time with 1000ms interval when start button pressed --- #
def startTime(self):
self.timer.start(1000)
# --- Greys/de-greys stop/start button respectively ---#
self.startButton.setEnabled(False)
self.stopButton.setEnabled(True)
def stopTime(self):
self.timer.stop()
self.timerLabel.setText('Action stopped')
# --- Greys/de-greys start/stop button respectively --- #
self.stopButton.setEnabled(False)
self.startButton.setEnabled(True)
# --- Resets timer for if the search is started again --- #
self.secondsCounter = 0
# --- Save sniff as .cap file in tmp dir for pyshark, as LLDP packets can't be parsed with native scapy
def savePktForEdit(self, pkt):
self.stop_sniffing()
print("Arrived")
print("Got to savePkt func: pkt = {}".format(pkt))
wrpcap("/tmp/temp_sniffed.cap", pkt)
self.portInfo = PacketAnalyser.PcapStripper.findPacketInfo("/tmp/temp_sniffed.cap")
print("Packet found: {}".format(pkt))
def stop_sniffing(self, x):
global switch
print("STOPTINGFS")
return switch
# ---
def start_button(self):
global switch
global thread
if (thread is None) or (not thread.is_alive()):
switch = False
thread = threading.Thread(target=self.getPacketCaps)
thread.start()
time.sleep(1)
else:
print('DEBUG: already running')
@staticmethod
def stop_button():
global switch
global thread
print('DEBUG: stopping')
switch = True
thread = None
def checkPkt(self, pkt):
print("FUNC CALLED WIN ")
def getPacketCaps(self):
print("DEBUG: before sniff")
sniff(filter="ether proto 0x88cc", prn=self.checkPkt, stop_filter=self.stop_sniffing)
sniff(count=1, filter="ether proto 0x88cc", prn=self.savePktForEdit, stop_filter=self.stop_sniffing)
def center(self):
qr = self.frameGeometry()
cp = self.screen().availableGeometry().center()
qr.moveCenter(cp)
self.move(qr.topLeft())
thread = None
switch = False
Соответствующий код начинается с метода showTime (), но я подумал, что лучше всего включить все это.
Метод stop_sniffing вызывается только тогда, когда я нажимаю кнопку «Стоп», и вообще не вызывается функцией sniff ().