| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- #!/usr/bin/env python3
- import threading
- import serial
- import queue
- from xmodem import XMODEM
- import io
- from bleak import BleakClient
- import platform
- import asyncio
- import wx
- mac_addr = (
- "80:1F:12:B7:86:93"
- if platform.system() != "Darwin"
- else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
- )
- def notification_handler(sender, data):
- print(', '.join('{:02x}'.format(x) for x in data))
- async def connect(mac_addr: str):
- CHARACTERISTIC_UUID = "49535343-1E4D-4BD9-BA61-23C647249616"
- async with BleakClient(mac_addr) as client:
- x = await client.is_connected()
- print( "Connected: {0}".format(x))
- print(client)
- await client.write_gatt_char(CHARACTERISTIC_UUID, b'\n')
- await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
- return client
- async def write(client, cmd):
- CHARACTERISTIC_UUID = "49535343-1E4D-4BD9-BA61-23C647249616"
- print('cmd ' + str(cmd))
- print(client)
- await client.write_gatt_char(CHARACTERISTIC_UUID, cmd)
- class CliThread(threading.Thread):
- def __init__(self, parent):
- threading.Thread.__init__(self)
- self.alive = threading.Event()
- self.q = queue.Queue(100)
- # self.nodeSerial = serial.Serial(port=port, baudrate=9600)
- print('bleaaak')
- self.client = asyncio.run(connect(mac_addr))
- asyncio.run(write(self.client, b'\n'))
- self.alive.set()
- def close(self):
- self.alive.clear()
- self.nodeSerial.close()
- def command(self, cmd, response):
- asyncio.run(write(self.client, cmd))
- self.q.put(response)
- #self.nodeSerial.write(cmd)
- def getc(self, size, timeout=1):
- return self.nodeSerial.read(size) or None
- def putc(self,data, timeout=1):
- return self.nodeSerial.write(data) # note that this ignores the timeout
- def xmodem(self, lines):
- self.xmodemlines = lines
- self.q.put(None)
- self.nodeSerial.write(b'xmodem\n')
- def run(self):
- """\
- Thread that handles the incoming traffic. Does the basic input
- transformation (newlines) and generates an SerialRxEvent
- """
- b = ''
- cmd = []
- lines = ''
- while self.alive.isSet():
- #for c in self.nodeSerial.read():
- #with await client.read_gatt_char(CHARACTERISTIC_UUID) as c:
- c = chr(c)
- print(c, end='')
- b += c
- if b[0:3] == 'ch>':
- if (len(cmd) > 0):
- callback = self.q.get(block=False)
- if callback != None:
- wx.CallAfter(callback, lines)
- lines = []
- if cmd[0] == 'list':
- # wx.CallAfter(self.scriptUpdate, lines)
- lines = []
- if cmd[0] == 'fget':
- #wx.CallAfter(self.dataUpdate, lines)
- lines = []
- if cmd[0] == 'info':
- #wx.CallAfter(self.parent.m_staticTextNode.SetLabel, "".join(lines))
- lines = []
- # if cmd[0] == 'sget':
- # wx.CallAfter(self.m_staticTextBattery.SetLabel, "Battery: " + lines[0][6:-2] + " mV")
- # lines = []
- if cmd[0] == 'finfo':
- # wx.CallAfter(self.m_staticTextFlash.SetLabel, "".join(lines))
- lines = []
- if cmd[0] == 'date':
- # wx.CallAfter(self.m_staticTextDatetime.SetLabel, "Datetime: " + lines[0])
- lines = []
- if cmd[0] == 'sdi12':
- # if cmd[1] == '?!':
- # wx.CallAfter(self.m_textCtrlSDI12Adress.SetLabel, lines[0].split(' ')[2].replace('\r','').replace('\n',''))
- # if cmd[1][1:] == 'I!':
- # wx.CallAfter(self.m_textCtrlSDI12Identification.SetValue, lines[0][7:])
- # wx.CallAfter(self.m_textCtrlSDI12CommandResult.SetValue, lines[0].split(':')[1][1:])
- lines = []
- if cmd[0] == 'sget':
- #wx.CallAfter(self.m_staticTextValue.SetLabel, lines[0].split(':')[1][1:])
- wx.CallAfter(self.m_textCtrlSensorValue.SetValue,
- lines[0].split(':')[1][1:] + ' ' +
- self.json_data['sensors'][self.m_comboBoxSensor.GetSelection()]['types'][self.m_comboBoxType.GetSelection()]['unit'])
- lines = []
- if cmd[0] == 'owsearch':
- for line in lines:
- #wx.CallAfter(self.m_listBox1wire.Append, line.split(' ')[0])
- wx.CallAfter(self.m_listBox1wire.Append, line)
- # wx.CallAfter(self.m_listCtrl1wire.InsertItem, 0, line.split(' ')[1].replace('\r', '').replace('\n',''))
- lines = []
- cmd = []
- if c == '\n':
- if len(cmd) > 0:
- lines.append(b)
- if b[0:3] == 'ch>':
- b = b.replace('\r', '').replace('\n','')
- cmd = b.split(' ')[1:]
- lines = []
- if len(cmd) > 0:
- if cmd[0] == 'xmodem':
- modem = XMODEM(self.getc, self.putc)
- stream = io.BytesIO(bytes(''.join(self.xmodemlines), 'utf-8'))
- #stream = io.BytesIO(b"hhh")
- #stream = io.StringIO("gfbgjfg")
- #stream = open('script_test.bas', 'rb')
- modem.send(stream)
- b = ''
- #b = self.nodeSerial.read(self.nodeSerial.in_waiting or 1)
- #if b:
- # newline transformation
- #if self.settings.newline == NEWLINE_CR:
- # b = b.replace(b'\r', b'\n')
- #elif self.settings.newline == NEWLINE_LF:
- # pass
- #elif self.settings.newline == NEWLINE_CRLF:
- # b = b.replace(b'\r\n', b'\n')
- #event = SerialRxEvent(self.GetId(), b)
- #self.GetEventHandler().AddPendingEvent(event)
- # self.logComm.write(b);
|