from __future__ import print_function import wx import gui import aioserial from xmodem import XMODEM import threading import io #import wxmplot from datetime import datetime import locale import json import serial.tools.list_ports import shlex import parse import os import dfudfuse import firmware import cli import cli_bt import traceback import asyncio class console( gui.consoleDialog ): def __init__( self, parent, cli): #initialize parent class gui.consoleDialog.__init__(self,parent) self.cli = cli def onConsoleClose( self, event ): self.cli.command(b'stop\n', None) event.Skip() class frame( gui.mainFrame ): #constructor def __init__( self, parent ): #initialize parent class gui.mainFrame.__init__(self,parent) #decimal point must be . locale.setlocale( category=locale.LC_ALL, locale="US" # Note: do not use "de_DE" as it doesn't work ) self.alive = threading.Event() self.mBoxSizerParams = self.m_comboBoxSizer.GetContainingSizer() # self.pl = wxmplot.PlotPanel(self.m_panelData) # self.m_customControlData.GetContainingSizer().Add(self.pl, 1, wx.EXPAND, 5) ports = list(serial.tools.list_ports.comports()) for p in ports: print(p) # self.console = None #dev = usb.core.find(idVendor=0x0483, idProduct=0x5740) #dev = usb.core.find(idVendor=0x0483, idProduct=0x3748) #dev = usb.core.find(find_all=True, bDeviceClass=7) #print(dev) def onShow( self, event ): self.connect(event) event.Skip() def onClose( self, event ): try: self.cli.command(b'reset\n', None) self.cli.close() except: pass wx.CallAfter(self.Destroy) event.Skip() def onSerialChar(self, c): try: if c != '\r': self.console.m_textConsole.AppendText(c) except Exception as e: pass def resInfo(self, lines): self.m_staticTextNode.SetLabel("".join(lines)) def resFinfo(self, lines): self.m_staticTextFlash.SetLabel("".join(lines)) def resDateTime(self, lines): self.m_staticTextDatetime.SetLabel("Datetime: " + lines[0]) def connect( self, event ): ports = list(serial.tools.list_ports.comports()) for p in ports: if p.vid==0x483 and p.pid==0x5740: port = p.device try: port except NameError: resp = wx.MessageBox('Node is not connected', 'Agronode setup', wx.OK | wx.ICON_ERROR) # self.Destroy() # return try: self.cli = cli.CliThread(self,port, self.onSerialChar) self.cli.start() except serial.serialutil.SerialException: resp = wx.MessageBox('Can not open virtual port', 'Agronode setup', wx.OK | wx.ICON_ERROR) self.Destroy() return self.cli.command(b'stop\n', None) self.cli.command(b'log 1\n', None) self.cli.command(b'trace sdi12 0\n', None) self.cli.command(b'power off\n', None) self.cli.command(b'info\n', self.resInfo) self.cli.command(b'finfo\n', self.resFinfo) now = datetime.now() cmd = 'date '+ now.strftime("%Y-%m-%d %H:%M:%S") + '\n' self.cli.command(bytearray(cmd, 'utf-8'), self.resDateTime) #------------------------------------------------------------------- def onPageChange( self, event ): event.Skip() page_text = self.m_notebook.GetPageText(event.GetSelection()) if page_text == 'script': self.onScriptWindow(event) if page_text == 'data': self.onDataWindow(event) if page_text == 'sensor': self.onSensorWindow(event) if page_text == 'sdi12': self.onSDI12Window(event) if page_text == '1wire': self.on1wireWindow(event) if page_text == 'firmware': self.onFirmwareWindow(event) def resScriptWindow(self, lines): self.m_gaugeAct.SetValue(0) self.scriptUpdate(lines) def onScriptWindow( self, event ): if len(self.m_textCtrlScript.GetValue()) == 0: #self.onDownloadScript(None) self.m_gaugeAct.Pulse() self.cli.command(b'list\n', self.resScriptWindow) def onSensorWindow( self, event ): with open('sensors.json') as json_file: self.json_data = json.load(json_file) self.m_comboBoxSensor.Clear() for sensor in self.json_data['sensors']: self.m_comboBoxSensor.Append(sensor['name']) self.m_comboBoxSensor.SetSelection(0) self.onSensorChange(event) self.cli.command(b'power on\n', None) def onSDI12Window( self, event ): self.cli.command(b'power on\n', None) def on1wireWindow( self, event ): self.cli.command(b'power on\n', None) def onFirmwareWindow( self, event ): pass #f = firmware.Firmware(0x483, 0xdf11, 0, 0, 0) #f.erase() #dfuse.DfuFile('agronode.dfu')) #------------------------------------------------------------------- def dataUpdate(self, lines): self.m_listCtrlData.ClearAll(); self.m_listCtrlData.InsertColumn(0, "date_time", width = -1); self.m_listCtrlData.InsertColumn(1, "sensor_address", width = -1); self.m_listCtrlData.InsertColumn(2, "sensor_id", width = -1); self.m_listCtrlData.InsertColumn(3, "value", width = -1); xdata = [] ydata = [] for line in lines: line = line.replace("\r", "").replace("\n", "") items = line.split(';') # xdata.append(datetime.strptime(items[0], "%Y/%m/%d %H:%M:%S").timestamp()) # items[3] = items[3].replace('NAN', 'NaN') # ydata.append(float(items[3])) self.m_listCtrlData.Append(items) # m_gridData( # self.pl.plot(xdata, ydata, use_dates='True') self.m_listCtrlData.SetColumnWidth(0, width = wx.LIST_AUTOSIZE); self.m_listCtrlData.SetColumnWidth(1, width = wx.LIST_AUTOSIZE); self.m_listCtrlData.SetColumnWidth(2, width = wx.LIST_AUTOSIZE); self.m_listCtrlData.SetColumnWidth(3, width = wx.LIST_AUTOSIZE); def onSaveData( self, event ): with wx.FileDialog(self, "Save CSV file", wildcard="CSV files (*.csv)|*.csv", style=wx.FD_SAVE) as fileDialog: if fileDialog.ShowModal() == wx.ID_CANCEL: return # the user changed their mind # Proceed loading the file chosen by the user pathname = fileDialog.GetPath() try: with open(pathname, 'w') as file: file.write("time_stamp;sensor_adr;sensor_id;observed_value\n") count = self.m_listCtrlData.GetItemCount() cols = self.m_listCtrlData.GetColumnCount() for row in range(count): if ((self.m_listCtrlData.GetSelectedItemCount() == 0) or (self.m_listCtrlData.IsSelected(idx=row))): line = "" for col in range(cols): if (col > 0): line += ";" line += self.m_listCtrlData.GetItem(itemIdx=row, col=col).GetText() file.write(line + "\n") file.close() except IOError: wx.LogError("Cannot save file '%s'." % pathname) def resDataWindowGet(self, lines): self.m_gaugeAct.SetValue(0) self.dataUpdate(lines) def resDataWindow(self, lines): self.finfo = parse.parse('Ffs head {head}, tail {tail}, terminus {terminus}, unsent {unsent}', lines[2]) self.m_gaugeAct.Pulse() self.cli.command(b'fget ' + bytes(self.finfo['terminus'],'utf-8') + b' ' + bytes(str(int(self.finfo['head'])),'utf-8') + b'\n', self.resDataWindowGet) def onDataWindow( self, event ): self.m_listCtrlData.ClearAll(); self.m_listCtrlData.InsertColumn(0, "Downloading data", width = -1); self.cli.command(b'finfo\n', self.resDataWindow) #------------------------------------------------------------------- def onDFuse( self, event ): self.cli.command(b'bootld\n', None) event.Skip() #------------------------------------------------------------------- def onTextChar( self, event ): key_code = event.GetKeyCode() # Allow ASCII numerics if ord('0') <= key_code <= ord('9'): event.Skip() return if ord('A') <= key_code <= ord('Z'): event.Skip() return if ord('a') <= key_code <= ord('z'): event.Skip() return # Allow tabs, for tab navigation between TextCtrls if key_code < ord(' '): event.Skip() return if key_code == 127: event.Skip() return event.Skip() return def scriptUpdate(self, lines): self.mBoxSizerParams.Clear(True) #self.m_scrolledWindowParams.Layout() #self.m_panelScript.SetAutoLayout(1) self.lines = lines self.m_textCtrlScript.SetValue("".join(self.lines)) for line in self.lines: line = line.replace("\r", "").replace("\n", "") print(line) #items = line.split(' ') items = shlex.split(line, posix=False) if len(items) > 0: if len(items[0]) > 0: if items[0] == '@title': title = wx.StaticText(self.m_scrolledWindowParams, wx.ID_ANY, " ".join(items[1:]), style=wx.ALIGN_CENTRE_HORIZONTAL) self.mBoxSizerParams.Add(title, 0, wx.LEFT | wx.ALIGN_CENTER_HORIZONTAL, 5) elif items[0] == '@group': group = wx.StaticLine(self.m_scrolledWindowParams, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.LI_HORIZONTAL) self.mBoxSizerParams.Add(group, 0, wx.EXPAND |wx.ALL, 5 ) elif items[0][0] == '@': p = {} for i in range(len(items) // 2): p[items[i * 2]] = items[i * 2 + 1].strip('\"') label = wx.StaticText(self.m_scrolledWindowParams, wx.ID_ANY, items[0][1:] + ":") self.mBoxSizerParams.Add(label, 0, wx.LEFT|wx.TOP, 5) try: if p.get('gui') == None: param = wx.TextCtrl(self.m_scrolledWindowParams, wx.ID_ANY, p[items[0]], wx.DefaultPosition, wx.DefaultSize, wx.CB_READONLY) else: if p['gui'] == "combo": choices = p['choices'].split(',') param = wx.ComboBox(self.m_scrolledWindowParams, wx.ID_ANY, items[0], wx.DefaultPosition, wx.DefaultSize, choices, wx.CB_READONLY) param.SetValue(p[items[0]]) param.Bind(wx.EVT_TEXT, self.onComboChange) if p['gui'] == "spin": param = wx.SpinCtrl(self.m_scrolledWindowParams, wx.ID_ANY, items[0]) param.SetRange(int(p['min']), int(p['max'])) param.SetValue(int(p[items[0]])) param.Bind(wx.EVT_SPINCTRL, self.onSpinChange) #self.m_comboBoxVarSelect.Append(items[0][1:]) if p['gui'] == "spinfloat": param = wx.SpinCtrlDouble(self.m_scrolledWindowParams, wx.ID_ANY, items[0]) param.SetRange(float(p['min']), float(p['max'])) param.SetValue(float(p[items[0]])) param.SetDigits(3) param.SetIncrement(float(p['step'])) param.Bind(wx.EVT_SPINCTRLDOUBLE, self.onSpinFloatChange) if p['gui'] == "text": param = wx.TextCtrl(self.m_scrolledWindowParams, wx.ID_ANY, p[items[0]], wx.DefaultPosition, wx.DefaultSize) param.SetMaxLength(int(p['len'])) param.Bind(wx.EVT_TEXT, self.onTextChange) param.Bind(wx.EVT_CHAR, self.onTextChar) param.SetName(items[0]); self.mBoxSizerParams.Add(param, 0, wx.LEFT, 5) param.SetToolTip(p['tip']); except Exception as e: pass #print(e) #traceback.print_exc() #print () #self.m_scrolledWindowParams.Fit(self.mBoxSizerParams) #self.m_scrolledWindowParams.SetMinSize(wx.Size(200,1000)) #self.mBoxSizerParams.SetMinSize(wx.Size(200,1000)) #self.m_scrolledWindowParams.Layout() #self.m_scrolledWindowParams.Refresh() self.mBoxSizerParams.Layout() def onGuiChange(self, name, value): line_no = [i for i, s in enumerate(self.lines) if name in s][0] line = self.lines[line_no] linesplitted = line.split(' ') linesplitted[1] = value line = " ".join(linesplitted) self.lines[line_no] = line self.m_textCtrlScript.SetValue("".join(self.lines)) def onSpinChange(self, event): event.Skip() self.onGuiChange(event.GetEventObject().GetName(), str(event.GetEventObject().GetValue())) def onSpinFloatChange(self, event): event.Skip() self.onGuiChange(event.GetEventObject().GetName(), str(event.GetEventObject().GetValue())) def onComboChange(self, event): event.Skip() self.onGuiChange(event.GetEventObject().GetName(), str(event.GetEventObject().GetValue())) def onTextChange(self, event): event.Skip() self.onGuiChange(event.GetEventObject().GetName(), str(event.GetEventObject().GetValue())) def onLoadScript( self, event ): event.Skip() with wx.FileDialog(self, "Open BAS file", wildcard="BAS files (*.bas)|*.bas", style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as fileDialog: if fileDialog.ShowModal() == wx.ID_CANCEL: return # the user changed their mind # Proceed loading the file chosen by the user pathname = fileDialog.GetPath() try: with open(pathname, 'r') as file: lines = file.readlines() self.scriptUpdate(lines) file.close() except IOError: wx.LogError("Cannot open file '%s'." % pathname) def onUploadScript( self, event ): event.Skip() #self.cli.command(b'xmodem\n', None) self.cli.xmodem(self.lines) def onSaveScript( self, event ): event.Skip() with wx.FileDialog(self, "Save BAS file", wildcard="BAS files (*.bas)|*.bas", style=wx.FD_SAVE) as fileDialog: if fileDialog.ShowModal() == wx.ID_CANCEL: return # the user changed their mind # Proceed loading the file chosen by the user pathname = fileDialog.GetPath() try: with open(pathname, 'w') as file: file.writelines(self.lines) file.close() except IOError: wx.LogError("Cannot save file '%s'." % pathname) def onDownloadScript( self, event ): event.Skip() self.cli.command(b'list\n', self.resScriptWindow) def resRunScript( self, lines): self.console = console(self, self.cli) self.console.ShowModal() def onRunScript( self, event ): event.Skip() self.cli.command(b'run\n', self.resRunScript) #------------------------------------------------------------------- def resSDI12AddressQuery( self, lines): adr = lines[0].split(' ')[2].replace('\r','').replace('\n','') if adr == 'timeouted': wx.MessageBox('No sensor found', 'Agronode setup', wx.OK | wx.ICON_ERROR) else: self.m_textCtrlSDI12Adress.SetLabel(lines[0].split(' ')[2].replace('\r','').replace('\n','')) def onSDI12AddressQuery( self, event ): self.cli.command(b'sdi12 ?!\n', self.resSDI12AddressQuery) event.Skip() def onSDI12AddressChange( self, event ): self.cli.command(bytes('sdi12 ' + self.m_textCtrlSDI12Adress.Value + 'A' + self.m_textCtrlSDI12AdressChange.Value + '!\n', 'utf-8'), None) event.Skip() def resSDI12Identify( self, lines ): self.m_textCtrlSDI12Identification.SetValue(lines[0][7:]) def onSDI12Identify( self, event ): self.cli.command(bytes('sdi12 ' + self.m_textCtrlSDI12Adress.Value + 'I!\n','utf-8'), self.resSDI12Identify) event.Skip() def resSDI12Command( self, lines ): self.m_textCtrlSDI12CommandResult.SetValue(lines[0].split(':')[1][1:]) def onSDI12Command( self, event ): self.cli.command(bytes('sdi12 ' + self.m_comboBoxSDI12Command.GetValue() + '\n','utf-8'), self.resSDI12Command) self.m_comboBoxSDI12Command.Append(self.m_comboBoxSDI12Command.GetValue()) #todo: zabranit zdvojeni v combo boxu kdyz jsou stejny event.Skip() def onSDI12Char( self, event ): key_code = event.GetKeyCode() # Allow ASCII numerics if ord('0') <= key_code <= ord('9'): event.Skip() return if ord('A') <= key_code <= ord('Z'): event.Skip() return if ord('a') <= key_code <= ord('z'): event.Skip() return # Allow tabs, for tab navigation between TextCtrls if key_code < ord(' '): event.Skip() return if key_code == 127: event.Skip() return # Block everything else return #------------------------------------------------------------------- def res1wireSearch(self, lines): if lines[0][0:13] == 'No chip found': wx.MessageBox('No sensor found', 'Agronode setup', wx.OK | wx.ICON_ERROR) else: for line in lines: wx.CallAfter(self.m_listBox1wire.Append, line) def on1wireSearch( self, event ): self.m_listBox1wire.Clear() self.m_button1wireRemap.Enable(False) self.cli.command(b'owsearch\n', self.res1wireSearch) event.Skip() def on1wireRemap( self, event ): self.cli.command(bytes('owremap ' + self.m_listBox1wire.GetString(self.m_listBox1wire.GetSelection()).split(' ')[0] + ' ' + str(self.m_spinCtrl1wireAdr.GetValue()) + '\n','utf-8'), None) self.m_listBox1wire.Clear() self.m_button1wireRemap.Enable(False) self.cli.command(b'owsearch\n', self.res1wireSearch) event.Skip() def on1wireSelected( self, event ): self.m_spinCtrl1wireAdr.SetValue(self.m_listBox1wire.GetString(self.m_listBox1wire.GetSelection()).split(' ')[1].replace('\r', '').replace('\n','')) self.m_button1wireRemap.Enable(True) event.Skip() #------------------------------------------------------------------- def onSensorChange( self, event ): try: self.m_spinSensorAddress.SetValue(0) self.m_comboBoxType.Clear() for type in self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['types']: self.m_comboBoxType.Append(type['desc']) self.m_comboBoxType.SetSelection(0) self.m_spinSensorAddress.SetValue(self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['adr']) except Exception as e: #todo: spravnou excepsnu pass event.Skip() def resGet( self, lines ): self.m_textCtrlSensorValue.SetValue(lines[0].split(':')[1][1:] + ' ' + self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['types'][self.m_comboBoxType.GetSelection()]['unit']) def onGet( self, event ): self.cli.command(b'invalidate\n', None) # self.nodeSerial.write(b'invalidate\n') self.cli.command(bytes('sget ' # self.nodeSerial.write(bytes('sget ' + self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['lib'] + ' ' + str(self.m_spinSensorAddress.GetValue()) + ' ' + str(self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['types'][self.m_comboBoxType.GetSelection()]['type']) + '\n' , 'utf-8'), self.resGet) # self.m_staticTextValue.SetLabel('') # self.m_staticTextUnit.SetLabel('') # self.m_staticTextUnit.SetLabel(self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['types'][self.m_comboBoxType.GetSelection()]['unit']) self.m_textCtrlSensorValue.SetValue(''); event.Skip() #------------------------------------------------------------------- if __name__ == '__main__': # When this module is run (not imported) then create the app, the # frame, show it, and start the event loop. app = wx.App() # setts = LazySettings('settings.cfg') frm = frame(None) frm.Show() app.MainLoop()