#python #raspberry-pi #multiprocessing
#python #raspberry-pi #многопроцессорная обработка
Вопрос:
Я работаю на python над Raspberry pi. Я пытаюсь отправить сигнал на контроллер двигателя, а затем получить сигнал с помощью чувствительной шляпы после того, как он пройдет через мою установку (в данном случае RC-фильтр).
Важно то, что я хочу генерировать выходные данные и считывать входные данные как можно ближе к одновременному. Я надеялся использовать многопроцессорную обработку, чтобы один поток отправлял сигнал, в то время как другой считывал входящий сигнал. Но я продолжаю путаться в том, как потоки работают в python.
Короче говоря, возможно ли выполнять 2 разные задачи с многопроцессорной обработкой, а затем повторять эти задачи (отправка и чтение сигнала) до тех пор, пока не будет выполнено условие. (как в цикле while)
(Отредактировано с помощью кода)
from __future__ import print_function
from PyQt5.QtWidgets import QAction
from pyqtgraph.Qt import QtGui, QtCore
from adafruit_motorkit import MotorKit
import pyqtgraph as pg
import sys
from sys import stdout
import numpy as np
from daqhats import mcc118, OptionFlags, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string,
chan_list_to_mask
from decimal import *
import math
import time
getcontext().prec = 3
total_samples_read = 0
READ_ALL_AVAILABLE = -1
channelData = np.zeros(4, dtype=float)
CURSOR_BACK_2 = 'x1b[2D'
ERASE_TO_END_OF_LINE = 'x1b[0K'
# for plotting data
########################################
scan_rate = 1000 # scan rate in hz
maxtime = 30 # second s to run for
Datatime = np.zeros(maxtime * scan_rate, dtype=float)#List of times when smaples are taken
Data1 = np.zeros(maxtime * scan_rate, dtype=float) #sampels taken
data_index = 0 # Maximum index of data points taken
dt = Decimal(1 / scan_rate) # difference in time between indexes of Datatime
display_index = 0 # maximum index of Data being displayed on plot
#################################
# variables for Data logger
##########################
is_scanning = False
channels = [0]
channel_mask = chan_list_to_mask(channels)
num_channels = len(channels)
samples_per_channel = 0
options = OptionFlags.CONTINUOUS
######################################
startedTime = 0 # time at program start
myTime = 0 # time since program started
try:
address = select_hat_device(HatIDs.MCC_118)
hat = mcc118(address)
except (HatError, ValueError) as err:
print('n', err)
class MainWindow(pg.GraphicsWindow):
def __init__(self, *args, **kwargs):
super(pg.GraphicsWindow, self).__init__(*args, **kwargs)
self.delay = 30 #ms
self.quit = QAction("Quit", self)
self.quit.triggered.connect(self.clean_close)
self.timer = QtCore.QTimer()
self.timer.setInterval(self.delay)
self.timer.timeout.connect(self.update_plot)
# plots data and runs calibrate between trials
def update_plot(self):
global display_index, Datatime, Data1
kit.motor1.throttle = .4 .2 * math.cos((time.time()-startedTime)* 2 * np.pi* 1) # 1hz sinusiod out of motor
if data_index < len(Data1):
Collect_Data()
plot.setXRange(0, 20, padding=0)
plot.setXRange(0, 20, padding=0)
curve.setData(Datatime[:display_index], Data1[:display_index])
display_index = 1
app.processEvents()
def clean_close(self):
self.close()
# starts data collection
def Collect_Data():
global is_scanning
"""
This function is executed automatically when the module is run directly.
"""
# Store the channels in a list and convert the list to a channel mask that
# can be passed as a parameter to the MCC 118 functions.
try:
# Select an MCC 118 HAT device to use.
# actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
# Configure and start the scan.
# Since the continuous option is being used, the samples_per_channel
# parameter is ignored if the value is less than the default internal
# buffer size (10000 * num_channels in this case). If a larger internal
# buffer size is desired, set the value of this parameter accordingly.
if not is_scanning:
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,
options)
is_scanning = True
try:
read_and_display_data(hat, num_channels)
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, 'n')
print('Stopping')
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
except (HatError, ValueError) as err:
print('n', err)
# reads Data off of Hat and adds to Data1
def read_and_display_data(hat, num_channels):
global channelData, data_index, Datatime, Data1
total_samples_read = 0
read_request_size = READ_ALL_AVAILABLE
# When doing a continuous scan, the timeout value will be ignored in the
# call to a_in_scan_read because we will be requesting that all available
# samples (up to the default buffer size) be returned.
timeout = 5.0
# Read all of the available samples (up to the size of the read_buffer which
# is specified by the user_buffer_size). Since the read_request_size is set
# to -1 (READ_ALL_AVAILABLE), this function returns immediately with
# whatever samples are available (up to user_buffer_size) and the timeout
# parameter is ignored.
trigger = True
while trigger == True:
read_result = hat.a_in_scan_read(read_request_size, timeout)
# Check for an overrun error
if read_result.hardware_overrun:
print('nnHardware overrunn')
break
elif read_result.buffer_overrun:
print('nnBuffer overrunn')
break
samples_read_per_channel = int(len(read_result.data) / num_channels)
total_samples_read = samples_read_per_channel
# adds all data in buffer to data to be plotted.
count = 0
if samples_read_per_channel > 0:
index = samples_read_per_channel * num_channels - num_channels
while count < samples_read_per_channel:
for i in range(num_channels):
channelData[i] = read_result.data[index i]
if data_index < len(Data1):
Data1[data_index] = channelData[0]
Datatime[data_index] = float(dt * Decimal(data_index))
data_index = 1
count = 1
trigger = False
stdout.flush()
if __name__ == '__main__':
app = QtGui.QApplication([])
win = MainWindow() # display window
plot = win.addPlot(1, 0)
curve = plot.plot()
win.show()
kit = MotorKit() # implements motor driver
kit.motor1.throttle = .4 # values 1 is 5v and 0 is 0 volts
startedTime = time.time()
# u = .2*math.cos(t * 2*np.pi*1)
win.timer.start()
sys.exit(app.exec_())
Комментарии:
1. Какой сигнал вы имеете в виду, пожалуйста? Аудио? Свет? Программный сигнал, подобный семафору? Как долго сигнал остается включенным? 1 миллисекунда или 4 минуты? Как часто вам нужно его пробовать? Отправляется ли он и принимается на одном и том же Raspberry Pi?
2. Сигнал представляет собой просто синусоидальное напряжение, генерируемое контроллером двигателя, и мне, скорее всего, нужно отправить его на срок до нескольких минут. Шляпа имеет свою собственную частоту дискретизации и будет отбирать ее с любой установленной мной частотой (на данный момент я использую 1 кГц) Мне просто нужно постоянно проверять шляпу и сохранять образцы, которые она получает, в массиве numpy. Итак, короче говоря, задача 1: обновить синусоиду, задача 2: извлечь считанные значения из датчика.
3. Я все еще не понимаю, извини. Пожалуйста, покажите свой минимальный код для отправки нескольких выборок / точек вашего сигнала. Также, пожалуйста, покажите свой минимальный код для получения некоторых значений от вашего датчика.
4. Все еще борюсь:-(Теперь я вижу много графического интерфейса Qt и графики, но ваш вопрос, похоже, касался генерации сигнала (который я вообще не вижу) и чтения образца (который я вижу, но не понимаю), и выполнения этих двух действий в тесно синхронизированномспособ. Пожалуйста, более четко определите, какие два пункта в вашем коде вы пытаетесь сохранить близко друг к другу. Спасибо.
5. Ваш импорт очень запутанный, и существует множество переменных уровня модуля, которые не являются константами. Я бы посоветовал очистить обе эти области, прежде чем беспокоиться о потоковой обработке (если вы хотите более тщательный обзор кода, отправьте его по адресу codereview.stackexchange.com )