#!/usr/bin/env python3
"""Command-line bridge to a node reached over a BLE transparent-UART link.

The node prints a ``ch>`` prompt, echoes each command after it, and then
prints the command's output. ``CliThread`` parses that stream and hands the
collected output lines to callbacks on the wx GUI thread.
"""

import asyncio
import io
import platform
import queue
import threading
import time

import serial
import wx
from bleak import BleakClient
from xmodem import XMODEM

mac_addr = (
    "80:1F:12:B7:86:93"
    if platform.system() != "Darwin"
    else "B9EA5233-37EF-4DD6-87A8-2A875E821C46"
)

# GATT characteristic used both for writing commands and for notifications.
CHARACTERISTIC_UUID = "49535343-1E4D-4BD9-BA61-23C647249616"

# Prompt the node prints at the start of a line when it is ready for input.
PROMPT = 'ch>'


def notification_handler(sender, data):
    """Print a received notification as comma-separated hex bytes."""
    print(', '.join('{:02x}'.format(x) for x in data))


async def connect(mac_addr: str, handler=notification_handler):
    """Connect to the node and subscribe to its notifications.

    The connection is deliberately left open: the caller owns the returned
    client and must ``await client.disconnect()`` when done. The client must
    only be used from the event loop this coroutine ran on.

    Args:
        mac_addr: Address (or, on macOS, UUID) of the peripheral.
        handler: Callable ``handler(sender, data)`` invoked for every
            notification. Defaults to the hex-dumping module handler.

    Returns:
        The connected ``BleakClient``.
    """
    client = BleakClient(mac_addr)
    await client.connect()
    try:
        # ``is_connected`` is a coroutine method in older bleak releases and a
        # plain property in newer ones; support both.
        connected = client.is_connected
        if callable(connected):
            connected = await connected()
        print("Connected: {0}".format(connected))
        print(client)
        await client.write_gatt_char(CHARACTERISTIC_UUID, b'\n')
        await client.start_notify(CHARACTERISTIC_UUID, handler)
    except BaseException:
        # Do not leak a half-configured connection.
        await client.disconnect()
        raise
    return client


async def write(client, cmd):
    """Write ``cmd`` (bytes) to the node's UART characteristic."""
    print('cmd ' + str(cmd))
    print(client)
    await client.write_gatt_char(CHARACTERISTIC_UUID, cmd)


class CliThread(threading.Thread):
    """Thread that talks to the node's command-line interface over BLE.

    All bleak calls run on one private asyncio event loop (served by a helper
    daemon thread) because a bleak client is bound to the loop it was created
    on. Received notification bytes are queued and consumed by ``run()``.
    """

    # Seconds ``run()`` waits for a byte before re-checking ``alive``.
    _POLL_INTERVAL = 0.1

    def __init__(self, parent):
        """Connect to the node at ``mac_addr``; blocks until connected.

        Args:
            parent: Owner of the GUI widgets updated by the built-in
                ``sget`` / ``owsearch`` handlers.
        """
        threading.Thread.__init__(self)
        self.parent = parent
        self.alive = threading.Event()
        self.q = queue.Queue(100)  # pending per-command callbacks (or None)
        self.xmodemlines = []
        self.client = None
        self._rx = queue.Queue()  # received bytes, one int per item
        self._loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
        self._loop_thread.start()
        print('bleaaak')
        try:
            self.client = self._call(connect(mac_addr, self._on_notify))
            self._call(write(self.client, b'\n'))
        except BaseException:
            self.close()
            raise
        self.alive.set()

    def _run_loop(self):
        """Serve the private event loop until ``close()`` stops it."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    def _call(self, coro):
        """Run ``coro`` on the private loop and block for its result."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def _on_notify(self, sender, data):
        """Queue every byte of a notification for ``run()`` / ``getc()``."""
        for byte in data:
            self._rx.put(byte)

    def close(self):
        """Stop ``run()``, disconnect from the node and stop the event loop."""
        self.alive.clear()
        if not self._loop_thread.is_alive():
            return  # already closed
        try:
            if self.client is not None:
                self._call(self.client.disconnect())
        finally:
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._loop_thread.join()

    def command(self, cmd, response):
        """Send ``cmd`` and register the callback for its output.

        Args:
            cmd: Command bytes, including the terminating newline.
            response: Callable invoked on the GUI thread with the list of
                output lines, or ``None`` to fall back to the built-in
                handlers.
        """
        # Queue the callback before writing so a fast reply cannot arrive
        # while the queue is still empty.
        self.q.put(response)
        self._call(write(self.client, cmd))

    def getc(self, size, timeout=1):
        """XMODEM read hook: return up to ``size`` bytes, or ``None``.

        Waits at most ``timeout`` seconds in total.
        """
        data = bytearray()
        deadline = time.monotonic() + timeout
        while len(data) < size:
            remaining = max(deadline - time.monotonic(), 0)
            try:
                data.append(self._rx.get(timeout=remaining))
            except queue.Empty:
                break
        return bytes(data) or None

    def putc(self, data, timeout=1):
        """XMODEM write hook: send ``data`` and return the byte count.

        ``timeout`` is ignored.
        """
        # NOTE(review): the packet is written in a single GATT write; confirm
        # the negotiated MTU / backend accepts XMODEM-sized packets.
        self._call(self.client.write_gatt_char(CHARACTERISTIC_UUID, data))
        return len(data)

    def xmodem(self, lines):
        """Start an XMODEM upload of ``lines`` (an iterable of ``str``).

        The transfer itself is performed by ``run()`` once the node echoes
        the ``xmodem`` command.
        """
        self.xmodemlines = lines
        self.q.put(None)  # no callback for the xmodem command itself
        self._call(write(self.client, b'xmodem\n'))

    def run(self):
        """Parse the node's output until ``close()`` is called.

        Characters are accumulated per line. A line starting with the prompt
        ends the output of the pending command (which is then dispatched);
        once that line is complete, the text after the prompt becomes the new
        pending command.
        """
        b = ''
        cmd = []
        lines = []
        while self.alive.is_set():
            try:
                byte = self._rx.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue
            c = chr(byte)
            print(c, end='')
            b += c
            at_prompt = b.startswith(PROMPT)
            if at_prompt and cmd:
                # The prompt is back: the previous command's output is done.
                self._dispatch(cmd, lines)
                lines = []
                cmd = []
            if c != '\n':
                continue
            if cmd:
                lines.append(b)
            if at_prompt:
                # Echoed command line: "ch> <command> <args...>".
                cmd = b.replace('\r', '').replace('\n', '').split(' ')[1:]
                lines = []
                if cmd and cmd[0] == 'xmodem':
                    self._send_xmodem()
            b = ''

    def _dispatch(self, cmd, lines):
        """Deliver the output ``lines`` of the finished command ``cmd``."""
        try:
            callback = self.q.get(block=False)
        except queue.Empty:
            # Output that was not requested through command()/xmodem().
            callback = None
        if callback is not None:
            wx.CallAfter(callback, lines)
            return
        # Built-in handlers, used when no callback was registered. The other
        # commands (list, fget, info, finfo, date, sdi12) have no built-in
        # widget update; their output is simply discarded here.
        # NOTE(review): the widgets are assumed to live on ``parent`` -
        # confirm against the GUI class that creates this thread.
        if cmd[0] == 'sget':
            parts = lines[0].split(':') if lines else []
            if len(parts) > 1:
                wx.CallAfter(self._show_sensor_value, parts[1][1:])
        elif cmd[0] == 'owsearch':
            for line in lines:
                wx.CallAfter(self.parent.m_listBox1wire.Append, line)

    def _show_sensor_value(self, value):
        """Show ``value`` plus the selected type's unit (GUI thread only)."""
        gui = self.parent
        sensor = gui.json_data['sensors'][gui.m_comboBoxSensor.GetSelection()]
        unit = sensor['types'][gui.m_comboBoxType.GetSelection()]['unit']
        gui.m_textCtrlSensorValue.SetValue(value + ' ' + unit)

    def _send_xmodem(self):
        """Upload ``self.xmodemlines`` to the node using XMODEM."""
        modem = XMODEM(self.getc, self.putc)
        stream = io.BytesIO(bytes(''.join(self.xmodemlines), 'utf-8'))
        modem.send(stream)