конфигурация python для SR830 через RS232

#python #matplotlib #pyserial

#python #matplotlib #pyserial

Вопрос:

В настоящее время я работаю над программой, которая пытается воспроизвести функциональность labview, установив соединение с SR830 через RS-232. По некоторым причинам я не могу записывать данные и, похоже, не могу понять, почему. Я попытался поискать в Google, но ничего не нашел. Если кто-нибудь может помочь мне понять, что я делаю неправильно. Вот мой код:

 import serial
import tkinter as tk
from datetime import datetime
import time
import os
import atexit
import threading
import numpy as np
import math
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import matplotlib.animation as animation

class SR830:
    '''SR830 Class creates an object that can interface with and control an SR830 via RS-232.
    Class data structure allows the SR830 code to be imported into other scripts via "from SSTR_Serial import SR830"
    This can be used to encapsulate SR830 interface mechanics into a larger control scheme...
    by passing the class a frame to pack itself into as the frame argument.'''
    def command(self, command_string, command_value): #Basic command protocol, can be used as a template for other command functions.
        #Note: queries should have null command_value arguments (command_value = '').
        packet = command_string str(command_value) 'r' #Can be adjusted to include checksums/parity if needed.
        try:
            self.inst.write(packet.encode('ascii'))
        except AttributeError as AE:
            print(AE)

    def create_log(self):
        try: #Makes log file if it does not exist already.
            log_dir = self.name '_logs'
            os.mkdir(log_dir)
        except OSError:#Log file already exists
            pass
        self.log_open = datetime.now() #Var to hold time log was created (Different from when timer is started.)
        date = str(self.log_open).split(' ')[0] #Date that log file was opened.
        h = 0
        a = False
        while not a:
            h  = 1
            log_str = log_dir '/' self.name "_" date '_' str(h) '.txt'
            log_path = os.path.join(os.getcwd(), log_str)
            if os.path.exists(log_path):
                pass
            else:
                a = True
        self.log = open(log_str, "w ")
        self.log.write(self.name " log created on "  str(self.log_open) "nTime tShift (Degrees)tAmplitude (mV)tR ()tTheta ()n") #Writing opening lines.

    def start_graph(self): #Should be threaded at begining. This begins making the graph and recording data when the start/record buttons are pressed.
        def get_vals(self):
            current_time = time.time()#Get current time
            if (current_time - self.previous_time) >= self.sample_time: #Time is equal or passed sampling time.
                time_elapsed = current_time - self.start_time #Total time elapsed since transmission begun.

                #Transmission zone:
                try: #Querying and storing commands below
                    self.command("PHAS?",'')
                    phase_shift = self.inst.read(5).decode('ascii').replace('r','')
                    self.command('SLVL?','')
                    phase_amplitude = self.inst.read(5).decode('ascii').replace('r','')
                    self.shifts = np.append(self.shifts,[phase_shift]); self.amplitudes = np.append(self.amplitudes,[phase_amplitude]); self.times = np.append(self.times,[time_elapsed]) #Will only append time if both other arrays are appended.
                    #^ Append on one line to keep all lengths consistant when switching threads.
                    #Variables that are queried but not logged.
                    self.command('OEXP?1','')
                    R = self.inst.read(5).decode('ascii').replace('r','')
                    self.command('OEXP?2','')
                    Theta = self.inst.read(5).decode('ascii').replace('r','')
                except AttributeError as AE:#Exceptions for testing without connection to device
                    print(AE)

                '''
                #This area generates random sine waves. Only use for testing without connection to actual device.---------
                self.times = np.append(self.times,[len(self.times)])
                self.shifts = np.append(self.shifts,[math.sin(len(self.times))])
                self.amplitudes = np.append(self.amplitudes, [5*math.cos(len(self.times))])
                phase_shift = 5
                phase_amplitude = 7
                R = 10
                Theta = 11
                print(str(len(self.times)))
                print(str(len(self.shifts)))
                print(str(len(self.amplitudes)))
                #End test zone ---------------------------------------------------------------------------------------
                '''

                #Remove extra elements to reduce memeory usage for long runs
                while len(self.times) > 200: #Integer on RHS of inequality is max number of elements.
                    self.shifts = self.shifts[2:-1:1]; self.amplitudes = self.amplitudes[2:-1:1]; self.times=self.times[2:-1:1]

                #Log writing zone:
                if self.recording is True: #Updates log, otherwise only updates graph.
                    self.log.write(str(round(time_elapsed-self.pause_time,4)) 't' str(phase_shift) 'tt' str(phase_amplitude) 'tt' str(R) 't' str(Theta) 'n')
                self.previous_time = current_time #Resets time value for calculating interval.

        while self.running: #Loops as long as running is true and kills thread once runing is false.
            get_vals(self)

    def animate(self, i): #Only redraw matplotlib aspects that get cleared with a.clear() and a2.clear() to conserve memory.
    #Todo - optimize drawing order
        try:
            self.a.clear()
            self.a.plot(self.times,self.shifts, label='Phase shift ()', color='tab:blue')
            self.a.set_ylabel('Shift')
            self.a.set_ylabel('Shift', color='tab:blue')
            self.a.set_xlabel('Time (s)')
            self.a.legend(loc='upper left')

            self.a2.clear()
            self.a2.plot(self.times,self.amplitudes,label='Phase amplitude ()',color='tab:red')
            self.a2.set_ylabel('Amplitude', color='tab:red')
            self.a2.legend(loc='upper right')

            self.a.get_yaxis_transform()#Fits axis to widget area.
        except ValueError as VE:
            print(VE)

    def clear_graph(self): #Clears all arrays to make graph reset. Does not erase logged data.
        self.times = np.array([])
        self.shifts = np.array([])
        self.amplitudes = np.array([])

    def startstop(self): #Flips button to begin or stop graphing.
        if self.running is False: #Recording and graphing is off. Calling function will turn it on and activate the following lines
            self.running = True
            self.clear_graph() #Clears last run.
            self.run_btn.config(text='Stop', bg='red')
            #self.start_graph()
            self.sample_time = float(self.sample_box.get())
            self.pause_time = 0 #Resets pause adjustment time.
            self.start_time = time.time() #Resets start time. TODO-Add adjustments for pauses
            self.pause_point = time.time() #Last time that pause activated. Used to calculate pause_time later.
            self.graph_thread = threading.Thread(target=self.start_graph)
            self.graph_thread.start()
        else:  #graphing is on. Calling function will turn it off and activate the following line
            self.running = False
            self.run_btn.config(text='Start',bg='green')
            self.recording=False
            self.record_btn.config(text='Start recording', bg="green")
            try: #Shuts down log file if it exists.
                self.log.close()
            except AttributeError as AE:
                print(AE)

    def startstop_record(self): #Flips recording button.
        if self.running is False:
            self.startstop() #Makes sure data is being transmitted before recording starts.

        if self.recording is False: #Device is running, but not recording. Command turns recording on.
            self.record_btn.config(text='Pause recording', bg="yellow")
            try:
                self.log.write('') #Attempts to write an empty string to test if file is open.
            except AttributeError: #File has not been created -> run function to open a new log.
                self.create_log()
            except ValueError:  #Previous log file has been closed -> run function to open new log.
                self.create_log()
            self.recording = True
            self.pause_time  = time.time() - self.pause_point #Adjusts for
        else: #Device is running AND recording. Command pauses recording.
            self.recording = False
            self.pause_point = time.time()
            self.record_btn.config(text='Start recording', bg="green")

    def __init__(self, name, frame):
        self.name = name
        self.running = False
        self.recording = False
        #Read config -----------------------------------------------------------------------
        config = open('SR830 config.txt','w ')

        try:
            config_read = config.readlines()
            config_read = config_read[1].split('t')
        except IndexError: #config coudl not be read - creates default file.
            config.write('COM Baudratettimeouttsampling timen10t9600t0.1t1')
            config.close()
            config = open('SR830 config.txt','r')
            config_read = config.readlines()
            config_read = config_read[1].split('t')

        com = config_read[0]
        baud = int(config_read[1])
        self.config_timeout = float(config_read[2])
        self.sample_time = float(config_read[3])
        config.close()
        #establish communication -----------------------------------------------------------
        com = 'COM' str(com) #TODO- make this read from config
        try:
            SR_inst = serial.Serial(port=com, baudrate=baud, timeout=self.config_timeout) #Opens communication.
            #Note: SR830 has adjustablebaud rate and parity, but defaults to 9600 and none, respectively.

            self.inst = SR_inst
            self.command('OUTX',0) #Tells device to output responses via RS-232.
            print(self.inst.read(5)) #Prints response to OUTX command.

            #self.command('OUTX?','') #TEST - see if communication has switched.
            #print(self.inst.read(10)) #Should print b'0r' for RS-232 communication.

            self.command('DDEF',110)#Sets CH1 to R.
            print(self.inst.read(5))
            self.command('DDEF',210)#Sets CH2 to Theta.
            print(self.inst.read(5))

        except ValueError as ve:
            SR_inst = 0
            print(ve)
        except AttributeError as ae:
            SR_inst = 0
            print (ae)
        except NameError as ne:
            print(ne)
            SR_inst = 0
        except serial.SerialException as SE:
            SR_inst = 0
            print(SE)
        #Create Tkinter GUI------------------------------------------------------------------
        id_label=tk.Label(frame, text=name)
        id_label.grid(row=0,column=0, sticky=tk.N tk.S tk.E tk.W)
        version_label = tk.Label(frame, text = 'Version 0.1')
        version_label.grid(row=0, column=1, sticky=tk.N tk.S tk.E tk.W)

        sample_label = tk.Label(frame, text="Sampling time =")
        sample_label.grid(row=1,column=0,sticky=tk.N tk.S tk.E tk.W)
        self.sample_box = tk.Entry(frame)
        self.sample_box.insert(tk.END, self.sample_time)
        self.sample_box.grid(row=1,column=1,sticky=tk.N tk.S tk.E tk.W)

        self.run_btn = tk.Button(frame, text='Start', command = self.startstop)
        self.run_btn.grid(row=2,column=0,sticky=tk.N tk.S tk.E tk.W)
        self.run_btn.config(bg="green")
        self.record_btn = tk.Button(frame, text='Start recording', command = self.startstop_record)
        self.record_btn.grid(row=2,column=1,sticky=tk.N tk.S tk.E tk.W)
        self.record_btn.config(bg="green")

        #Graph setup:
        self.f = Figure(figsize=(5,5),dpi=100) #Figure that graphs appears in
        self.a = self.f.add_subplot() #111 means there is only 1 chart. use a.plot to plot lists.
        self.a2 = self.a.twinx()

        self.graph_canvas = FigureCanvasTkAgg(self.f, frame)
        self.graph_canvas.get_tk_widget().grid(row=3, column=0,columnspan=2)

        #Make grid resizeable
        for row in range(4): #Number of rows
            try:
                tk.Grid.rowconfigure(frame,row,weight=2)
            except AttributeError as AE: #FOr unfilled rows amp; columns
                print(AE)
        for column in range(2): #Number of columns
            try:
                #root.grid_columnconfigure(column, weight=1)
                tk.Grid.columnconfigure(frame,column,weight=1)
            except AttributeError as AE: #For unfilled rows amp; columns
                print(AE)

        #Extra declarations
        self.sample_time = float(self.sample_box.get())-2*self.config_timeout #Adjusts for timeout of device.
        self.times = np.array([]) #Holds time values queried (May not be recorded)
        self.shifts = np.array([])#Holds shift values queried.
        self.amplitudes = np.array([])#Holds amplitude values queried.
        #self.start_time = time.time() #TODO - Move this somewhere better
        self.previous_time = 0
        time.sleep(1)
        self.ani = animation.FuncAnimation(self.f,self.animate, interval=1000)
    def SR830_exit(self): #Exit handler.
        self.running = False
        self.recording = False
        try:
            self.inst.close()
        except AttributeError as AE:
            print(AE)
        root.destroy()
        exit()

#TEST AREA: ------------------------------------------------------------------------------
# Create tk root -------------------
root = tk.Tk()
root.title('SR830')
frame1 = tk.Frame(root)
frame1.pack()
#Create instrument object ---------
sr1 = SR830("SSTR", frame1) #TODO - remove once testing is done.
atexit.register(sr1.SR830_exit)

root.protocol('WM_DELETE_WINDOW', sr1.SR830_exit)
root.mainloop()
 

Комментарии:

1. Пожалуйста, сократите свой код до того, что необходимо для демонстрации проблемы.

2. Я не совсем уверен, в чем проблема, поэтому я вставил весь код. Но я думаю, что причина, по которой я не могу получать данные, заключается в том, что соединение установлено неправильно