#python #matplotlib #pyserial
#python #matplotlib #pyserial
Вопрос:
В настоящее время я работаю над программой, которая пытается воспроизвести функциональность labview, установив соединение с SR830 через RS-232. По некоторым причинам я не могу записывать данные и, похоже, не могу понять, почему. Я попытался поискать в Google, но ничего не нашел. Если кто-нибудь может помочь мне понять, что я делаю неправильно. Вот мой код:
import serial
import tkinter as tk
from datetime import datetime
import time
import os
import atexit
import threading
import numpy as np
import math
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation
class SR830:
'''SR830 Class creates an object that can interface with and control an SR830 via RS-232.
Class data structure allows the SR830 code to be imported into other scripts via "from SSTR_Serial import SR830"
This can be used to encapsulate SR830 interface mechanics into a larger control scheme...
by passing the class a frame to pack itself into as the frame argument.'''
def command(self, command_string, command_value): #Basic command protocol, can be used as a template for other command functions.
#Note: queries should have null command_value arguments (command_value = '').
packet = command_string str(command_value) 'r' #Can be adjusted to include checksums/parity if needed.
try:
self.inst.write(packet.encode('ascii'))
except AttributeError as AE:
print(AE)
def create_log(self):
try: #Makes log file if it does not exist already.
log_dir = self.name '_logs'
os.mkdir(log_dir)
except OSError:#Log file already exists
pass
self.log_open = datetime.now() #Var to hold time log was created (Different from when timer is started.)
date = str(self.log_open).split(' ')[0] #Date that log file was opened.
h = 0
a = False
while not a:
h = 1
log_str = log_dir '/' self.name "_" date '_' str(h) '.txt'
log_path = os.path.join(os.getcwd(), log_str)
if os.path.exists(log_path):
pass
else:
a = True
self.log = open(log_str, "w ")
self.log.write(self.name " log created on " str(self.log_open) "nTime tShift (Degrees)tAmplitude (mV)tR ()tTheta ()n") #Writing opening lines.
def start_graph(self): #Should be threaded at begining. This begins making the graph and recording data when the start/record buttons are pressed.
def get_vals(self):
current_time = time.time()#Get current time
if (current_time - self.previous_time) >= self.sample_time: #Time is equal or passed sampling time.
time_elapsed = current_time - self.start_time #Total time elapsed since transmission begun.
#Transmission zone:
try: #Querying and storing commands below
self.command("PHAS?",'')
phase_shift = self.inst.read(5).decode('ascii').replace('r','')
self.command('SLVL?','')
phase_amplitude = self.inst.read(5).decode('ascii').replace('r','')
self.shifts = np.append(self.shifts,[phase_shift]); self.amplitudes = np.append(self.amplitudes,[phase_amplitude]); self.times = np.append(self.times,[time_elapsed]) #Will only append time if both other arrays are appended.
#^ Append on one line to keep all lengths consistant when switching threads.
#Variables that are queried but not logged.
self.command('OEXP?1','')
R = self.inst.read(5).decode('ascii').replace('r','')
self.command('OEXP?2','')
Theta = self.inst.read(5).decode('ascii').replace('r','')
except AttributeError as AE:#Exceptions for testing without connection to device
print(AE)
'''
#This area generates random sine waves. Only use for testing without connection to actual device.---------
self.times = np.append(self.times,[len(self.times)])
self.shifts = np.append(self.shifts,[math.sin(len(self.times))])
self.amplitudes = np.append(self.amplitudes, [5*math.cos(len(self.times))])
phase_shift = 5
phase_amplitude = 7
R = 10
Theta = 11
print(str(len(self.times)))
print(str(len(self.shifts)))
print(str(len(self.amplitudes)))
#End test zone ---------------------------------------------------------------------------------------
'''
#Remove extra elements to reduce memeory usage for long runs
while len(self.times) > 200: #Integer on RHS of inequality is max number of elements.
self.shifts = self.shifts[2:-1:1]; self.amplitudes = self.amplitudes[2:-1:1]; self.times=self.times[2:-1:1]
#Log writing zone:
if self.recording is True: #Updates log, otherwise only updates graph.
self.log.write(str(round(time_elapsed-self.pause_time,4)) 't' str(phase_shift) 'tt' str(phase_amplitude) 'tt' str(R) 't' str(Theta) 'n')
self.previous_time = current_time #Resets time value for calculating interval.
while self.running: #Loops as long as running is true and kills thread once runing is false.
get_vals(self)
def animate(self, i): #Only redraw matplotlib aspects that get cleared with a.clear() and a2.clear() to conserve memory.
#Todo - optimize drawing order
try:
self.a.clear()
self.a.plot(self.times,self.shifts, label='Phase shift ()', color='tab:blue')
self.a.set_ylabel('Shift')
self.a.set_ylabel('Shift', color='tab:blue')
self.a.set_xlabel('Time (s)')
self.a.legend(loc='upper left')
self.a2.clear()
self.a2.plot(self.times,self.amplitudes,label='Phase amplitude ()',color='tab:red')
self.a2.set_ylabel('Amplitude', color='tab:red')
self.a2.legend(loc='upper right')
self.a.get_yaxis_transform()#Fits axis to widget area.
except ValueError as VE:
print(VE)
def clear_graph(self): #Clears all arrays to make graph reset. Does not erase logged data.
self.times = np.array([])
self.shifts = np.array([])
self.amplitudes = np.array([])
def startstop(self): #Flips button to begin or stop graphing.
if self.running is False: #Recording and graphing is off. Calling function will turn it on and activate the following lines
self.running = True
self.clear_graph() #Clears last run.
self.run_btn.config(text='Stop', bg='red')
#self.start_graph()
self.sample_time = float(self.sample_box.get())
self.pause_time = 0 #Resets pause adjustment time.
self.start_time = time.time() #Resets start time. TODO-Add adjustments for pauses
self.pause_point = time.time() #Last time that pause activated. Used to calculate pause_time later.
self.graph_thread = threading.Thread(target=self.start_graph)
self.graph_thread.start()
else: #graphing is on. Calling function will turn it off and activate the following line
self.running = False
self.run_btn.config(text='Start',bg='green')
self.recording=False
self.record_btn.config(text='Start recording', bg="green")
try: #Shuts down log file if it exists.
self.log.close()
except AttributeError as AE:
print(AE)
def startstop_record(self): #Flips recording button.
if self.running is False:
self.startstop() #Makes sure data is being transmitted before recording starts.
if self.recording is False: #Device is running, but not recording. Command turns recording on.
self.record_btn.config(text='Pause recording', bg="yellow")
try:
self.log.write('') #Attempts to write an empty string to test if file is open.
except AttributeError: #File has not been created -> run function to open a new log.
self.create_log()
except ValueError: #Previous log file has been closed -> run function to open new log.
self.create_log()
self.recording = True
self.pause_time = time.time() - self.pause_point #Adjusts for
else: #Device is running AND recording. Command pauses recording.
self.recording = False
self.pause_point = time.time()
self.record_btn.config(text='Start recording', bg="green")
def __init__(self, name, frame):
self.name = name
self.running = False
self.recording = False
#Read config -----------------------------------------------------------------------
config = open('SR830 config.txt','w ')
try:
config_read = config.readlines()
config_read = config_read[1].split('t')
except IndexError: #config coudl not be read - creates default file.
config.write('COM Baudratettimeouttsampling timen10t9600t0.1t1')
config.close()
config = open('SR830 config.txt','r')
config_read = config.readlines()
config_read = config_read[1].split('t')
com = config_read[0]
baud = int(config_read[1])
self.config_timeout = float(config_read[2])
self.sample_time = float(config_read[3])
config.close()
#establish communication -----------------------------------------------------------
com = 'COM' str(com) #TODO- make this read from config
try:
SR_inst = serial.Serial(port=com, baudrate=baud, timeout=self.config_timeout) #Opens communication.
#Note: SR830 has adjustablebaud rate and parity, but defaults to 9600 and none, respectively.
self.inst = SR_inst
self.command('OUTX',0) #Tells device to output responses via RS-232.
print(self.inst.read(5)) #Prints response to OUTX command.
#self.command('OUTX?','') #TEST - see if communication has switched.
#print(self.inst.read(10)) #Should print b'0r' for RS-232 communication.
self.command('DDEF',110)#Sets CH1 to R.
print(self.inst.read(5))
self.command('DDEF',210)#Sets CH2 to Theta.
print(self.inst.read(5))
except ValueError as ve:
SR_inst = 0
print(ve)
except AttributeError as ae:
SR_inst = 0
print (ae)
except NameError as ne:
print(ne)
SR_inst = 0
except serial.SerialException as SE:
SR_inst = 0
print(SE)
#Create Tkinter GUI------------------------------------------------------------------
id_label=tk.Label(frame, text=name)
id_label.grid(row=0,column=0, sticky=tk.N tk.S tk.E tk.W)
version_label = tk.Label(frame, text = 'Version 0.1')
version_label.grid(row=0, column=1, sticky=tk.N tk.S tk.E tk.W)
sample_label = tk.Label(frame, text="Sampling time =")
sample_label.grid(row=1,column=0,sticky=tk.N tk.S tk.E tk.W)
self.sample_box = tk.Entry(frame)
self.sample_box.insert(tk.END, self.sample_time)
self.sample_box.grid(row=1,column=1,sticky=tk.N tk.S tk.E tk.W)
self.run_btn = tk.Button(frame, text='Start', command = self.startstop)
self.run_btn.grid(row=2,column=0,sticky=tk.N tk.S tk.E tk.W)
self.run_btn.config(bg="green")
self.record_btn = tk.Button(frame, text='Start recording', command = self.startstop_record)
self.record_btn.grid(row=2,column=1,sticky=tk.N tk.S tk.E tk.W)
self.record_btn.config(bg="green")
#Graph setup:
self.f = Figure(figsize=(5,5),dpi=100) #Figure that graphs appears in
self.a = self.f.add_subplot() #111 means there is only 1 chart. use a.plot to plot lists.
self.a2 = self.a.twinx()
self.graph_canvas = FigureCanvasTkAgg(self.f, frame)
self.graph_canvas.get_tk_widget().grid(row=3, column=0,columnspan=2)
#Make grid resizeable
for row in range(4): #Number of rows
try:
tk.Grid.rowconfigure(frame,row,weight=2)
except AttributeError as AE: #FOr unfilled rows amp; columns
print(AE)
for column in range(2): #Number of columns
try:
#root.grid_columnconfigure(column, weight=1)
tk.Grid.columnconfigure(frame,column,weight=1)
except AttributeError as AE: #For unfilled rows amp; columns
print(AE)
#Extra declarations
self.sample_time = float(self.sample_box.get())-2*self.config_timeout #Adjusts for timeout of device.
self.times = np.array([]) #Holds time values queried (May not be recorded)
self.shifts = np.array([])#Holds shift values queried.
self.amplitudes = np.array([])#Holds amplitude values queried.
#self.start_time = time.time() #TODO - Move this somewhere better
self.previous_time = 0
time.sleep(1)
self.ani = animation.FuncAnimation(self.f,self.animate, interval=1000)
def SR830_exit(self): #Exit handler.
self.running = False
self.recording = False
try:
self.inst.close()
except AttributeError as AE:
print(AE)
root.destroy()
exit()
#TEST AREA: ------------------------------------------------------------------------------
# Create tk root -------------------
root = tk.Tk()
root.title('SR830')
frame1 = tk.Frame(root)
frame1.pack()
#Create instrument object ---------
sr1 = SR830("SSTR", frame1) #TODO - remove once testing is done.
atexit.register(sr1.SR830_exit)
root.protocol('WM_DELETE_WINDOW', sr1.SR830_exit)
root.mainloop()
Комментарии:
1. Пожалуйста, сократите свой код до того, что необходимо для демонстрации проблемы.
2. Я не совсем уверен, в чем проблема, поэтому я вставил весь код. Но я думаю, что причина, по которой я не могу получать данные, заключается в том, что соединение установлено неправильно