#python #pyqt5 #pyudev
#python #pyqt5 #pyudev
Вопрос:
Следующий код показывает имя нового вставленного USB-накопителя в консоли (в качестве замены PyQt5 GUI) в Linux.
К сожалению, pyudev.device._errors.DeviceNotFoundAtPathError появляется в консоли, как только вы отсоединяете USB-накопитель, не извлекая его должным образом.
Что нужно изменить, чтобы исправить эту ошибку?
main.py:
from functools import partial
import os
import sys
import pyudev
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal
class MainWindow():
def __init__(self, parent=None):
super().__init__()
# GUI code
pass
def print_name(self, name):
print(name)
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
super().__init__()
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._process_devices(self._context.list_devices(), action="add")
def fileno(self):
return self._monitor.fileno()
@property
def device_names(self):
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self):
read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
self.devices_changed.emit(self.device_names)
def _process_devices(self, devices, action=None):
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif action == "remove" and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
@classmethod
def _read_device_flag(self, device, name):
path = os.path.join(device.sys_path, name)
try:
with open(path) as data:
return bool(int(data.read()))
except (IOError, ValueError):
return False
def _is_usb_mass_storage_device(self, device):
is_removable = self._read_device_flag(device, "removable")
has_size = self._read_device_flag(device, "size")
has_usb = device.get("ID_BUS") == "usb"
has_no_disc = device.get("ID_CDROM") is None
return is_removable and has_size and has_usb and has_no_disc
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.print_name)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Ответ №1:
В моем первом ответе нет ничего плохого, но вы можете перезаписать функцию своей собственной. Ниже приведен пример этого для вашей проблемы. Основными изменениями являются:
Добавлено:
from pyudev._util import ensure_byte_string
и перезаписал from_sys_path
функцию:
pyudev.Devices.from_sys_path = self.from_sys_path
def from_sys_path(self, context, sys_path):
device = context._libudev.udev_device_new_from_syspath(
context, ensure_byte_string(sys_path))
if not device:
return None
return pyudev.Device(context, device)
Изменено:
@property
def device_names(self):
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
Для:
def device_names(self):
devices = []
for device in self._devices:
dev = pyudev.Devices.from_path(self._context, device)
if dev is not None:
devices.append(dev.get("DEVNAME"))
return devices
и
self.devices_changed.emit(self.device_names)
Для
self.devices_changed.emit(self.device_names())
Весь код выглядит следующим образом:
from functools import partial
import os
import sys
import pyudev
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal
from pyudev._util import ensure_byte_string
class MainWindow():
def __init__(self, parent=None):
super().__init__()
# GUI code
pass
def print_name(self, name):
print(name)
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
super().__init__()
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._process_devices(self._context.list_devices(), action="add")
pyudev.Devices.from_sys_path = self.from_sys_path
def from_sys_path(self, context, sys_path):
device = context._libudev.udev_device_new_from_syspath(
context, ensure_byte_string(sys_path))
if not device:
return None
return pyudev.Device(context, device)
def fileno(self):
return self._monitor.fileno()
def device_names(self):
devices = []
for device in self._devices:
dev = pyudev.Devices.from_path(self._context, device)
if dev is not None:
devices.append(dev.get("DEVNAME"))
return devices
def process_incoming(self):
read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
self.devices_changed.emit(self.device_names())
def _process_devices(self, devices, action=None):
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif action == "remove" and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
@classmethod
def _read_device_flag(self, device, name):
path = os.path.join(device.sys_path, name)
try:
with open(path) as data:
return bool(int(data.read()))
except (IOError, ValueError):
return False
def _is_usb_mass_storage_device(self, device):
is_removable = self._read_device_flag(device, "removable")
has_size = self._read_device_flag(device, "size")
has_usb = device.get("ID_BUS") == "usb"
has_no_disc = device.get("ID_CDROM") is None
return is_removable and has_size and has_usb and has_no_disc
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.print_name)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Ваша среда IDE может жаловаться на то, что вы обращаетесь к защищенному элементу модуля (pyudev._util), но она все равно будет работать.
Ответ №2:
Вы могли бы просто перехватить исключение:
изменение
@property
def device_names(self):
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
Для
def device_names(self):
devices = []
for device in self._devices:
try:
dev_name = pyudev.Devices.from_path(self._context, device).get("DEVNAME")
devices.append(dev_name)
except pyudev.DeviceNotFoundAtPathError:
pass
return devices
и
self.devices_changed.emit(self.device_names)
Для
self.devices_changed.emit(self.device_names())
Весь код:
from functools import partial
import os
import sys
import pyudev
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal
class MainWindow():
def __init__(self, parent=None):
super().__init__()
# GUI code
pass
def print_name(self, name):
print(name)
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
super().__init__()
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._process_devices(self._context.list_devices(), action="add")
def fileno(self):
return self._monitor.fileno()
def device_names(self):
devices = []
for device in self._devices:
try:
dev_name = pyudev.Devices.from_path(self._context, device).get("DEVNAME")
devices.append(dev_name)
except pyudev.DeviceNotFoundAtPathError:
pass
return devices
def process_incoming(self):
read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
self.devices_changed.emit(self.device_names())
def _process_devices(self, devices, action=None):
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif action == "remove" and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
@classmethod
def _read_device_flag(self, device, name):
path = os.path.join(device.sys_path, name)
try:
with open(path) as data:
return bool(int(data.read()))
except (IOError, ValueError):
return False
def _is_usb_mass_storage_device(self, device):
is_removable = self._read_device_flag(device, "removable")
has_size = self._read_device_flag(device, "size")
has_usb = device.get("ID_BUS") == "usb"
has_no_disc = device.get("ID_CDROM") is None
return is_removable and has_size and has_usb and has_no_disc
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.print_name)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Выводит данные при вставке и извлечении USB-накопителя:
[]
[]
[]
['/dev/sdc']
['/dev/sdc']
['/dev/sdc']
[]
[]
[]
Комментарии:
1. Отлично. Перехват исключения позволяет избежать появления сообщения об ошибке. Просто из интереса, есть ли способ исправить эту ошибку без использования исключения?