Есть ли у кого-нибудь рабочий пример сервера скрытой клавиатуры GATT, использующего BLUEZ?

#keyboard #hid #bluez #gatt

#клавиатура #hid #bluez #gatt

Вопрос:

Я использую свой Raspbery PI4-b с дистрибутивом Raspian Lite для написания виртуальной клавиатуры с использованием BLUEZ для создания службы GATT HID (HOGP).

Я взял пример кода GATT для сервиса мониторинга сердечного ритма, предоставляемого BLUEZ, и изменил его, добавив службы и характеристики, чтобы попытаться создать свой клавиатурный сервер GATT HID.

Я сделал все возможное, чтобы мой сервер заработал, но не могу в этом разобраться. Итак, было интересно, есть ли у кого-нибудь рабочий пример сервиса такого типа, с которым я мог бы работать?

У меня есть сервер, который нормально подключается к Windows 10, и мой телефон Android в качестве периферийного устройства клавиатуры ble, но ни один из клиентов не запускает функцию уведомления о характеристиках отчетов, чтобы я мог начать отправлять отчеты, содержащие данные нажатия клавиш.

Спасибо!!! Заранее за любую помощь.

Код, который я создал из образца BLUEZ heart monitor:

 #!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later

import time
import dbus, dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import sys

from random import randint

mainloop = None
hidService = None

BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE =    'org.bluez.GattDescriptor1'

class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'

class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'

class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'

class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'

class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """
    def __init__(self, bus):
 
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        
        self.add_service(HIDService(bus, 0))
        self.add_service(DeviceInfoService(bus, 1))
        self.add_service(BatteryService(bus, 2))
        #self.add_service(TestService(bus, 3))

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE   str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_SERVICE_IFACE: {
                        'UUID': self.uuid,
                        'Primary': self.primary,
                        'Characteristics': dbus.Array(
                                self.get_characteristic_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path   '/char'   str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_CHRC_IFACE: {
                        'Service': self.service.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                        'Descriptors': dbus.Array(
                                self.get_descriptor_paths(),
                                signature='o')
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.path = characteristic.path   '/desc'   str(index)
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        return {
                GATT_DESC_IFACE: {
                        'Characteristic': self.chrc.get_path(),
                        'UUID': self.uuid,
                        'Flags': self.flags,
                }
        }

    def get_path(self):
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        if interface != GATT_DESC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_DESC_IFACE]

    @dbus.service.method(GATT_DESC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        print ('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

class BatteryService(Service):
    """
    Fake Battery service that emulates a draining battery.

    """
    SERVICE_UUID = '180f'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(BatteryLevelCharacteristic(bus, 0, self))


class BatteryLevelCharacteristic(Characteristic):
    """
    Fake Battery Level characteristic. The battery level is drained by 2 points
    every 5 seconds.

    """
    BATTERY_LVL_UUID = '2a19'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.BATTERY_LVL_UUID,
                ['read', 'notify'],
                service)
        self.notifying = False
        self.battery_lvl = 100
        self.timer = GObject.timeout_add(30000, self.drain_battery)

    def notify_battery_level(self):
        if not self.notifying:
            return
        self.PropertiesChanged(
                GATT_CHRC_IFACE,
                { 'Value': [dbus.Byte(self.battery_lvl)] }, [])

    def drain_battery(self):
        if not self.notifying:
            return True
        if self.battery_lvl > 0:
            self.battery_lvl -= 2
            if self.battery_lvl < 5:
                #self.battery_lvl = 0
                GObject.source_remove(self.timer)
                
        print('Battery Level drained: '   repr(self.battery_lvl))
        self.notify_battery_level()
        return True

    def ReadValue(self, options):
        print('Battery Level read: '   repr(self.battery_lvl))
        return [dbus.Byte(self.battery_lvl)]

    def StartNotify(self):
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False


#sourceId="org.bluetooth.service.device_information" type="primary" uuid="180A"
class DeviceInfoService(Service):

    SERVICE_UUID = '180A'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(VendorCharacteristic(bus, 0, self))
        self.add_characteristic(ProductCharacteristic(bus, 1, self))
        self.add_characteristic(VersionCharacteristic(bus, 2, self))

#name="Manufacturer Name String" sourceId="org.bluetooth.characteristic.manufacturer_name_string" uuid="2A29"
class VendorCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A29'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ["read"],
                service)
        
        self.value = dbus.Array('HodgeCode'.encode(), signature=dbus.Signature('y'))
        print(f'***VendorCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        print(f'Read VendorCharacteristic: {self.value}')
        return self.value

#sourceId="org.bluetooth.characteristic.model_number_string" uuid="2A24"
class ProductCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A24'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ["read"],
                service)
        
        self.value = dbus.Array('smartRemotes'.encode(), signature=dbus.Signature('y'))
        print(f'***ProductCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        print(f'Read ProductCharacteristic: {self.value}')
        return self.value

#sourceId="org.bluetooth.characteristic.software_revision_string" uuid="2A28"
class VersionCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A28'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ["read"],
                service)
        
        self.value = dbus.Array('version 1.0.0'.encode(), signature=dbus.Signature('y'))
        print(f'***VersionCharacteristic value***: {self.value}')

    def ReadValue(self, options):
        print(f'Read VersionCharacteristic: {self.value}')
        return self.value

#name="Human Interface Device" sourceId="org.bluetooth.service.human_interface_device" type="primary" uuid="1812"
class HIDService(Service):
    SERVICE_UUID = '1812'
   
    def __init__(self, bus, index, application):
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        
        self.parent = application
        self.protocolMode = ProtocolModeCharacteristic(bus, 0, self)
        self.hidInfo = HIDInfoCharacteristic(bus, 1, self)
        self.controlPoint = ControlPointCharacteristic(bus, 2, self)
        self.report = ReportCharacteristic(bus, 3, self)
        self.reportMap = ReportMapCharacteristic(bus, 4, self)
        
        self.add_characteristic(self.protocolMode)
        self.add_characteristic(self.hidInfo)
        self.add_characteristic(self.controlPoint)
        self.add_characteristic(self.report)
        self.add_characteristic(self.reportMap)
    
        self.protocolMode.ReadValue({})
        
#name="Protocol Mode" sourceId="org.bluetooth.characteristic.protocol_mode" uuid="2A4E"
class ProtocolModeCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A4E'

    def __init__(self, bus, index, service):
        
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ["read", "write-without-response"],
                service)
        
        #self.value = dbus.Array([1], signature=dbus.Signature('y'))
        self.parent = service
        self.value = dbus.Array(bytearray.fromhex('01'), signature=dbus.Signature('y'))
        print(f'***ProtocolMode value***: {self.value}')
        print('********', service.parent)

    def ReadValue(self, options):
        print(f'Read ProtocolMode: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        print(f'Write ProtocolMode {value}')
        self.value = value

#sourceId="org.bluetooth.characteristic.hid_control_point" uuid="2A4C"
class ControlPointCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A4C'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ["write-without-response"],
                service)
        
        self.value = dbus.Array(bytearray.fromhex('00'), signature=dbus.Signature('y'))
        print(f'***ControlPoint value***: {self.value}')

    def WriteValue(self, value, options):
        print(f'Write ControlPoint {value}')
        self.value = value


#id="hid_information" name="HID Information" sourceId="org.bluetooth.characteristic.hid_information" uuid="2A4A"
class HIDInfoCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A4A'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ['read'],
                service)
                
        self.value = dbus.Array(bytearray.fromhex('01110002'), signature=dbus.Signature('y'))
        print(f'***HIDInformation value***: {self.value}')

    def ReadValue(self, options):
        print(f'Read HIDInformation: {self.value}')
        return self.value


#sourceId="org.bluetooth.characteristic.report_map" uuid="2A4B"
class ReportMapCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A4B'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ['read'],
                service)
                
        self.parent = service
        #self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'), signature=dbus.Signature('y'))
        self.value = dbus.Array(bytearray.fromhex('05010906a101050719e029e71500250175019508810295017508810195067508150025650507190029658100c0'), signature=dbus.Signature('y'))
        print(f'***ReportMap value***: {self.value}')

    def ReadValue(self, options):
        print(f'Read ReportMap: {self.value}')
        return self.value


#id="report" name="Report" sourceId="org.bluetooth.characteristic.report" uuid="2A4D"        
class ReportCharacteristic(Characteristic):

    CHARACTERISTIC_UUID = '2A4D'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ['read', 'notify'],
                service)
                
        #self.add_descriptor(ClientConfigurationDescriptor(bus, 0, self))
        self.add_descriptor(ReportReferenceDescriptor(bus, 1, self))
        
        #[ 0xA1, reportNum, 0, 0, 0, 0, 0, 0, 0, 0 ]
        #self.value = dbus.Array(bytearray.fromhex('00000000000000000000'), signature=dbus.Signature('y'))
        self.value = dbus.Array(bytearray.fromhex('0000000000000000'), signature=dbus.Signature('y'))
        print(f'***Report value***: {self.value}')
                
        self.notifying = False
        #self.battery_lvl = 100
        #GObject.timeout_add(5000, self.drain_battery)

    def send(self, value='Hey'):
        print(f'***send*** {value}');
        self.payload = dbus.Array(bytearray.fromhex('a100004800000000'))       
        self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': self.payload }, [])
                
        print(f'***sent***');
        

    def ReadValue(self, options):
        print(f'Read Report: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        print(f'Write Report {self.value}')
        self.value = value

    def StartNotify(self):
        print(f'Start Notify')
        if self.notifying:
            print('Already notifying, nothing to do')
            return

        self.notifying = True
        self.notify_battery_level()

    def StopNotify(self):
        print(f'Stop Notify')
        if not self.notifying:
            print('Not notifying, nothing to do')
            return

        self.notifying = False
        
#name="Client Characteristic Configuration" sourceId="org.bluetooth.descriptor.gatt.client_characteristic_configuration" uuid="2902"
class ClientConfigurationDescriptor(Descriptor):

    DESCRIPTOR_UUID = '2902'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read', 'write'],
                characteristic)
                
        self.value = dbus.Array(bytearray.fromhex('0100'), signature=dbus.Signature('y'))
        print(f'***ClientConfiguration***: {self.value}')

    def ReadValue(self, options):
        print(f'Read ClientConfiguration: {self.value}')
        return self.value

    def WriteValue(self, value, options):
        print(f'Write ClientConfiguration {self.value}')
        self.value = value

#type="org.bluetooth.descriptor.report_reference" uuid="2908"
class ReportReferenceDescriptor(Descriptor):

    DESCRIPTOR_UUID = '2908'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read'],
                characteristic)
                
        self.value = dbus.Array(bytearray.fromhex('0001'), signature=dbus.Signature('y'))
        print(f'***ReportReference***: {self.value}')

    def ReadValue(self, options):
        print(f'Read ReportReference: {self.value}')
        return self.value
 
#############################
# my sandbox
#############################
class TestService(Service):
    """
    Dummy test service that provides characteristics and descriptors that
    exercise various API functionality.
    """

    SERVICE_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.SERVICE_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))


class TestCharacteristic(Characteristic):
    """
    Dummy test characteristic. Allows writing arbitrary bytes to its value, and
    contains "extended properties", as well as a test descriptor.
    """

    CHARACTERISTIC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.CHARACTERISTIC_UUID,
                ['read', 'write'],
                service)
                
        self.add_descriptor(TestDescriptor(bus, 0, self))
        #self.value = []
        self.value = dbus.Array(bytearray.fromhex('05010906a101850175019508050719e029e715002501810295017508810395057501050819012905910295017503910395067508150026ff000507190029ff8100c0050C0901A101850275109501150126ff0719012Aff078100C005010906a101850375019508050719e029e715002501810295017508150026ff000507190029ff8100c0'), signature=dbus.Signature('y'))

    def ReadValue(self, options):
        print('TestCharacteristic Read: '   repr(self.value))
        return self.value

    def WriteValue(self, value, options):
        print('TestCharacteristic Write: '   repr(value))
        self.value = value


class TestDescriptor(Descriptor):
    """
    Dummy test descriptor. Returns a static value.
    """

    DESCRIPTOR_UUID = '12345678-1234-5678-1234-56789abcdef2'

    def __init__(self, bus, index, characteristic):
        Descriptor.__init__(
                self, bus, index,
                self.DESCRIPTOR_UUID,
                ['read', 'write'],
                characteristic)
                
        self.value = dbus.Array('Test'.encode(), signature=dbus.Signature('y'))
        print(f'***TestDescriptor***: {self.value}')


    def ReadValue(self, options):
        print('TestDescriptor Read')
        
        return self.value

    def WriteValue(self, value, options):
        print(f'TestDescriptor Write: {value}')
        self.value = value

def register_app_cb():
    print('GATT application registered')


def register_app_error_cb(error):
    print('Failed to register application: '   str(error))
    mainloop.quit()


def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None

def main():
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                    reply_handler=register_app_cb,
                                    error_handler=register_app_error_cb)

    mainloop.run()

if __name__ == '__main__':
    main()