# Agronode setup: wxPython front end that configures a node through its serial CLI.
from __future__ import print_function

import asyncio
import io
import json
import locale
import logging
import os
import shlex
import threading
import traceback
from datetime import datetime

import aioserial
import parse
import serial.tools.list_ports
import wx
from xmodem import XMODEM
#import wxmplot

import cli
import cli_bt
import dfudfuse
import firmware
import gui
class console(gui.consoleDialog):
    """Console dialog tied to a CLI connection.

    The dialog keeps a reference to the CLI object so that closing the
    dialog can send the ``stop`` command to the node.
    """

    def __init__(self, parent, cli):
        """Build the dialog under *parent* and remember the *cli* object."""
        super().__init__(parent)
        self.cli = cli

    def onConsoleClose(self, event):
        """Send ``stop`` to the node, then let the close event propagate."""
        stop_command = b'stop\n'
        self.cli.command(stop_command, None)
        event.Skip()
class frame(gui.mainFrame):
    """Main window of the Agronode setup tool.

    Every notebook page talks to the node through ``self.cli`` (a
    ``cli.CliThread``): a handler sends a command with ``self.cli.command``
    and, where a reply is needed, passes one of the ``res*`` methods as the
    callback that receives the response lines.
    """

    # USB vendor/product id used to pick the node's serial port in connect().
    NODE_USB_VID = 0x483
    NODE_USB_PID = 0x5740

    _logger = logging.getLogger(__name__)

    # constructor
    def __init__(self, parent):
        """Create the frame, fix the numeric locale and list serial ports."""
        # initialize parent class
        gui.mainFrame.__init__(self, parent)

        # decimal point must be '.'
        try:
            # Note: do not use "de_DE" as it doesn't work
            locale.setlocale(category=locale.LC_ALL, locale="US")
        except locale.Error:
            # "US" is not a valid locale name on every platform; the "C"
            # locale also uses '.' as the decimal point.
            locale.setlocale(category=locale.LC_ALL, locale="C")

        self.alive = threading.Event()

        # State filled in later; defined here so handlers that run before a
        # connection exists or a script is loaded do not hit AttributeError.
        self.cli = None
        self.console = None
        self.lines = []

        self.mBoxSizerParams = self.m_comboBoxSizer.GetContainingSizer()

        for p in serial.tools.list_ports.comports():
            print(p)

    def onShow(self, event):
        """Try to connect to the node when the show event arrives."""
        self.connect(event)
        event.Skip()

    def onClose(self, event):
        """Reset the node, close the CLI connection and destroy the frame."""
        if self.cli is not None:
            try:
                self.cli.command(b'reset\n', None)
                self.cli.close()
            except Exception:
                # Best effort: the window must close even if the link is dead.
                self._logger.warning("Could not shut the CLI down cleanly",
                                     exc_info=True)
        wx.CallAfter(self.Destroy)
        event.Skip()

    def onSerialChar(self, c):
        """Append a character received from the node to the console dialog.

        Carriage returns are dropped. Nothing happens while no console
        dialog is open.
        """
        if c == '\r' or self.console is None:
            return
        try:
            self.console.m_textConsole.AppendText(c)
        except Exception:
            # Best effort, as before: console output must never break the
            # caller that delivers the characters.
            self._logger.debug("Console output dropped", exc_info=True)

    def resInfo(self, lines):
        """Show the reply to ``info`` in the node label."""
        self.m_staticTextNode.SetLabel("".join(lines))

    def resFinfo(self, lines):
        """Show the reply to ``finfo`` in the flash label."""
        self.m_staticTextFlash.SetLabel("".join(lines))

    def resDateTime(self, lines):
        """Show the first line of the reply to ``date``."""
        if lines:
            self.m_staticTextDatetime.SetLabel("Datetime: " + lines[0])

    def connect(self, event):
        """Find the node's serial port, open the CLI and send the setup commands.

        Shows an error box and returns when no matching port exists. Shows an
        error box and destroys the frame when the port cannot be opened.
        """
        port = None
        for p in serial.tools.list_ports.comports():
            if p.vid == self.NODE_USB_VID and p.pid == self.NODE_USB_PID:
                port = p.device  # no break: the last matching port wins

        if port is None:
            wx.MessageBox('Node is not connected', 'Agronode setup',
                          wx.OK | wx.ICON_ERROR)
            # Without a port there is nothing to open; the window stays up.
            return

        try:
            self.cli = cli.CliThread(self, port, self.onSerialChar)
            self.cli.start()
        except serial.serialutil.SerialException:
            wx.MessageBox('Can not open virtual port', 'Agronode setup',
                          wx.OK | wx.ICON_ERROR)
            self.Destroy()
            return

        for setup_cmd in (b'stop\n', b'log 1\n', b'trace sdi12 0\n', b'power off\n'):
            self.cli.command(setup_cmd, None)
        self.cli.command(b'info\n', self.resInfo)
        self.cli.command(b'finfo\n', self.resFinfo)
        # Send the PC's current local time with the 'date' command.
        cmd = 'date ' + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + '\n'
        self.cli.command(bytearray(cmd, 'utf-8'), self.resDateTime)

    #-------------------------------------------------------------------
    def onPageChange(self, event):
        """Run the page-specific handler for the newly selected notebook page."""
        event.Skip()
        page_text = self.m_notebook.GetPageText(event.GetSelection())
        page_handlers = {
            'script': self.onScriptWindow,
            'data': self.onDataWindow,
            'sensor': self.onSensorWindow,
            'sdi12': self.onSDI12Window,
            '1wire': self.on1wireWindow,
            'firmware': self.onFirmwareWindow,
        }
        handler = page_handlers.get(page_text)
        if handler is not None:
            handler(event)

    def resScriptWindow(self, lines):
        """Receive the script listing: reset the gauge and show the script."""
        self.m_gaugeAct.SetValue(0)
        self.scriptUpdate(lines)

    def onScriptWindow(self, event):
        """Download the script from the node if the editor is still empty."""
        if not self.m_textCtrlScript.GetValue():
            self.m_gaugeAct.Pulse()
            self.cli.command(b'list\n', self.resScriptWindow)

    def onSensorWindow(self, event):
        """Load ``sensors.json``, fill the sensor combo box and power sensors on."""
        with open('sensors.json') as json_file:
            self.json_data = json.load(json_file)

        self.m_comboBoxSensor.Clear()
        for sensor in self.json_data['sensors']:
            self.m_comboBoxSensor.Append(sensor['name'])

        self.m_comboBoxSensor.SetSelection(0)
        self.onSensorChange(event)
        self.cli.command(b'power on\n', None)

    def onSDI12Window(self, event):
        """Send ``power on`` when the SDI-12 page is opened."""
        self.cli.command(b'power on\n', None)

    def on1wireWindow(self, event):
        """Send ``power on`` when the 1-wire page is opened."""
        self.cli.command(b'power on\n', None)

    def onFirmwareWindow(self, event):
        """Firmware page: nothing to do yet."""
        pass

    #-------------------------------------------------------------------
    def dataUpdate(self, lines):
        """Rebuild the data list from ';'-separated record lines."""
        table = self.m_listCtrlData
        titles = ("date_time", "sensor_address", "sensor_id", "value")

        table.ClearAll()
        for col, title in enumerate(titles):
            table.InsertColumn(col, title, width=-1)

        for line in lines:
            record = line.replace("\r", "").replace("\n", "")
            table.Append(record.split(';'))

        for col in range(len(titles)):
            table.SetColumnWidth(col, width=wx.LIST_AUTOSIZE)

    def onSaveData(self, event):
        """Save the listed records to a CSV file chosen by the user.

        Only the selected rows are written; when nothing is selected all
        rows are written.
        """
        with wx.FileDialog(self, "Save CSV file", wildcard="CSV files (*.csv)|*.csv",
                           style=wx.FD_SAVE) as fileDialog:
            if fileDialog.ShowModal() == wx.ID_CANCEL:
                return  # the user changed their mind
            pathname = fileDialog.GetPath()
            table = self.m_listCtrlData
            try:
                with open(pathname, 'w') as file:
                    file.write("time_stamp;sensor_adr;sensor_id;observed_value\n")
                    cols = table.GetColumnCount()
                    save_all = table.GetSelectedItemCount() == 0
                    for row in range(table.GetItemCount()):
                        if save_all or table.IsSelected(idx=row):
                            cells = (table.GetItem(itemIdx=row, col=col).GetText()
                                     for col in range(cols))
                            file.write(";".join(cells) + "\n")
            except IOError:
                wx.LogError("Cannot save file '%s'." % pathname)

    def resDataWindowGet(self, lines):
        """Receive the downloaded records: reset the gauge and list them."""
        self.m_gaugeAct.SetValue(0)
        self.dataUpdate(lines)

    def resDataWindow(self, lines):
        """Parse the ``finfo`` reply and request the records with ``fget``.

        The third response line must match the 'Ffs head ..., tail ...'
        pattern; otherwise the reply is logged and no download is started.
        """
        finfo = None
        if len(lines) > 2:
            finfo = parse.parse('Ffs head {head}, tail {tail}, terminus {terminus}, unsent {unsent}', lines[2])
        if finfo is None:
            self._logger.warning("Unexpected reply to 'finfo': %r", lines)
            return
        try:
            head = int(finfo['head'])
        except ValueError:
            self._logger.warning("Non-numeric head in 'finfo' reply: %r", lines[2])
            return
        self.finfo = finfo
        self.m_gaugeAct.Pulse()
        cmd = 'fget {} {}\n'.format(finfo['terminus'], head)
        self.cli.command(cmd.encode('utf-8'), self.resDataWindowGet)

    def onDataWindow(self, event):
        """Show a 'Downloading data' placeholder and ask the node for ``finfo``."""
        self.m_listCtrlData.ClearAll()
        self.m_listCtrlData.InsertColumn(0, "Downloading data", width=-1)
        self.cli.command(b'finfo\n', self.resDataWindow)

    #-------------------------------------------------------------------
    def onDFuse(self, event):
        """Send the ``bootld`` command to the node."""
        self.cli.command(b'bootld\n', None)
        event.Skip()

    #-------------------------------------------------------------------
    def onTextChar(self, event):
        """Key filter of script text parameters; currently lets every key through.

        The original per-range checks all ended in ``event.Skip()``, so no
        key was ever blocked; the behaviour is kept and only stated directly.
        """
        event.Skip()

    def _createParamControl(self, name, p):
        """Create the edit widget for one ``@name`` script parameter.

        *p* maps the keywords of the script line to their values; ``p[name]``
        is the current value and ``p['gui']`` selects the widget kind.
        Numeric settings are converted before the widget is created so a bad
        line does not leave a half-configured widget behind.

        Raises KeyError or ValueError when the line lacks a required keyword,
        holds a non-numeric number or names an unknown widget kind.
        """
        parent = self.m_scrolledWindowParams
        kind = p.get('gui')
        value = p[name]

        if kind is None:
            # NOTE(review): wx.CB_READONLY is a combo-box style; presumably it
            # is meant to make this text field read-only - confirm.
            return wx.TextCtrl(parent, wx.ID_ANY, value, wx.DefaultPosition,
                               wx.DefaultSize, wx.CB_READONLY)

        if kind == "combo":
            choices = p['choices'].split(',')
            ctrl = wx.ComboBox(parent, wx.ID_ANY, name, wx.DefaultPosition,
                               wx.DefaultSize, choices, wx.CB_READONLY)
            ctrl.SetValue(value)
            # Bind after SetValue so the initial value is not reported as an edit.
            ctrl.Bind(wx.EVT_TEXT, self.onComboChange)
            return ctrl

        if kind == "spin":
            low, high, current = int(p['min']), int(p['max']), int(value)
            ctrl = wx.SpinCtrl(parent, wx.ID_ANY, name)
            ctrl.SetRange(low, high)
            ctrl.SetValue(current)
            ctrl.Bind(wx.EVT_SPINCTRL, self.onSpinChange)
            return ctrl

        if kind == "spinfloat":
            low, high = float(p['min']), float(p['max'])
            current, step = float(value), float(p['step'])
            ctrl = wx.SpinCtrlDouble(parent, wx.ID_ANY, name)
            ctrl.SetRange(low, high)
            ctrl.SetValue(current)
            ctrl.SetDigits(3)
            ctrl.SetIncrement(step)
            ctrl.Bind(wx.EVT_SPINCTRLDOUBLE, self.onSpinFloatChange)
            return ctrl

        if kind == "text":
            max_len = int(p['len'])
            ctrl = wx.TextCtrl(parent, wx.ID_ANY, value, wx.DefaultPosition,
                               wx.DefaultSize)
            ctrl.SetMaxLength(max_len)
            ctrl.Bind(wx.EVT_TEXT, self.onTextChange)
            ctrl.Bind(wx.EVT_CHAR, self.onTextChar)
            return ctrl

        raise ValueError("unsupported gui kind %r" % kind)

    def scriptUpdate(self, lines):
        """Show *lines* in the script editor and rebuild the parameter panel.

        Lines whose first word starts with '@' describe the panel:
        ``@title`` adds a heading, ``@group`` a separator line, and any other
        ``@name value key value ...`` line adds a label plus an edit widget.
        Lines that cannot be turned into a widget are logged and skipped.
        """
        self.mBoxSizerParams.Clear(True)
        self.lines = lines
        self.m_textCtrlScript.SetValue("".join(self.lines))
        panel = self.m_scrolledWindowParams

        for raw in self.lines:
            line = raw.replace("\r", "").replace("\n", "")
            print(line)
            try:
                items = shlex.split(line, posix=False)
            except ValueError:
                # Unbalanced quote (e.g. an apostrophe in a comment): plain
                # whitespace splitting is enough to recognise '@' lines.
                items = line.split()
            if not items or not items[0]:
                continue

            keyword = items[0]
            if keyword == '@title':
                title = wx.StaticText(panel, wx.ID_ANY, " ".join(items[1:]),
                                      style=wx.ALIGN_CENTRE_HORIZONTAL)
                self.mBoxSizerParams.Add(title, 0, wx.LEFT | wx.ALIGN_CENTER_HORIZONTAL, 5)
            elif keyword == '@group':
                group = wx.StaticLine(panel, wx.ID_ANY, wx.DefaultPosition,
                                      wx.DefaultSize, wx.LI_HORIZONTAL)
                self.mBoxSizerParams.Add(group, 0, wx.EXPAND | wx.ALL, 5)
            elif keyword[0] == '@':
                # Words come in key/value pairs; the first pair is the
                # parameter name and its current value.
                p = {key: val.strip('"')
                     for key, val in zip(items[0::2], items[1::2])}
                label = wx.StaticText(panel, wx.ID_ANY, keyword[1:] + ":")
                self.mBoxSizerParams.Add(label, 0, wx.LEFT | wx.TOP, 5)
                try:
                    param = self._createParamControl(keyword, p)
                except Exception as err:
                    # Best effort per line: one bad parameter must not stop
                    # the rest of the panel from being built.
                    self._logger.warning("Skipping script parameter %r: %r",
                                         keyword, err)
                    continue
                param.SetName(keyword)
                self.mBoxSizerParams.Add(param, 0, wx.LEFT, 5)
                if 'tip' in p:
                    param.SetToolTip(p['tip'])

        self.mBoxSizerParams.Layout()

    @staticmethod
    def _withParamValue(line, value):
        """Return *line* with its second word replaced by *value*.

        Leading whitespace, the rest of the line and the line ending are
        kept; a line holding only the name gets the value appended.
        """
        body = line.rstrip('\r\n')
        eol = line[len(body):]
        indent = body[:len(body) - len(body.lstrip())]
        parts = body.split(None, 2)  # name, old value, untouched remainder
        if not parts:
            return line
        return indent + ' '.join([parts[0], value] + parts[2:]) + eol

    def onGuiChange(self, name, value):
        """Write a changed widget value back into the script text.

        *name* is the parameter keyword including the leading '@'. The first
        script line whose first word equals *name* gets its value replaced.
        """
        for line_no, line in enumerate(self.lines):
            if line.split()[:1] == [name]:
                break
        else:
            self._logger.warning("Parameter %r not found in the script", name)
            return
        self.lines[line_no] = self._withParamValue(line, value)
        self.m_textCtrlScript.SetValue("".join(self.lines))

    def onSpinChange(self, event):
        """Propagate a changed integer spin value to the script."""
        event.Skip()
        source = event.GetEventObject()
        self.onGuiChange(source.GetName(), str(source.GetValue()))

    def onSpinFloatChange(self, event):
        """Propagate a changed float spin value to the script."""
        event.Skip()
        source = event.GetEventObject()
        self.onGuiChange(source.GetName(), str(source.GetValue()))

    def onComboChange(self, event):
        """Propagate a changed combo box value to the script."""
        event.Skip()
        source = event.GetEventObject()
        self.onGuiChange(source.GetName(), str(source.GetValue()))

    def onTextChange(self, event):
        """Propagate a changed text value to the script."""
        event.Skip()
        source = event.GetEventObject()
        self.onGuiChange(source.GetName(), str(source.GetValue()))

    def onLoadScript(self, event):
        """Load a .bas file chosen by the user into the editor and panel."""
        event.Skip()

        with wx.FileDialog(self, "Open BAS file", wildcard="BAS files (*.bas)|*.bas",
                           style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) as fileDialog:
            if fileDialog.ShowModal() == wx.ID_CANCEL:
                return  # the user changed their mind
            pathname = fileDialog.GetPath()
            try:
                with open(pathname, 'r') as file:
                    lines = file.readlines()
                    self.scriptUpdate(lines)
            except IOError:
                wx.LogError("Cannot open file '%s'." % pathname)

    def onUploadScript(self, event):
        """Hand the current script lines to the CLI's ``xmodem`` method."""
        event.Skip()
        self.cli.xmodem(self.lines)

    def onSaveScript(self, event):
        """Save the current script lines to a .bas file chosen by the user."""
        event.Skip()
        with wx.FileDialog(self, "Save BAS file", wildcard="BAS files (*.bas)|*.bas",
                           style=wx.FD_SAVE) as fileDialog:
            if fileDialog.ShowModal() == wx.ID_CANCEL:
                return  # the user changed their mind
            pathname = fileDialog.GetPath()
            try:
                with open(pathname, 'w') as file:
                    file.writelines(self.lines)
            except IOError:
                wx.LogError("Cannot save file '%s'." % pathname)

    def onDownloadScript(self, event):
        """Request the script listing from the node."""
        event.Skip()
        self.cli.command(b'list\n', self.resScriptWindow)

    def resRunScript(self, lines):
        """Open the console dialog modally after ``run`` was acknowledged.

        The dialog is destroyed once it is closed so repeated runs do not
        pile up hidden dialogs.
        """
        self.console = console(self, self.cli)
        try:
            self.console.ShowModal()
        finally:
            self.console.Destroy()
            self.console = None

    def onRunScript(self, event):
        """Send the ``run`` command to the node."""
        event.Skip()
        self.cli.command(b'run\n', self.resRunScript)

    #-------------------------------------------------------------------
    @staticmethod
    def _responseValue(lines):
        """Return the text between the first and second ':' of the first line.

        The first character after the colon is dropped, exactly as the
        response handlers did before. Returns None when there is no first
        line or it contains no ':'.
        """
        if not lines:
            return None
        pieces = lines[0].split(':')
        if len(pieces) < 2:
            return None
        return pieces[1][1:]

    def resSDI12AddressQuery(self, lines):
        """Show the address from the ``sdi12 ?!`` reply, or report a timeout."""
        fields = lines[0].split(' ') if lines else []
        if len(fields) < 3:
            self._logger.warning("Unexpected reply to 'sdi12 ?!': %r", lines)
            return
        adr = fields[2].replace('\r', '').replace('\n', '')
        if adr == 'timeouted':
            wx.MessageBox('No sensor found', 'Agronode setup', wx.OK | wx.ICON_ERROR)
        else:
            # The address field is read through .Value elsewhere, so set its
            # value rather than its label; ChangeValue sends no text event.
            self.m_textCtrlSDI12Adress.ChangeValue(adr)

    def onSDI12AddressQuery(self, event):
        """Ask the bus for the sensor address with ``sdi12 ?!``."""
        self.cli.command(b'sdi12 ?!\n', self.resSDI12AddressQuery)
        event.Skip()

    def onSDI12AddressChange(self, event):
        """Send ``sdi12 <address>A<new address>!`` built from the two fields."""
        cmd = ('sdi12 ' + self.m_textCtrlSDI12Adress.Value + 'A'
               + self.m_textCtrlSDI12AdressChange.Value + '!\n')
        self.cli.command(cmd.encode('utf-8'), None)
        event.Skip()

    def resSDI12Identify(self, lines):
        """Show the identification reply without its first seven characters."""
        if lines:
            self.m_textCtrlSDI12Identification.SetValue(lines[0][7:])

    def onSDI12Identify(self, event):
        """Send ``sdi12 <address>I!`` for the address in the address field."""
        cmd = 'sdi12 ' + self.m_textCtrlSDI12Adress.Value + 'I!\n'
        self.cli.command(cmd.encode('utf-8'), self.resSDI12Identify)
        event.Skip()

    def resSDI12Command(self, lines):
        """Show the value part of the reply to a raw SDI-12 command."""
        result = self._responseValue(lines)
        if result is None:
            self._logger.warning("Unexpected reply to SDI-12 command: %r", lines)
            return
        self.m_textCtrlSDI12CommandResult.SetValue(result)

    def onSDI12Command(self, event):
        """Send the typed SDI-12 command and remember it in the combo box."""
        combo = self.m_comboBoxSDI12Command
        typed = combo.GetValue()
        self.cli.command(('sdi12 ' + typed + '\n').encode('utf-8'),
                         self.resSDI12Command)
        # Keep each command only once in the history (case-sensitive match).
        if combo.FindString(typed, True) == wx.NOT_FOUND:
            combo.Append(typed)
        event.Skip()

    def onSDI12Char(self, event):
        """Key filter: allow digits, ASCII letters, control keys and DEL only."""
        key_code = event.GetKeyCode()
        allowed = (
            ord('0') <= key_code <= ord('9')
            or ord('A') <= key_code <= ord('Z')
            or ord('a') <= key_code <= ord('z')
            or key_code < ord(' ')   # control keys, e.g. tab navigation
            or key_code == 127
        )
        if allowed:
            event.Skip()
        # Everything else is blocked by not skipping the event.

    #-------------------------------------------------------------------
    def res1wireSearch(self, lines):
        """List the chips reported by ``owsearch`` or report that none exist."""
        if not lines:
            self._logger.warning("Empty reply to 'owsearch'")
            return
        if lines[0].startswith('No chip found'):
            wx.MessageBox('No sensor found', 'Agronode setup', wx.OK | wx.ICON_ERROR)
        else:
            for line in lines:
                wx.CallAfter(self.m_listBox1wire.Append, line)

    def on1wireSearch(self, event):
        """Clear the chip list and start a new ``owsearch``."""
        self.m_listBox1wire.Clear()
        self.m_button1wireRemap.Enable(False)
        self.cli.command(b'owsearch\n', self.res1wireSearch)
        event.Skip()

    def on1wireRemap(self, event):
        """Send ``owremap <chip> <address>`` for the selected chip, then search again."""
        selection = self.m_listBox1wire.GetSelection()
        if selection == wx.NOT_FOUND:
            event.Skip()
            return
        chip = self.m_listBox1wire.GetString(selection).split(' ')[0]
        cmd = 'owremap ' + chip + ' ' + str(self.m_spinCtrl1wireAdr.GetValue()) + '\n'
        self.cli.command(cmd.encode('utf-8'), None)
        self.m_listBox1wire.Clear()
        self.m_button1wireRemap.Enable(False)
        self.cli.command(b'owsearch\n', self.res1wireSearch)
        event.Skip()

    def on1wireSelected(self, event):
        """Copy the selected chip's second field into the address spinner."""
        selection = self.m_listBox1wire.GetSelection()
        if selection != wx.NOT_FOUND:
            fields = self.m_listBox1wire.GetString(selection).split(' ')
            if len(fields) > 1:
                address = fields[1].replace('\r', '').replace('\n', '')
                self.m_spinCtrl1wireAdr.SetValue(address)
                self.m_button1wireRemap.Enable(True)
        event.Skip()

    #-------------------------------------------------------------------
    def _selectedSensor(self):
        """Return the ``sensors.json`` entry of the sensor chosen in the combo box."""
        return self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]

    def _selectedSensorType(self):
        """Return the chosen entry of the selected sensor's ``types`` list."""
        return self._selectedSensor()['types'][self.m_comboBoxType.GetSelection()]

    def onSensorChange(self, event):
        """Refill the type combo box and the address for the chosen sensor."""
        try:
            self.m_spinSensorAddress.SetValue(0)
            self.m_comboBoxType.Clear()
            sensor = self._selectedSensor()
            for sensor_type in sensor['types']:
                self.m_comboBoxType.Append(sensor_type['desc'])
            self.m_comboBoxType.SetSelection(0)
            self.m_spinSensorAddress.SetValue(sensor['adr'])
        except Exception as err:
            # Best effort as before (sensor data missing or incomplete), but
            # the problem is now reported instead of silently dropped.
            self._logger.warning("Cannot show the selected sensor: %r", err)
        event.Skip()

    def resGet(self, lines):
        """Show the measured value from the ``sget`` reply with its unit."""
        reading = self._responseValue(lines)
        if reading is None:
            self._logger.warning("Unexpected reply to 'sget': %r", lines)
            return
        unit = self._selectedSensorType()['unit']
        self.m_textCtrlSensorValue.SetValue(reading + ' ' + unit)

    def onGet(self, event):
        """Send ``invalidate``, then ``sget <lib> <address> <type>`` for the selection."""
        self.cli.command(b'invalidate\n', None)
        cmd = ('sget '
               + self._selectedSensor()['lib']
               + ' '
               + str(self.m_spinSensorAddress.GetValue())
               + ' '
               + str(self._selectedSensorType()['type'])
               + '\n')
        self.cli.command(cmd.encode('utf-8'), self.resGet)
        # Clear the old reading until the reply arrives.
        self.m_textCtrlSensorValue.SetValue('')

        event.Skip()
    #-------------------------------------------------------------------
-
if __name__ == '__main__':
    # Run as a script (not imported): build the wx application object,
    # create and show the main frame, then enter the event loop.
    application = wx.App()

    main_window = frame(None)
    main_window.Show()

    application.MainLoop()
-
|