Потоковая передача данных по Bluetooth в розетку и одновременное вычисление/построение графиков с помощью asyncio и matplotlib

#python-3.x #matplotlib #python-asyncio

Вопрос:

Я собираю и передаю необработанные данные ЭКГ с Polar H10 в LSL розетку, используя код, адаптированный отсюда. Я записываю этот LSL поток Matlab для ретроспективной обработки данных, как только поток будет завершен. Это использует asyncio библиотеку и хорошо работает.

Однако, в дополнение к потоковой передаче и записи данных ЭКГ через LSL и Matlab , я хотел бы одновременно предварительно обработать/построить график потоковых данных, используя matplotlib их в режиме, близком к реальному времени, как в этом примере.

Проблема в том, что я не уверен, как выполнить предварительную обработку и построение графика в реальном времени, не вызывая задержки в LSL потоке. Мой наивысший приоритет-поддержание точности и точности во времени потоковой передачи/записи LSL . Функция предварительной обработки/построения графика не обязательно должна быть такой же «в реальном времени», как поток.

Моя последняя попытка сделать это с помощью asyncio ниже, но все попытки не увенчались успехом как для потоковой передачи, так и для построения графика. Любая помощь будет признательна.

Это часть настройки сценария:

 from pylsl import StreamInfo, StreamOutlet
import asyncio
import aioconsole 
import os
import signal
import sys
import getopt
import math
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

from bleak import BleakClient
from bleak.uuids import uuid16_dict

""" Predefined UUID (Universal Unique Identifier) mappings are based on Heart Rate GATT service Protocol that most
Fitness/Heart Rate device manufacturers follow (Polar H10 in this case) to obtain a specific response input from 
the device acting as an API """

uuid16_dict = {v: k for k, v in uuid16_dict.items()}

## This is the device MAC ID (WIN) or CFD (MAC), please update with your device ID
ADDRESS = "3E44F50E-A858-4CC2-BF9F-461FC6D25A93"
STREAMNAME = 'PolarBand'

## UUID for model number ##
MODEL_NBR_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Model Number String")
)

## UUID for manufacturer name ##
MANUFACTURER_NAME_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Manufacturer Name String")
)

## UUID for battery level ##
BATTERY_LEVEL_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
    uuid16_dict.get("Battery Level")
)

## UUID for connection establsihment with device ##
PMD_SERVICE = "FB005C80-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of stream settings ##
PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of start stream ##
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"

## UUID for Request of ECG Stream ##
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])

## For Polar H10  sampling frequency ##
ECG_SAMPLING_FREQ = 130

# for plotting using python
ecg_session_data = []
ecg_session_time = []

# for streaming to LSL
OUTLET = []
 

Здесь я определяю свой LSL поток:

 # define LSL stream
def StartStream(STREAMNAME):

    info = StreamInfo(STREAMNAME, 'ECG', 1,ECG_SAMPLING_FREQ, 'float32', 'myuid2424')

    info.desc().append_child_value("manufacturer", "Polar")
    channels = info.desc().append_child("channels")
    for c in ["ECG"]:
        channels.append_child("channel")
            .append_child_value("name", c)
            .append_child_value("unit", "microvolts")
            .append_child_value("type", "ECG")
    
    # next make an outlet; we set the transmission chunk size to 74 samples and
    # the outgoing buffer size to 360 seconds (max.)
    return StreamOutlet(info, 74, 360)

## Bit conversion of the Hexadecimal stream
def data_conv(sender, data):
    if data[0] == 0x00:
        timestamp = convert_to_unsigned_long(data, 1, 8)
        step = 3
        samples = data[10:]
        offset = 0
        while offset < len(samples):
            ecg = convert_array_to_signed_int(samples, offset, step)
            offset  = step
            OUTLET.push_sample([ecg])
            ecg_session_data.extend([ecg])
            ecg_session_time.extend([timestamp])

def convert_array_to_signed_int(data, offset, length):
    return int.from_bytes(
        bytearray(data[offset : offset   length]), byteorder="little", signed=True,
    )

def convert_to_unsigned_long(data, offset, length):
    return int.from_bytes(
        bytearray(data[offset : offset   length]), byteorder="little", signed=False,
    )
 

This kicks off the script by creating the event loop and instructing to run until all the tasks in the main function are complete:

 if __name__ == "__main__":
    OUTLET = StartStream(STREAMNAME)
    os.environ["PYTHONASYNCIODEBUG"] = str(1)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main(ADDRESS, OUTLET))
 

main function creates the streaming data task run using ensure_future and kicks off the event loop:

 async def main(ADDRESS, OUTLET):
    try:
        async with BleakClient(ADDRESS) as client:
            tasks = [
                asyncio.ensure_future(run(client, True)),
            ]

            await asyncio.gather(*tasks)
    except:
        pass
 

run task pipes data from Polar H10 to LSL stream:

 ## Aynchronous task to start the data stream for ECG ##
async def run(client, debug=False):

    print("---------Looking for Device------------ ", flush=True)

    await client.is_connected()
    print("---------Device connected--------------")

    model_number = await client.read_gatt_char(MODEL_NBR_UUID)
    print("Model Number: {0}".format("".join(map(chr, model_number))))

    manufacturer_name = await client.read_gatt_char(MANUFACTURER_NAME_UUID)
    print("Manufacturer Name: {0}".format("".join(map(chr, manufacturer_name))))

    battery_level = await client.read_gatt_char(BATTERY_LEVEL_UUID)
    print("Battery Level: {0}%".format(int(battery_level[0])))

    await client.read_gatt_char(PMD_CONTROL)

    await client.write_gatt_char(PMD_CONTROL, ECG_WRITE)

    ## ECG stream started
    await client.start_notify(PMD_DATA, data_conv)

    print("Collecting ECG data...")

    await aioconsole.ainput('Running: Press a key to quit')
    await client.stop_notify(PMD_DATA)
    print("Stopping ECG data...", flush=True)
    print("[CLOSED] application closed.", flush=True)
    sys.exit(0)
 

While the stream is running and recording, I’d like to plot the data that’s going into the stream without using an await call in the streaming run function. My plotting function is defined here:

 def live_plotter(x_vec,y1_data,line1,identifier='',pause_time=0.1):
    if line1==[]:
        # this is the call to matplotlib that allows dynamic plotting
        plt.ion()
        fig = plt.figure(figsize=(13,6))
        ax = fig.add_subplot(111)
        # create a variable for the line so we can later update it
        line1, = ax.plot(x_vec,y1_data,'-o',alpha=0.8)        
        #update plot label/title
        plt.ylabel('Y Label')
        plt.title('Title: {}'.format(identifier))
        plt.show()
    
    # after the figure, axis, and line are created, we only need to update the y-data
    line1.set_ydata(y1_data)
    # adjust limits if new data goes beyond bounds
    if np.min(y1_data)<=line1.axes.get_ylim()[0] or np.max(y1_data)>=line1.axes.get_ylim()[1]:
        plt.ylim([np.min(y1_data)-np.std(y1_data),np.max(y1_data) np.std(y1_data)])
    # this pauses the data so the figure/axis can catch up - the amount of pause can be altered above
    plt.pause(pause_time)
    
    # return line so we can update it again in the next iteration
    return line1
 

And I call this plotting function from an async function that has a forever while loop:

 async def liveplot(ecg):
    # use ggplot style for more sophisticated visuals
    plt.style.use('ggplot')
    
    size = 100
    x_vec = np.linspace(0,1,size 1)[0:-1]
    y_vec = np.random.randn(len(x_vec))
    line1 = []
    
    while True:
        rand_val = np.random.randn(1)
        y_vec[-1] = rand_val
        line1 = live_plotter(x_vec,y_vec,line1)
        y_vec = np.append(ecg,0.0)
 

Как я должен включить функцию построения графика в сценарий, не влияя на поток? Спасибо!