#python-3.x #matplotlib #python-asyncio
Вопрос:
Я собираю и передаю необработанные данные ЭКГ с Polar H10 в LSL
розетку, используя код, адаптированный отсюда. Я записываю этот LSL
поток Matlab
для ретроспективной обработки данных, как только поток будет завершен. Это использует asyncio
библиотеку и хорошо работает.
Однако, в дополнение к потоковой передаче и записи данных ЭКГ через LSL
и Matlab
, я хотел бы одновременно предварительно обработать/построить график потоковых данных, используя matplotlib
их в режиме, близком к реальному времени, как в этом примере.
Проблема в том, что я не уверен, как выполнить предварительную обработку и построение графика в реальном времени, не вызывая задержки в LSL
потоке. Мой наивысший приоритет-поддержание точности и точности во времени потоковой передачи/записи LSL
. Функция предварительной обработки/построения графика не обязательно должна быть такой же «в реальном времени», как поток.
Моя последняя попытка сделать это с помощью asyncio
ниже, но все попытки не увенчались успехом как для потоковой передачи, так и для построения графика. Любая помощь будет признательна.
Это часть настройки сценария:
from pylsl import StreamInfo, StreamOutlet
import asyncio
import aioconsole
import os
import signal
import sys
import getopt
import math
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from bleak import BleakClient
from bleak.uuids import uuid16_dict
""" Predefined UUID (Universal Unique Identifier) mappings are based on Heart Rate GATT service Protocol that most
Fitness/Heart Rate device manufacturers follow (Polar H10 in this case) to obtain a specific response input from
the device acting as an API """
uuid16_dict = {v: k for k, v in uuid16_dict.items()}
## This is the device MAC ID (WIN) or CFD (MAC), please update with your device ID
ADDRESS = "3E44F50E-A858-4CC2-BF9F-461FC6D25A93"
STREAMNAME = 'PolarBand'
## UUID for model number ##
MODEL_NBR_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Model Number String")
)
## UUID for manufacturer name ##
MANUFACTURER_NAME_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Manufacturer Name String")
)
## UUID for battery level ##
BATTERY_LEVEL_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Battery Level")
)
## UUID for connection establsihment with device ##
PMD_SERVICE = "FB005C80-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of stream settings ##
PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of start stream ##
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of ECG Stream ##
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])
## For Polar H10 sampling frequency ##
ECG_SAMPLING_FREQ = 130
# for plotting using python
ecg_session_data = []
ecg_session_time = []
# for streaming to LSL
OUTLET = []
Здесь я определяю свой LSL
поток:
# define LSL stream
def StartStream(STREAMNAME):
info = StreamInfo(STREAMNAME, 'ECG', 1,ECG_SAMPLING_FREQ, 'float32', 'myuid2424')
info.desc().append_child_value("manufacturer", "Polar")
channels = info.desc().append_child("channels")
for c in ["ECG"]:
channels.append_child("channel")
.append_child_value("name", c)
.append_child_value("unit", "microvolts")
.append_child_value("type", "ECG")
# next make an outlet; we set the transmission chunk size to 74 samples and
# the outgoing buffer size to 360 seconds (max.)
return StreamOutlet(info, 74, 360)
## Bit conversion of the Hexadecimal stream
def data_conv(sender, data):
if data[0] == 0x00:
timestamp = convert_to_unsigned_long(data, 1, 8)
step = 3
samples = data[10:]
offset = 0
while offset < len(samples):
ecg = convert_array_to_signed_int(samples, offset, step)
offset = step
OUTLET.push_sample([ecg])
ecg_session_data.extend([ecg])
ecg_session_time.extend([timestamp])
def convert_array_to_signed_int(data, offset, length):
return int.from_bytes(
bytearray(data[offset : offset length]), byteorder="little", signed=True,
)
def convert_to_unsigned_long(data, offset, length):
return int.from_bytes(
bytearray(data[offset : offset length]), byteorder="little", signed=False,
)
This kicks off the script by creating the event loop and instructing to run until all the tasks in the main
function are complete:
if __name__ == "__main__":
OUTLET = StartStream(STREAMNAME)
os.environ["PYTHONASYNCIODEBUG"] = str(1)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main(ADDRESS, OUTLET))
main
function creates the streaming data task run
using ensure_future
and kicks off the event loop:
async def main(ADDRESS, OUTLET):
try:
async with BleakClient(ADDRESS) as client:
tasks = [
asyncio.ensure_future(run(client, True)),
]
await asyncio.gather(*tasks)
except:
pass
run
task pipes data from Polar H10 to LSL stream:
## Aynchronous task to start the data stream for ECG ##
async def run(client, debug=False):
print("---------Looking for Device------------ ", flush=True)
await client.is_connected()
print("---------Device connected--------------")
model_number = await client.read_gatt_char(MODEL_NBR_UUID)
print("Model Number: {0}".format("".join(map(chr, model_number))))
manufacturer_name = await client.read_gatt_char(MANUFACTURER_NAME_UUID)
print("Manufacturer Name: {0}".format("".join(map(chr, manufacturer_name))))
battery_level = await client.read_gatt_char(BATTERY_LEVEL_UUID)
print("Battery Level: {0}%".format(int(battery_level[0])))
await client.read_gatt_char(PMD_CONTROL)
await client.write_gatt_char(PMD_CONTROL, ECG_WRITE)
## ECG stream started
await client.start_notify(PMD_DATA, data_conv)
print("Collecting ECG data...")
await aioconsole.ainput('Running: Press a key to quit')
await client.stop_notify(PMD_DATA)
print("Stopping ECG data...", flush=True)
print("[CLOSED] application closed.", flush=True)
sys.exit(0)
While the stream is running and recording, I’d like to plot the data that’s going into the stream without using an await
call in the streaming run
function. My plotting function is defined here:
def live_plotter(x_vec,y1_data,line1,identifier='',pause_time=0.1):
if line1==[]:
# this is the call to matplotlib that allows dynamic plotting
plt.ion()
fig = plt.figure(figsize=(13,6))
ax = fig.add_subplot(111)
# create a variable for the line so we can later update it
line1, = ax.plot(x_vec,y1_data,'-o',alpha=0.8)
#update plot label/title
plt.ylabel('Y Label')
plt.title('Title: {}'.format(identifier))
plt.show()
# after the figure, axis, and line are created, we only need to update the y-data
line1.set_ydata(y1_data)
# adjust limits if new data goes beyond bounds
if np.min(y1_data)<=line1.axes.get_ylim()[0] or np.max(y1_data)>=line1.axes.get_ylim()[1]:
plt.ylim([np.min(y1_data)-np.std(y1_data),np.max(y1_data) np.std(y1_data)])
# this pauses the data so the figure/axis can catch up - the amount of pause can be altered above
plt.pause(pause_time)
# return line so we can update it again in the next iteration
return line1
And I call this plotting function from an async
function that has a forever while
loop:
async def liveplot(ecg):
# use ggplot style for more sophisticated visuals
plt.style.use('ggplot')
size = 100
x_vec = np.linspace(0,1,size 1)[0:-1]
y_vec = np.random.randn(len(x_vec))
line1 = []
while True:
rand_val = np.random.randn(1)
y_vec[-1] = rand_val
line1 = live_plotter(x_vec,y_vec,line1)
y_vec = np.append(ecg,0.0)
Как я должен включить функцию построения графика в сценарий, не влияя на поток? Спасибо!