#python #bluetooth-lowenergy #python-asyncio #bluetooth-gatt
#python #bluetooth-lowenergy #python-asyncio #bluetooth-gatt
Вопрос:
Я создал клиент GATT на python, используя библиотеку Bleak. Я подписываюсь на характеристику уведомления, и после того, как я получу обратный вызов в функции обработчика уведомлений, я выполняю запись в характеристику.До сих пор, после запуска уведомления, я использовал asyncio.sleep в течение 5 секунд, а затем я бы записал характеристику. Но это очень сложно, и нет никакого контроля над кодом, поэтому я хочу подождать, пока не получу обратный вызов в функции обработчика уведомлений, и когда я получу ответ, я хочу выполнить запись. Я новичок в python async IO, и я не уверен, как этого добиться. Спасибо за ваше время и усилия.
Вот код:
import logging
import asyncio
import platform
from bleak import BleakClient
from bleak import _logger as logger
import time
from concurrent.futures import ThreadPoolExecutor
CHARACTERISTIC_UUID = "0000c305-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_WR = "0000c303-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_UUID_RD = "0000c301-0000-1000-8000-00805f9b34fb"
_executor = ThreadPoolExecutor(1)
async def notification_handler(sender, data):
"""Simple notification handler which prints the data received."""
print("{0}: {1}".format(sender, data))
print(hexdump(data, 16))
def hexdump(src, length=16):
result = []
digits = 4 if isinstance(src, str) else 2
for i in range(0, len(src), length):
s = src[i:i length]
hexa = " ".join(map("{0:0>2X}".format, src))
text = "".join([chr(x) if 0x20 <= x < 0x7F else "." for x in s])
result.append("X %-*s %s" % (i, length * (digits 1), hexa, text))
return "n".join(result)
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
bleDevice = client
await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
# before: asyncio.sleep(5)
# now: wait here until I the notification_hanlder get a callback, then do the write
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6, 0x00, 0x02, 0xD0, 0x01, 0x77])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA6])
await client.write_gatt_char(CHARACTERISTIC_UUID_WR, [0xA7])
if __name__ == "__main__":
import os
os.environ["PYTHONASYNCIODEBUG"] = str(1)
address = (
"48:23:35:00:14:be" # <--- Change to your device's address here if you are using Windows or Linux
if platform.system() != "Darwin"
else "B9EA5233-37EF-4DD6-87A8-2A875E821C46" # <--- Change to your device's address here if you are using macOS
)
# for i in range(5):
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(run(address, True))
Ответ №1:
Я начну с примечания. Похоже, вы выбрали некоторые пользовательские значения UUID, которые находятся в зарезервированном диапазоне для UUID, одобренных Bluetooth SIG. По этому поводу есть полезная статья: https://www.novelbits.io/uuid-for-custom-services-and-characteristics /
Перейдем к вашему основному вопросу. Я сам только изучаю asyncio, но я посмотрел и что-то запустил. Ваши write_gatt_char
команды выполнялись при инициализации клиента, а не при получении уведомления. Как только он выполнил запись, он вышел.
Я изменил ваш пример, чтобы использовать бит BBC micro:. Когда отправляется уведомление о кнопке «A», это заставляет мрачного клиента отправлять случайное письмо на бит micro:. При нажатии кнопки «B» уведомление отключается от клиента. Я переместил write_gatt_char
в код уведомления и использовал a create_task
для выполнения записи. Чтобы поддерживать клиент в рабочем состоянии до тех пор, пока кнопка «B» не будет нажата на микро: бит, я поместил цикл while в клиент с режимом ожидания, чтобы остановить завершение. Подозреваю, что это не лучший способ сделать это.
Это пример, который сработал для меня:
import logging
import asyncio
import platform
from random import randint
from bleak import BleakClient
from bleak import _logger as logger
BTN_A_UUID = "E95DDA90-251D-470A-A062-FA1922DFA9A8"
BTN_B_UUID = "E95DDA91-251D-470A-A062-FA1922DFA9A8"
LED_TXT_UUID = "E95D93EE-251D-470A-A062-FA1922DFA9A8"
async def run(address, debug=False):
if debug:
import sys
l = logging.getLogger("asyncio")
l.setLevel(logging.DEBUG)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.DEBUG)
l.addHandler(h)
logger.addHandler(h)
async with BleakClient(address) as client:
x = await client.is_connected()
logger.info("Connected: {0}".format(x))
def btn_a_handler(sender, data):
"""Simple notification handler for btn a events."""
print("{0}: {1}".format(sender, data))
# Pick random letter to send
if int.from_bytes(data, byteorder='little', signed=False) > 0:
letter = [randint(99, 122)]
loop.create_task(write_txt(letter))
def btn_b_handler(sender, data):
"""Simple notification handler for btn b events."""
print("{0}: {1}".format(sender, data))
if int.from_bytes(data, byteorder='little', signed=False) > 0:
loop.create_task(client.disconnect())
async def write_txt(data):
await client.write_gatt_char(LED_TXT_UUID,
data)
await client.start_notify(BTN_A_UUID, btn_a_handler)
await client.start_notify(BTN_B_UUID, btn_b_handler)
while await client.is_connected():
await asyncio.sleep(1)
if __name__ == "__main__":
address = ("E9:06:4D:45:FC:8D")
loop = asyncio.get_event_loop()
# loop.set_debug(True)
loop.run_until_complete(run(address, True))
Комментарии:
1. Спасибо, что сообщили мне о пользовательском UUID. У меня есть пользовательский сервер GATT, и он имеет разные характеристики для чтения, записи и уведомления. Итак, когда я запускаю_notify в UUID уведомления, через 2 секунды я получаю обратный вызов и в зависимости от обратного вызова я должен выполнить запись. Итак, как только я начну уведомление, я буду продолжать получать обратные вызовы для notification_handler, и на основе этого я должен выполнить запись в последующем UUID. Итак, в принципе, я должен поддерживать notification_handler и продолжать выполнять записи, пока я не отключусь.
2. Ваш последний комментарий — это вопрос? Я что-то неправильно понял? Я понял, что ваш код должен получать уведомление, и на основе данных в этом уведомлении вы записываете в другую характеристику / UUID. Я думал, что это то, что сделал мой пример. Ваш исходный код отключался и завершался до получения уведомления и не выполнял записи на основе информации в уведомлении. Это было то, что вы намеревались?
3. Извините за поздний ответ, это сработало после некоторых незначительных изменений. Спасибо за вашу помощь, человек. Все это работает быстро и без каких-либо проблем.