cli.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #!/usr/bin/env python3
  2. import threading
  3. import serial
  4. import queue
  5. from xmodem import XMODEM
  6. import io
  7. import wx
  8. class CliThread(threading.Thread):
  9. def __init__(self, parent, port, callback):
  10. threading.Thread.__init__(self)
  11. self.alive = threading.Event()
  12. self.q = queue.Queue(100)
  13. self.nodeSerial = serial.Serial(port=port, baudrate=9600)
  14. self.nodeSerial.write(b'\n')
  15. self.alive.set()
  16. self.char_callback = callback
  17. def close(self):
  18. self.alive.clear()
  19. self.nodeSerial.close()
  20. def command(self, cmd, response):
  21. self.q.put(response)
  22. self.nodeSerial.write(cmd)
  23. def getc(self, size, timeout=1):
  24. return self.nodeSerial.read(size) or None
  25. def putc(self,data, timeout=1):
  26. return self.nodeSerial.write(data) # note that this ignores the timeout
  27. def xmodem(self, lines):
  28. self.xmodemlines = []
  29. for line in lines:
  30. line = line.replace("\n", "").replace("\r", "")
  31. self.xmodemlines.append(line + "\r\n")
  32. self.q.put(None)
  33. self.nodeSerial.write(b'xmodem\n')
  34. def run(self):
  35. """\
  36. Thread that handles the incoming traffic. Does the basic input
  37. transformation (newlines) and generates an SerialRxEvent
  38. """
  39. b = ''
  40. cmd = []
  41. lines = ''
  42. while self.alive.isSet():
  43. for c in self.nodeSerial.read():
  44. c = chr(c)
  45. print(c, end='')
  46. if self.char_callback != None:
  47. wx.CallAfter(self.char_callback, c)
  48. b += c
  49. if b[0:3] == 'ch>':
  50. if (len(cmd) > 0):
  51. callback = self.q.get(block=False)
  52. if callback != None:
  53. wx.CallAfter(callback, lines)
  54. lines = []
  55. cmd = []
  56. if c == '\n':
  57. if len(cmd) > 0:
  58. lines.append(b)
  59. if b[0:3] == 'ch>':
  60. b = b.replace('\r', '').replace('\n','')
  61. cmd = b.split(' ')[1:]
  62. lines = []
  63. if len(cmd) > 0:
  64. if cmd[0] == 'xmodem':
  65. modem = XMODEM(self.getc, self.putc)
  66. stream = io.BytesIO(bytes(''.join(self.xmodemlines), 'utf-8'))
  67. modem.send(stream)
  68. b = ''