diff --git a/README.rst b/README.rst index 86f04ed..a436cd5 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,8 @@ rshell ========= +support python version <= 3.12 on win; not support python >= 3.13; + Remote MicroPython shell. This is a simple shell which runs on the host and uses MicroPython's diff --git a/rshell/main.py b/rshell/main.py index 7a810d9..c23104e 100755 --- a/rshell/main.py +++ b/rshell/main.py @@ -149,8 +149,8 @@ UART_BUFFER_SIZE = 32 BUFFER_SIZE = USB_BUFFER_SIZE QUIET = False -RTS = '' -DTR = '' +SERIAL_EXCLUSIVE = True + # It turns out that just because pyudev is installed doesn't mean that # it can actually be used. So we only bother to try if we're running @@ -1676,6 +1676,8 @@ def write(self, buf): class DeviceSerial(Device): def __init__(self, port, baud, wait): + global SERIAL_EXCLUSIVE + self.port = port self.baud = baud self.wait = wait @@ -1700,7 +1702,8 @@ def __init__(self, port, baud, wait): self.dev_name_long = '%s at %d baud' % (port, baud) try: - pyb = Pyboard(port, baudrate=baud, wait=wait, rts=RTS, dtr=DTR) + pyb = Pyboard(port, baudrate=baud, + wait=wait, exclusive = SERIAL_EXCLUSIVE) except PyboardError as err: print(err) sys.exit(1) @@ -2841,21 +2844,19 @@ def do_rsync(self, line): def real_main(): """The main program.""" - global RTS - global DTR + global BUFFER_SIZE, SERIAL_EXCLUSIVE + try: default_baud = int(os.getenv('RSHELL_BAUD')) except: default_baud = 115200 default_port = os.getenv('RSHELL_PORT') - default_rts = os.getenv('RSHELL_RTS') or RTS - default_dtr = os.getenv('RSHELL_DTR') or DTR default_user = os.getenv('RSHELL_USER') or 'micro' default_password = os.getenv('RSHELL_PASSWORD') or 'python' default_editor = os.getenv('RSHELL_EDITOR') or os.getenv('VISUAL') or os.getenv('EDITOR') or 'vi' default_color = sys.stdout.isatty() default_nocolor = not default_color - global BUFFER_SIZE + try: default_buffer_size = int(os.getenv('RSHELL_BUFFER_SIZE')) except: @@ -2890,18 +2891,6 @@ def real_main(): help="Set the serial port to use (default '%s')" % default_port, default=default_port ) - parser.add_argument( - "--rts", - dest="rts", - help="Set the RTS state (default '%s')" % default_rts, - default=default_rts - ) - parser.add_argument( - "--dtr", - dest="dtr", - help="Set the DTR state (default '%s')" % default_dtr, - default=default_dtr - ) parser.add_argument( "-u", "--user", dest="user", @@ -2932,6 +2921,14 @@ def real_main(): help="Enable debug features", default=False ) + parser.add_argument( + "--noexclusive", + dest="no_exclusive", + action="store_true", + help="Open the serial port Not use exclusive access mode" + " (default SERIAL_EXCLUSIVE is True)", + default=False + ) parser.add_argument( "-n", "--nocolor", dest="nocolor", @@ -3034,8 +3031,9 @@ def real_main(): global ASCII_XFER ASCII_XFER = args.ascii_xfer - RTS = args.rts - DTR = args.dtr + + if args.no_exclusive: + SERIAL_EXCLUSIVE = False if args.list: listports() diff --git a/rshell/pyboard.py b/rshell/pyboard.py index 3e35c0b..d319b85 100644 --- a/rshell/pyboard.py +++ b/rshell/pyboard.py @@ -1,10 +1,40 @@ #!/usr/bin/env python3 +# +# This file is part of the MicroPython project, http://micropython.org/ +# +# The MIT License (MIT) +# +# Copyright (c) 2014-2021 Damien P. George +# Copyright (c) 2017 Paul Sokolovsky +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + """ pyboard interface This module provides the Pyboard class, used to communicate with and -control the pyboard over a serial USB connection. +control a MicroPython device over a communication channel. Both real +boards and emulated devices (e.g. running in QEMU) are supported. +Various communication channels are supported, including a serial +connection, telnet-style network connection, external process +connection. Example usage: @@ -18,6 +48,7 @@ Then: pyb.enter_raw_repl() + pyb.exec('import pyb') pyb.exec('pyb.LED(1).on()') pyb.exit_raw_repl() @@ -37,53 +68,72 @@ """ +import ast +import errno +import os +import struct import sys import time +from collections import namedtuple + try: stdout = sys.stdout.buffer except AttributeError: # Python2 doesn't have buffer attr stdout = sys.stdout + def stdout_write_bytes(b): b = b.replace(b"\x04", b"") stdout.write(b) stdout.flush() -class PyboardError(BaseException): - pass + +class PyboardError(Exception): + def convert(self, info): + if len(self.args) >= 3: + if b"OSError" in self.args[2] and b"ENOENT" in self.args[2]: + return OSError(errno.ENOENT, info) + + return self + + +listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"]) + class TelnetToSerial: def __init__(self, ip, user, password, read_timeout=None): + self.tn = None import telnetlib + self.tn = telnetlib.Telnet(ip, timeout=15) self.read_timeout = read_timeout - if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): - self.tn.write(bytes(user, 'ascii') + b"\r\n") + if b"Login as:" in self.tn.read_until(b"Login as:", timeout=read_timeout): + self.tn.write(bytes(user, "ascii") + b"\r\n") - if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): + if b"Password:" in self.tn.read_until(b"Password:", timeout=read_timeout): # needed because of internal implementation details of the telnet server time.sleep(0.2) - self.tn.write(bytes(password, 'ascii') + b"\r\n") + self.tn.write(bytes(password, "ascii") + b"\r\n") - if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): + if b"for more information." in self.tn.read_until( + b'Type "help()" for more information.', timeout=read_timeout + ): # login successful from collections import deque + self.fifo = deque() return - raise PyboardError('Failed to establish a telnet connection with the board') + raise PyboardError("Failed to establish a telnet connection with the board") def __del__(self): self.close() def close(self): - try: + if self.tn: self.tn.close() - except: - # the telnet object might not exist yet, so ignore this one - pass def read(self, size=1): while len(self.fifo) < size: @@ -98,7 +148,7 @@ def read(self, size=1): break timeout_count += 1 - data = b'' + data = b"" while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data @@ -116,51 +166,167 @@ def inWaiting(self): else: return n_waiting -def parse_bool(str): - return str == '1' or str.lower() == 'true' + +class ProcessToSerial: + "Execute a process and emulate serial connection using its stdin/stdout." + + def __init__(self, cmd): + import subprocess + + self.subp = subprocess.Popen( + cmd, + bufsize=0, + shell=True, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Initially was implemented with selectors, but that adds Python3 + # dependency. However, there can be race conditions communicating + # with a particular child process (like QEMU), and selectors may + # still work better in that case, so left inplace for now. + # + # import selectors + # self.sel = selectors.DefaultSelector() + # self.sel.register(self.subp.stdout, selectors.EVENT_READ) + + import select + + self.poll = select.poll() + self.poll.register(self.subp.stdout.fileno()) + + def close(self): + import signal + + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + data = b"" + while len(data) < size: + data += self.subp.stdout.read(size - len(data)) + return data + + def write(self, data): + self.subp.stdin.write(data) + return len(data) + + def inWaiting(self): + # res = self.sel.select(0) + res = self.poll.poll(0) + if res: + return 1 + return 0 + + +class ProcessPtyToTerminal: + """Execute a process which creates a PTY and prints slave PTY as + first line of its output, and emulate serial connection using + this PTY.""" + + def __init__(self, cmd): + import subprocess + import re + import serial + + self.subp = subprocess.Popen( + cmd.split(), + bufsize=0, + shell=False, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + pty_line = self.subp.stderr.readline().decode("utf-8") + m = re.search(r"/dev/pts/[0-9]+", pty_line) + if not m: + print("Error: unable to find PTY device in startup line:", pty_line) + self.close() + sys.exit(1) + pty = m.group() + # rtscts, dsrdtr params are to workaround pyserial bug: + # http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port + self.serial = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True) + + def close(self): + import signal + + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + return self.serial.read(size) + + def write(self, data): + return self.serial.write(data) + + def inWaiting(self): + return self.serial.inWaiting() + class Pyboard: - def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rts='', dtr=''): - if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: + def __init__( + self, device, baudrate=115200, user="micro", + password="python", wait=0, exclusive=True + ): + self.in_raw_repl = False + self.use_raw_paste = True + if device.startswith("exec:"): + self.serial = ProcessToSerial(device[len("exec:") :]) + elif device.startswith("execpty:"): + self.serial = ProcessPtyToTerminal(device[len("qemupty:") :]) + elif device and device[0].isdigit() and device[-1].isdigit() and device.count(".") == 3: # device looks like an IP address self.serial = TelnetToSerial(device, user, password, read_timeout=10) else: import serial + import serial.tools.list_ports + + # Set options, and exclusive if pyserial supports it + serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1} + if serial.__version__ >= "3.3": + serial_kwargs["exclusive"] = exclusive + delayed = False - if serial.VERSION == '3.0': - self.serial = serial.Serial(baudrate=baudrate, inter_byte_timeout=1) - else: - self.serial = serial.Serial(baudrate=baudrate, interCharTimeout=1) - if rts != '': - self.serial.rts = parse_bool(rts) - if dtr != '': - self.serial.dtr = parse_bool(dtr) - self.serial.port = device for attempt in range(wait + 1): try: - # Assigning the port attribute will attempt to open the port - self.serial.open() + if os.name == "nt": + self.serial = serial.Serial(**serial_kwargs) + self.serial.port = device + portinfo = list(serial.tools.list_ports.grep(device)) # type: ignore + if portinfo and portinfo[0].manufacturer != "Microsoft": + # ESP8266/ESP32 boards use RTS/CTS for flashing and boot mode selection. + # DTR False: to avoid using the reset button will hang the MCU in bootloader mode + # RTS False: to prevent pulses on rts on serial.close() that would POWERON_RESET an ESPxx + self.serial.dtr = False # DTR False = gpio0 High = Normal boot + self.serial.rts = False # RTS False = EN High = MCU enabled + self.serial.open() + else: + self.serial = serial.Serial(device, **serial_kwargs) break - except (OSError, IOError): # Py2 and Py3 have different errors + except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: continue if attempt == 0: - sys.stdout.write('Waiting {} seconds for pyboard '.format(wait)) + sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) delayed = True time.sleep(1) - sys.stdout.write('.') + sys.stdout.write(".") sys.stdout.flush() else: if delayed: - print('') - raise PyboardError('failed to access ' + device) + print("") + raise PyboardError("failed to access " + device) if delayed: - print('') + print("") def close(self): self.serial.close() def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): + # if data_consumer is used then data is not accumulated and the ending must be 1 byte long + assert data_consumer is None or len(ending) == 1 + data = self.serial.read(min_num_bytes) if data_consumer: data_consumer(data) @@ -170,9 +336,11 @@ def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): break elif self.serial.inWaiting() > 0: new_data = self.serial.read(1) - data = data + new_data if data_consumer: data_consumer(new_data) + data = new_data + else: + data = data + new_data timeout_count = 0 else: timeout_count += 1 @@ -181,8 +349,8 @@ def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): time.sleep(0.01) return data - def enter_raw_repl(self): - self.serial.write(b'\r\x03\x03') # ctrl-C twice: interrupt any running program + def enter_raw_repl(self, soft_reset=True): + self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program # flush input (without relying on serial.flushInput()) n = self.serial.inWaiting() @@ -190,94 +358,307 @@ def enter_raw_repl(self): self.serial.read(n) n = self.serial.inWaiting() - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): - print(data) - raise PyboardError('could not enter raw repl') + self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL - self.serial.write(b'\x04') # ctrl-D: soft reset - data = self.read_until(1, b'soft reboot\r\n') - if not data.endswith(b'soft reboot\r\n'): - print(data) - raise PyboardError('could not enter raw repl') - # By splitting this into 2 reads, it allows boot.py to print stuff, - # which will show up after the soft reboot and before the raw REPL. - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): + if soft_reset: + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): + print(data) + raise PyboardError("could not enter raw repl") + + self.serial.write(b"\x04") # ctrl-D: soft reset + + # Waiting for "soft reboot" independently to "raw REPL" (done below) + # allows boot.py to print, which will show up after "soft reboot" + # and before "raw REPL". + data = self.read_until(1, b"soft reboot\r\n") + if not data.endswith(b"soft reboot\r\n"): + print(data) + raise PyboardError("could not enter raw repl") + + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): print(data) - raise PyboardError('could not enter raw repl') + raise PyboardError("could not enter raw repl") + + self.in_raw_repl = True def exit_raw_repl(self): - self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL + self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL + self.in_raw_repl = False def follow(self, timeout, data_consumer=None): # wait for normal output - data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) - if not data.endswith(b'\x04'): - raise PyboardError('timeout waiting for first EOF reception') + data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b"\x04"): + raise PyboardError("timeout waiting for first EOF reception") data = data[:-1] # wait for error output - data_err = self.read_until(1, b'\x04', timeout=timeout) - if not data_err.endswith(b'\x04'): - raise PyboardError('timeout waiting for second EOF reception') + data_err = self.read_until(1, b"\x04", timeout=timeout) + if not data_err.endswith(b"\x04"): + raise PyboardError("timeout waiting for second EOF reception") data_err = data_err[:-1] # return normal and error output return data, data_err + def raw_paste_write(self, command_bytes): + # Read initial header, with window size. + data = self.serial.read(2) + window_size = struct.unpack("') - if not data.endswith(b'>'): - raise PyboardError('could not enter raw repl') - - # write command + data = self.read_until(1, b">") + if not data.endswith(b">"): + raise PyboardError("could not enter raw repl") + + if self.use_raw_paste: + # Try to enter raw-paste mode. + self.serial.write(b"\x05A\x01") + data = self.serial.read(2) + if data == b"R\x00": + # Device understood raw-paste command but doesn't support it. + pass + elif data == b"R\x01": + # Device supports raw-paste mode, write out the command using this mode. + return self.raw_paste_write(command_bytes) + else: + # Device doesn't support raw-paste, fall back to normal raw REPL. + data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): + print(data) + raise PyboardError("could not enter raw repl") + # Don't try to use raw-paste mode again for this connection. + self.use_raw_paste = False + + # Write command using standard raw REPL, 256 bytes every 10ms. for i in range(0, len(command_bytes), 256): - self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) + self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) time.sleep(0.01) - self.serial.write(b'\x04') + self.serial.write(b"\x04") # check if we could exec command data = self.serial.read(2) - if data != b'OK': - raise PyboardError('could not exec command') + if data != b"OK": + raise PyboardError("could not exec command (response: %r)" % data) def exec_raw(self, command, timeout=10, data_consumer=None): - self.exec_raw_no_follow(command); + self.exec_raw_no_follow(command) return self.follow(timeout, data_consumer) - def eval(self, expression): - ret = self.exec_('print({})'.format(expression)) - ret = ret.strip() - return ret + def eval(self, expression, parse=False): + if parse: + ret = self.exec_("print(repr({}))".format(expression)) + ret = ret.strip() + return ast.literal_eval(ret.decode()) + else: + ret = self.exec_("print({})".format(expression)) + ret = ret.strip() + return ret - def exec_(self, command): - ret, ret_err = self.exec_raw(command) + # In Python3, call as pyboard.exec(), see the setattr call below. + def exec_(self, command, data_consumer=None): + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: - raise PyboardError('exception', ret, ret_err) + raise PyboardError("exception", ret, ret_err) return ret def execfile(self, filename): - with open(filename, 'rb') as f: + with open(filename, "rb") as f: pyfile = f.read() return self.exec_(pyfile) def get_time(self): - t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') + t = str(self.eval("pyb.RTC().datetime()"), encoding="utf8")[1:-1].split(", ") return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) + def fs_exists(self, src): + try: + self.exec_("import os\nos.stat(%s)" % (("'%s'" % src) if src else "")) + return True + except PyboardError: + return False + + def fs_ls(self, src): + cmd = ( + "import os\nfor f in os.ilistdir(%s):\n" + " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" + % (("'%s'" % src) if src else "") + ) + self.exec_(cmd, data_consumer=stdout_write_bytes) + + def fs_listdir(self, src=""): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % ( + ("'%s'" % src) if src else "" + ) + try: + buf.extend(b"[") + self.exec_(cmd, data_consumer=repr_consumer) + buf.extend(b"]") + except PyboardError as e: + raise e.convert(src) + + return [ + listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,))) + for f in ast.literal_eval(buf.decode()) + ] + + def fs_stat(self, src): + try: + self.exec_("import os") + return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src), parse=True)) + except PyboardError as e: + raise e.convert(src) + + def fs_cat(self, src, chunk_size=256): + cmd = ( + "with open('%s') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + self.exec_(cmd, data_consumer=stdout_write_bytes) + + def fs_readfile(self, src, chunk_size=256): + buf = bytearray() + + def repr_consumer(b): + buf.extend(b.replace(b"\x04", b"")) + + cmd = ( + "with open('%s', 'rb') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + try: + self.exec_(cmd, data_consumer=repr_consumer) + except PyboardError as e: + raise e.convert(src) + return ast.literal_eval(buf.decode()) + + def fs_writefile(self, dest, data, chunk_size=256): + self.exec_("f=open('%s','wb')\nw=f.write" % dest) + while data: + chunk = data[:chunk_size] + self.exec_("w(" + repr(chunk) + ")") + data = data[len(chunk) :] + self.exec_("f.close()") + + def fs_cp(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = self.fs_stat(src).st_size + written = 0 + self.exec_("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest)) + while True: + data_len = int(self.exec_("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size)) + if not data_len: + break + if progress_callback: + written += data_len + progress_callback(written, src_size) + self.exec_("fr.close()\nfw.close()") + + def fs_get(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = self.fs_stat(src).st_size + written = 0 + self.exec_("f=open('%s','rb')\nr=f.read" % src) + with open(dest, "wb") as f: + while True: + data = bytearray() + self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) + assert data.endswith(b"\r\n\x04") + try: + data = ast.literal_eval(str(data[:-3], "ascii")) + if not isinstance(data, bytes): + raise ValueError("Not bytes") + except (UnicodeError, ValueError) as e: + raise PyboardError("fs_get: Could not interpret received data: %s" % str(e)) + if not data: + break + f.write(data) + if progress_callback: + written += len(data) + progress_callback(written, src_size) + self.exec_("f.close()") + + def fs_put(self, src, dest, chunk_size=256, progress_callback=None): + if progress_callback: + src_size = os.path.getsize(src) + written = 0 + self.exec_("f=open('%s','wb')\nw=f.write" % dest) + with open(src, "rb") as f: + while True: + data = f.read(chunk_size) + if not data: + break + if sys.version_info < (3,): + self.exec_("w(b" + repr(data) + ")") + else: + self.exec_("w(" + repr(data) + ")") + if progress_callback: + written += len(data) + progress_callback(written, src_size) + self.exec_("f.close()") + + def fs_mkdir(self, dir): + self.exec_("import os\nos.mkdir('%s')" % dir) + + def fs_rmdir(self, dir): + self.exec_("import os\nos.rmdir('%s')" % dir) + + def fs_rm(self, src): + self.exec_("import os\nos.remove('%s')" % src) + + def fs_touch(self, src): + self.exec_("f=open('%s','a')\nf.close()" % src) + + # in Python2 exec is a keyword so one must use "exec_" # but for Python3 we want to provide the nicer version "exec" setattr(Pyboard, "exec", Pyboard.exec_) -def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): + +def execfile(filename, device="/dev/ttyACM0", baudrate=115200, user="micro", password="python"): pyb = Pyboard(device, baudrate, user, password) pyb.enter_raw_repl() output = pyb.execfile(filename) @@ -285,56 +666,257 @@ def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', pas pyb.exit_raw_repl() pyb.close() + +def filesystem_command(pyb, args, progress_callback=None, verbose=False): + def fname_remote(src): + if src.startswith(":"): + src = src[1:] + # Convert all path separators to "/", because that's what a remote device uses. + return src.replace(os.path.sep, "/") + + def fname_cp_dest(src, dest): + _, src = os.path.split(src) + if dest is None or dest == "": + dest = src + elif dest == ".": + dest = "./" + src + elif dest.endswith("/"): + dest += src + return dest + + cmd = args[0] + args = args[1:] + try: + if cmd == "cp": + if len(args) == 1: + raise PyboardError( + "cp: missing destination file operand after '{}'".format(args[0]) + ) + srcs = args[:-1] + dest = args[-1] + if dest.startswith(":"): + op_remote_src = pyb.fs_cp + op_local_src = pyb.fs_put + else: + op_remote_src = pyb.fs_get + op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest) + for src in srcs: + if verbose: + print("cp %s %s" % (src, dest)) + if src.startswith(":"): + op = op_remote_src + else: + op = op_local_src + src2 = fname_remote(src) + dest2 = fname_cp_dest(src2, fname_remote(dest)) + op(src2, dest2, progress_callback=progress_callback) + else: + ops = { + "cat": pyb.fs_cat, + "ls": pyb.fs_ls, + "mkdir": pyb.fs_mkdir, + "rm": pyb.fs_rm, + "rmdir": pyb.fs_rmdir, + "touch": pyb.fs_touch, + } + if cmd not in ops: + raise PyboardError("'{}' is not a filesystem command".format(cmd)) + if cmd == "ls" and not args: + args = [""] + for src in args: + src = fname_remote(src) + if verbose: + print("%s :%s" % (cmd, src)) + ops[cmd](src) + except PyboardError as er: + if len(er.args) > 1: + print(str(er.args[2], "ascii")) + else: + print(er) + pyb.exit_raw_repl() + pyb.close() + sys.exit(1) + + +_injected_import_hook_code = """\ +import os, io +class _FS: + class File(io.IOBase): + def __init__(self): + self.off = 0 + def ioctl(self, request, arg): + return 0 + def readinto(self, buf): + buf[:] = memoryview(_injected_buf)[self.off:self.off + len(buf)] + self.off += len(buf) + return len(buf) + mount = umount = chdir = lambda *args: None + def stat(self, path): + if path == '_injected.mpy': + return tuple(0 for _ in range(10)) + else: + raise OSError(-2) # ENOENT + def open(self, path, mode): + return self.File() +os.mount(_FS(), '/_') +os.chdir('/_') +from _injected import * +os.umount('/_') +del _injected_buf, _FS +""" + + def main(): import argparse - cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') - cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') - cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') - cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') - cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') - cmd_parser.add_argument('-c', '--command', help='program passed in as string') - cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') - cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') - cmd_parser.add_argument('files', nargs='*', help='input files') + + cmd_parser = argparse.ArgumentParser(description="Run scripts on the pyboard.") + cmd_parser.add_argument( + "-d", + "--device", + default=os.environ.get("PYBOARD_DEVICE", "/dev/ttyACM0"), + help="the serial device or the IP address of the pyboard", + ) + cmd_parser.add_argument( + "-b", + "--baudrate", + default=os.environ.get("PYBOARD_BAUDRATE", "115200"), + help="the baud rate of the serial device", + ) + cmd_parser.add_argument("-u", "--user", default="micro", help="the telnet login username") + cmd_parser.add_argument("-p", "--password", default="python", help="the telnet login password") + cmd_parser.add_argument("-c", "--command", help="program passed in as string") + cmd_parser.add_argument( + "-w", + "--wait", + default=0, + type=int, + help="seconds to wait for USB connected board to become available", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--soft-reset", + default=True, + action="store_true", + help="Whether to perform a soft reset when connecting to the board [default]", + ) + group.add_argument( + "--no-soft-reset", + action="store_false", + dest="soft_reset", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--follow", + action="store_true", + default=None, + help="follow the output after running the scripts [default if no scripts given]", + ) + group.add_argument( + "--no-follow", + action="store_false", + dest="follow", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--exclusive", + action="store_true", + default=True, + help="Open the serial device for exclusive access [default]", + ) + group.add_argument( + "--no-exclusive", + action="store_false", + dest="exclusive", + ) + cmd_parser.add_argument( + "-f", + "--filesystem", + action="store_true", + help="perform a filesystem action: " + "cp local :device | cp :device local | cat path | ls [path] | rm path | mkdir path | rmdir path", + ) + cmd_parser.add_argument("files", nargs="*", help="input files") args = cmd_parser.parse_args() - def execbuffer(buf): + # open the connection to the pyboard + try: + pyb = Pyboard( + args.device, args.baudrate, args.user, args.password, args.wait, args.exclusive + ) + except PyboardError as er: + print(er) + sys.exit(1) + + # run any command or file(s) + if args.command is not None or args.filesystem or len(args.files): + # we must enter raw-REPL mode to execute commands + # this will do a soft-reset of the board try: - pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) - pyb.enter_raw_repl() - ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) - pyb.exit_raw_repl() - pyb.close() + pyb.enter_raw_repl(args.soft_reset) except PyboardError as er: print(er) + pyb.close() sys.exit(1) - except KeyboardInterrupt: - sys.exit(1) - if ret_err: - stdout_write_bytes(ret_err) - sys.exit(1) - - if args.command is not None: - execbuffer(args.command.encode('utf-8')) - - for filename in args.files: - with open(filename, 'rb') as f: - pyfile = f.read() - execbuffer(pyfile) - if args.follow or (args.command is None and len(args.files) == 0): + def execbuffer(buf): + try: + if args.follow is None or args.follow: + ret, ret_err = pyb.exec_raw( + buf, timeout=None, data_consumer=stdout_write_bytes + ) + else: + pyb.exec_raw_no_follow(buf) + ret_err = None + except PyboardError as er: + print(er) + pyb.close() + sys.exit(1) + except KeyboardInterrupt: + sys.exit(1) + if ret_err: + pyb.exit_raw_repl() + pyb.close() + stdout_write_bytes(ret_err) + sys.exit(1) + + # do filesystem commands, if given + if args.filesystem: + filesystem_command(pyb, args.files, verbose=True) + del args.files[:] + + # run the command, if given + if args.command is not None: + execbuffer(args.command.encode("utf-8")) + + # run any files + for filename in args.files: + with open(filename, "rb") as f: + pyfile = f.read() + if filename.endswith(".mpy") and pyfile[0] == ord("M"): + pyb.exec_("_injected_buf=" + repr(pyfile)) + pyfile = _injected_import_hook_code + execbuffer(pyfile) + + # exiting raw-REPL just drops to friendly-REPL mode + pyb.exit_raw_repl() + + # if asked explicitly, or no files given, then follow the output + if args.follow or (args.command is None and not args.filesystem and len(args.files) == 0): try: - pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) - pyb.close() except PyboardError as er: print(er) sys.exit(1) except KeyboardInterrupt: sys.exit(1) if ret_err: + pyb.close() stdout_write_bytes(ret_err) sys.exit(1) + # close the connection to the pyboard + pyb.close() + + if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/rshell/version.py b/rshell/version.py index 138343c..d9edf17 100644 --- a/rshell/version.py +++ b/rshell/version.py @@ -1 +1 @@ -__version__ = '0.0.31' +__version__ = '0.0.33' diff --git a/setup.cfg b/setup.cfg index b88034e..08aedd7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [metadata] -description-file = README.md +description_file = README.md diff --git a/setup.py b/setup.py index 1652e55..be4be84 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,6 @@ }, extras_require={ ':sys_platform == "win32"': [ - 'pyreadline'] + 'pyreadline3'] } )