From 0bab8734d588249a85c321f6a1e6dad096446c05 Mon Sep 17 00:00:00 2001 From: dummys Date: Wed, 29 Sep 2021 15:11:50 +0200 Subject: [PATCH 01/10] migration to python3 --- umap2/apps/list_classes.py | 4 +- umap2/dev/audio.py | 64 ++++++++++++++--------------- umap2/dev/cdc.py | 2 +- umap2/dev/cdc_acm.py | 2 +- umap2/dev/cdc_dl.py | 2 +- umap2/dev/hub.py | 4 +- umap2/dev/mass_storage.py | 4 +- umap2/fuzz/fuzz_engine.py | 2 +- umap2/fuzz/helpers.py | 10 ++--- umap2/fuzz/templates/audio.py | 6 +-- umap2/fuzz/templates/cdc.py | 2 +- umap2/fuzz/templates/enum.py | 2 +- umap2/fuzz/templates/hid.py | 8 ++-- umap2/phy/facedancer/max342x_phy.py | 2 +- 14 files changed, 57 insertions(+), 57 deletions(-) diff --git a/umap2/apps/list_classes.py b/umap2/apps/list_classes.py index cb37d31..80ff896 100644 --- a/umap2/apps/list_classes.py +++ b/umap2/apps/list_classes.py @@ -17,8 +17,8 @@ def run(self): ks = self.umap_classes verbose = self.options.get('--verbose', False) if verbose: - print '%-20s %s' % ('Device', 'Description') - print '-------------------- ----------------------------------------------------' + print('%-20s %s' % ('Device', 'Description')) + print('-------------------- ----------------------------------------------------') for k in ks: if verbose: print('%-20s %s' % (k, self.umap_class_dict[k][1])) diff --git a/umap2/dev/audio.py b/umap2/dev/audio.py index 6897b83..f917c9f 100644 --- a/umap2/dev/audio.py +++ b/umap2/dev/audio.py @@ -42,23 +42,23 @@ def setup_local_handlers(self): } self._settings = { # (val, index): [cur, min, max, res, (idle)] - (0x0100, 0x0001): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'], + (0x0100, 0x0001): [b'\x44\xac\x00', b'\x44\xac\x00', b'\x80\xbb\x00', b'\x80\xbb\x00'], # (0x0100, 0x0002): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'], - (0x0100, 0x0082): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'], - (0x0100, 0x0900): ['\x00', '\x00', '\xff', '\x00'], - (0x0100, 0x0a00): ['\x01', '\x00', '\xff', '\x00'], - (0x0100, 0x0d00): ['\x01', '\x00', '\xff', '\x00'], - (0x0101, 0x0f00): ['\x01', '\x00', '\xff', '\x00'], - (0x0102, 0x0f00): ['\x01', '\x00', '\xff', '\x00'], - (0x0200, 0x0a00): ['\x00\x00', '\x00\x00', '\x55\x00', '\x30\x00', '\x00\x00'], - (0x0200, 0x0d00): ['\x80\x22', '\x00\x00', '\xd0\x00', '\x30\x00'], - (0x0201, 0x0900): ['\x80\x22', '\x20\x00', '\xa0\x00', '\x30\x00'], - (0x0201, 0x0f00): ['\x01', '\x00', '\xff', '\x00'], - (0x0202, 0x0900): ['\xcf\x00', '\x00\x00', '\xcf\x00', '\x30\x00'], - (0x0202, 0x0f00): ['\x01', '\x00', '\xff', '\x00'], - (0x0301, 0x0f00): ['\x01', '\x00', '\xff', '\x00'], - (0x0302, 0x0f00): ['\x00\x00', '\x00\x00', '\x00\x00', '\x00\x00'], - (0x0700, 0x0a00): ['\x01', '\x00', '\xff', '\x00'], + (0x0100, 0x0082): [b'\x44\xac\x00', b'\x44\xac\x00', b'\x80\xbb\x00', b'\x80\xbb\x00'], + (0x0100, 0x0900): [b'\x00', b'\x00', b'\xff', b'\x00'], + (0x0100, 0x0a00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0100, 0x0d00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0101, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0102, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0200, 0x0a00): [b'\x00\x00', b'\x00\x00', b'\x55\x00', b'\x30\x00', b'\x00\x00'], + (0x0200, 0x0d00): [b'\x80\x22', b'\x00\x00', b'\xd0\x00', b'\x30\x00'], + (0x0201, 0x0900): [b'\x80\x22', b'\x20\x00', b'\xa0\x00', b'\x30\x00'], + (0x0201, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0202, 0x0900): [b'\xcf\x00', b'\x00\x00', b'\xcf\x00', b'\x30\x00'], + (0x0202, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0301, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'], + (0x0302, 0x0f00): [b'\x00\x00', b'\x00\x00', b'\x00\x00', b'\x00\x00'], + (0x0700, 0x0a00): [b'\x01', b'\x00', b'\xff', b'\x00'], } self._cur = b'\x44\xac\x00' @@ -205,29 +205,29 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs app=app, phy=phy, iface_num=0, iface_alt=0, iface_str_idx=0, cs_ifaces=[ # Class specific AC interface: header (4.3.2) - USBCSInterface('ACHeader', app, phy, '\x01\x00\x01\x64\x00\x02\x01\x02'), + USBCSInterface('ACHeader', app, phy, b'\x01\x00\x01\x64\x00\x02\x01\x02'), # Class specific AC interface: input terminal (Table 4.3.2.1) - USBCSInterface('ACInputTerminal0', app, phy, '\x02\x01\x01\x01\x00\x02\x03\x00\x00\x00'), - USBCSInterface('ACInputTerminal1', app, phy, '\x02\x02\x01\x02\x00\x01\x01\x00\x00\x00'), + USBCSInterface('ACInputTerminal0', app, phy, b'\x02\x01\x01\x01\x00\x02\x03\x00\x00\x00'), + USBCSInterface('ACInputTerminal1', app, phy, b'\x02\x02\x01\x02\x00\x01\x01\x00\x00\x00'), # Class specific AC interface: output terminal (Table 4.3.2.2) - USBCSInterface('ACOutputTerminal0', app, phy, '\x03\x06\x01\x03\x00\x09\x00'), - USBCSInterface('ACOutputTerminal1', app, phy, '\x03\x07\x01\x01\x00\x08\x00'), + USBCSInterface('ACOutputTerminal0', app, phy, b'\x03\x06\x01\x03\x00\x09\x00'), + USBCSInterface('ACOutputTerminal1', app, phy, b'\x03\x07\x01\x01\x00\x08\x00'), # Class specific AC interface: selector unit (Table 4.3.2.4) - USBCSInterface('ACSelectorUnit', app, phy, '\x05\x08\x01\x0a\x00'), + USBCSInterface('ACSelectorUnit', app, phy, b'\x05\x08\x01\x0a\x00'), # Class specific AC interface: feature unit (Table 4.3.2.5) - USBCSInterface('ACFeatureUnit0', app, phy, '\x06\x09\x0f\x01\x01\x02\x02\x00'), - USBCSInterface('ACFeatureUnit1', app, phy, '\x06\x0a\x02\x01\x43\x00\x00'), - USBCSInterface('ACFeatureUnit2', app, phy, '\x06\x0d\x02\x01\x03\x00\x00'), + USBCSInterface('ACFeatureUnit0', app, phy, b'\x06\x09\x0f\x01\x01\x02\x02\x00'), + USBCSInterface('ACFeatureUnit1', app, phy, b'\x06\x0a\x02\x01\x43\x00\x00'), + USBCSInterface('ACFeatureUnit2', app, phy, b'\x06\x0d\x02\x01\x03\x00\x00'), # Class specific AC interface: mixer unit (Table 4.3.2.3) - USBCSInterface('ACMixerUnit', app, phy, '\x04\x0f\x02\x01\x0d\x02\x03\x00\x00\x00\x00'), + USBCSInterface('ACMixerUnit', app, phy, b'\x04\x0f\x02\x01\x0d\x02\x03\x00\x00\x00\x00'), ], usb_class=usb_class ), USBAudioStreamingInterface( app=app, phy=phy, iface_num=1, iface_alt=0, iface_str_idx=0, cs_ifaces=[ - USBCSInterface('ASGeneral', app, phy, '\x01\x01\x01\x01\x00'), - USBCSInterface('ASFormatType', app, phy, '\x02\x01\x02\x02\x10\x02\x44\xac\x00\x44\xac\x00'), + USBCSInterface('ASGeneral', app, phy, b'\x01\x01\x01\x01\x00'), + USBCSInterface('ASFormatType', app, phy, b'\x02\x01\x02\x02\x10\x02\x44\xac\x00\x44\xac\x00'), ], endpoints=[ USBEndpoint( @@ -240,7 +240,7 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs interval=1, handler=audio_streaming.data_available, cs_endpoints=[ - USBCSEndpoint('ASEndpoint', app, phy, '\x01\x01\x01\x01\x00') + USBCSEndpoint('ASEndpoint', app, phy, b'\x01\x01\x01\x01\x00') ], usb_class=usb_class, ) @@ -250,8 +250,8 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs USBAudioStreamingInterface( app=app, phy=phy, iface_num=2, iface_alt=0, iface_str_idx=0, cs_ifaces=[ - USBCSInterface('ASGeneral', app, phy, '\x01\x07\x01\x01\x00'), - USBCSInterface('ASFormatType', app, phy, '\x02\x01\x01\x02\x10\x02\x44\xac\x00\x44\xac\x00'), + USBCSInterface('ASGeneral', app, phy, b'\x01\x07\x01\x01\x00'), + USBCSInterface('ASFormatType', app, phy, b'\x02\x01\x01\x02\x10\x02\x44\xac\x00\x44\xac\x00'), ], endpoints=[ USBEndpoint( @@ -264,7 +264,7 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs interval=1, handler=audio_streaming.buffer_available, cs_endpoints=[ - USBCSEndpoint('ASEndpoint', app, phy, '\x01\x01\x00\x00\x00') + USBCSEndpoint('ASEndpoint', app, phy, b'\x01\x01\x00\x00\x00') ], usb_class=usb_class, ) diff --git a/umap2/dev/cdc.py b/umap2/dev/cdc.py index 16235aa..9fae8fc 100644 --- a/umap2/dev/cdc.py +++ b/umap2/dev/cdc.py @@ -191,7 +191,7 @@ def handle_getter(self, req): key = (param_id, req.value, req.index) if key in self.params: return self.params[key] - return '\x00' * req.length + return b'\x00' * req.length def handle_clear(self, req): param_id = self.get_param_id_from_request(req.request) diff --git a/umap2/dev/cdc_acm.py b/umap2/dev/cdc_acm.py index ae3f063..712d747 100644 --- a/umap2/dev/cdc_acm.py +++ b/umap2/dev/cdc_acm.py @@ -29,7 +29,7 @@ def __init__(self, app, phy, vid=0x2548, pid=0x1001, rev=0x0010, cs_interfaces=N cdc_cls = self.get_default_class(app, phy) cs_interfaces = [ # Header Functional Descriptor - FD(app, phy, FD.Header, '\x01\x01'), + FD(app, phy, FD.Header, b'\x01\x01'), # Call Management Functional Descriptor FD(app, phy, FD.CM, struct.pack('BB', bmCapabilities, USBCDCDevice.bDataInterface)), FD(app, phy, FD.ACM, struct.pack('B', bmCapabilities)), diff --git a/umap2/dev/cdc_dl.py b/umap2/dev/cdc_dl.py index 5b6af81..c823570 100644 --- a/umap2/dev/cdc_dl.py +++ b/umap2/dev/cdc_dl.py @@ -28,7 +28,7 @@ def __init__(self, app, phy, vid=0x2548, pid=0x1001, rev=0x0010, cs_interfaces=N cdc_cls = self.get_default_class(app, phy) cs_interfaces = [ # Header Functional Descriptor - FD(app, phy, FD.Header, '\x01\x01'), + FD(app, phy, FD.Header, b'\x01\x01'), # Call Management Functional Descriptor FD(app, phy, FD.CM, struct.pack('BB', bmCapabilities, USBCDCDevice.bDataInterface)), FD(app, phy, FD.DLM, struct.pack('B', bmCapabilities)), diff --git a/umap2/dev/hub.py b/umap2/dev/hub.py index 5397dbd..47dc4c7 100644 --- a/umap2/dev/hub.py +++ b/umap2/dev/hub.py @@ -54,8 +54,8 @@ def handle_get_descriptor(self, req): num_bytes = self.num_ports // 7 if self.num_ports % 7 != 0: num_bytes += 1 - d += '\x00' * num_bytes - d += '\xff' * num_bytes + d += b'\x00' * num_bytes + d += b'\xff' * num_bytes d = struct.pack('B', len(d) + 1) + d return d diff --git a/umap2/dev/mass_storage.py b/umap2/dev/mass_storage.py index 26b09c1..cf04eec 100644 --- a/umap2/dev/mass_storage.py +++ b/umap2/dev/mass_storage.py @@ -129,7 +129,7 @@ def put_sector_data(self, address, data): block_end = (address + 1) * self.block_size # slices are NON-inclusive pad_len = (self.block_size - (len(data) % self.block_size)) % self.block_size - data += '\x00' * pad_len + data += b'\x00' * pad_len self.image[block_start:block_end] = data[:self.block_size] self.image.flush() @@ -391,7 +391,7 @@ def handle_scsi_mode_sense(self, mode_type, page, subpage, alloc_len, ctrl, with if report is None: # default behaviour, taken from previous implementation # this should probably be changed ... - report = '\x07\x00\x00\x00\x00\x00\x00\x00' + report = b'\x07\x00\x00\x00\x00\x00\x00\x00' if with_header: self.debug('SCSI mode sense (%d) - adding header' % (mode_type)) report = self._report_header(mode_type, len(report)) + report diff --git a/umap2/fuzz/fuzz_engine.py b/umap2/fuzz/fuzz_engine.py index 15bcee0..6b47a08 100755 --- a/umap2/fuzz/fuzz_engine.py +++ b/umap2/fuzz/fuzz_engine.py @@ -20,7 +20,7 @@ from kitty.model import GraphModel from kitty.model import Template, Meta, String, UInt32 -from templates import audio, cdc, enum, generic, hid, hub, mass_storage +from . templates import audio, cdc, enum, generic, hid, hub, mass_storage from templates import smart_card from controller import UmapController diff --git a/umap2/fuzz/helpers.py b/umap2/fuzz/helpers.py index 8eed012..72d36f0 100644 --- a/umap2/fuzz/helpers.py +++ b/umap2/fuzz/helpers.py @@ -6,7 +6,6 @@ import binascii import inspect - class StageLogger(object): def __init__(self, filename): @@ -22,7 +21,8 @@ def stop(self): def log_stage(self, stage): if self.fd: - self.fd.write(stage + '\n') + + self.fd.write(stage.encode() + b'\n') self.fd.flush() @@ -46,8 +46,8 @@ def mutable(stage, silent=False): def wrap_f(func): func_self = None if inspect.ismethod(func): - func_self = func.im_self - func = func.im_func + func_self = func.__self__ + func = func.__func__ def wrapper(*args, **kwargs): if func_self is None: @@ -79,7 +79,7 @@ def wrapper(*args, **kwargs): self.logger.error(''.join(traceback.format_stack())) raise e if response is not None: - info('Response: %s' % binascii.hexlify(response)) + info(f'Response: {response.hex()}') return response return wrapper return wrap_f diff --git a/umap2/fuzz/templates/audio.py b/umap2/fuzz/templates/audio.py index c8502a6..1e27346 100644 --- a/umap2/fuzz/templates/audio.py +++ b/umap2/fuzz/templates/audio.py @@ -6,8 +6,8 @@ from kitty.model import Template, Repeat, List, Container, ForEach, OneOf from kitty.model import ElementCount, SizeInBytes from kitty.model import ENC_INT_LE -from hid import GenerateHidReport -from generic import Descriptor, SizedPt, DynamicInt, SubDescriptor +from . hid import GenerateHidReport +from . generic import Descriptor, SizedPt, DynamicInt, SubDescriptor class _AC_DescriptorSubTypes: # AC Interface Descriptor Subtype @@ -138,7 +138,7 @@ class _AS_DescriptorSubTypes: # AS Interface Descriptor Subtype audio_report_descriptor = Template( name='audio_report_descriptor', fields=GenerateHidReport( - '050C0901A1011500250109E909EA75019502810209E209008106050B092095018142050C09009503810226FF000900750895038102090095049102C0'.decode('hex') + bytes.fromhex('050C0901A1011500250109E909EA75019502810209E209008106050B092095018142050C09009503810226FF000900750895038102090095049102C0') ) ) diff --git a/umap2/fuzz/templates/cdc.py b/umap2/fuzz/templates/cdc.py index d455e16..c1d61ef 100644 --- a/umap2/fuzz/templates/cdc.py +++ b/umap2/fuzz/templates/cdc.py @@ -8,7 +8,7 @@ from kitty.model import Template, Repeat, List, Container, ForEach, OneOf from kitty.model import ElementCount from kitty.model import MutableField -from generic import SubDescriptor +from . generic import SubDescriptor cdc_control_interface_descriptor = Template( diff --git a/umap2/fuzz/templates/enum.py b/umap2/fuzz/templates/enum.py index 56fde91..09dddd1 100644 --- a/umap2/fuzz/templates/enum.py +++ b/umap2/fuzz/templates/enum.py @@ -10,7 +10,7 @@ from kitty.model import ElementCount, SizeInBytes # encoders from kitty.model import StrEncodeEncoder, ENC_INT_LE -from generic import Descriptor, SubDescriptor +from . generic import Descriptor, SubDescriptor # Device descriptor diff --git a/umap2/fuzz/templates/hid.py b/umap2/fuzz/templates/hid.py index 21c041c..b3eaf9d 100644 --- a/umap2/fuzz/templates/hid.py +++ b/umap2/fuzz/templates/hid.py @@ -24,7 +24,7 @@ from kitty.model import ENC_INT_LE from kitty.core import KittyException from random import Random -from generic import DynamicInt, Descriptor +from . generic import DynamicInt, Descriptor opcodes = { @@ -124,7 +124,7 @@ def GenerateHidReport(report_str, name=None): index = 0 namer = NameGen() while index < len(report_str): - opcode = ord(report_str[index]) + opcode = report_str[index] num_args = opcode & 3 if index + num_args >= len(report_str): raise KittyException('Not enough bytes in hid report for last opcode') @@ -134,7 +134,7 @@ def GenerateHidReport(report_str, name=None): fields.append(UInt8(opcode, name=cur_name)) else: args = report_str[index:index + num_args] - value = sum(ord(args[i]) << (i * 8) for i in range(len(args))) # little endian... + value = sum(args[i] << (i * 8) for i in range(len(args))) # little endian... fields.append(Container( name=cur_name, fields=[ @@ -179,6 +179,6 @@ def GenerateHidReport(report_str, name=None): hid_report_descriptor = Template( name='hid_report_descriptor', fields=GenerateHidReport( - '05010906A101050719E029E7150025017501950881029501750881011900296515002565750895018100C0'.decode('hex') + bytes.fromhex('05010906A101050719E029E7150025017501950881029501750881011900296515002565750895018100C0') ) ) diff --git a/umap2/phy/facedancer/max342x_phy.py b/umap2/phy/facedancer/max342x_phy.py index 31ee3d6..1aa9ed9 100644 --- a/umap2/phy/facedancer/max342x_phy.py +++ b/umap2/phy/facedancer/max342x_phy.py @@ -210,7 +210,7 @@ def service_irqs(self): self.clear_irq_bit(Regs.endpoint_irq, PINCTL.setup_data_avail) b = self.read_bytes(Regs.setup_data_fifo, 8) - if (irq & PINCTL.out0_data_avail) and (ord(b[0]) & 0x80 == 0x00): + if (irq & PINCTL.out0_data_avail) and (b[0] & 0x80 == 0x00): data_bytes_len = struct.unpack(' Date: Thu, 30 Sep 2021 10:50:00 +0200 Subject: [PATCH 02/10] removed useless binascii import --- umap2/fuzz/helpers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/umap2/fuzz/helpers.py b/umap2/fuzz/helpers.py index 72d36f0..277d1a2 100644 --- a/umap2/fuzz/helpers.py +++ b/umap2/fuzz/helpers.py @@ -3,7 +3,6 @@ ''' import traceback -import binascii import inspect class StageLogger(object): From 16b5c4fb6aec1f38735dfb5f07d9a90c8ccbdd09 Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Mon, 23 Jun 2025 10:35:39 +0200 Subject: [PATCH 03/10] Added support for cynthion into cli options & added PhyInterface --- umap2/apps/base.py | 5 + umap2/apps/scan.py | 1 + umap2/phy/cynthion/cynthion_phy.py | 624 +++++++++++++++++++++++++++++ 3 files changed, 630 insertions(+) create mode 100644 umap2/phy/cynthion/cynthion_phy.py diff --git a/umap2/apps/base.py b/umap2/apps/base.py index 85c5712..e9c8da7 100644 --- a/umap2/apps/base.py +++ b/umap2/apps/base.py @@ -10,6 +10,7 @@ from umap2.phy.facedancer.max342x_phy import Max342xPhy from umap2.phy.gadgetfs.gadgetfs_phy import GadgetFsPhy +from umap2.phy.cynthion.cynthion_phy import CynthionPhy from umap2.utils.ulogger import set_default_handler_level @@ -79,6 +80,10 @@ def load_phy(self, phy_string): self.logger.debug('Physical interface is GadgetFs') phy = GadgetFsPhy(self) return phy + elif phy_type == 'cynthion': + self.logger.debug('Physical interface is Cynthion') + phy = CynthionPhy(self) + return phy raise Exception('Phy type not supported: %s' % phy_type) def load_device(self, dev_name, phy): diff --git a/umap2/apps/scan.py b/umap2/apps/scan.py index 564256c..732056a 100644 --- a/umap2/apps/scan.py +++ b/umap2/apps/scan.py @@ -12,6 +12,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion Example: umap2scan -P fd:/dev/ttyUSB0 -q diff --git a/umap2/phy/cynthion/cynthion_phy.py b/umap2/phy/cynthion/cynthion_phy.py new file mode 100644 index 0000000..f5ff6b0 --- /dev/null +++ b/umap2/phy/cynthion/cynthion_phy.py @@ -0,0 +1,624 @@ +from umap2.phy.iphy import PhyInterface +import platform +from umap2.core.usb_device import USBDevice, USBDeviceRequest + +from typing import List, Tuple +from cynthion import Cynthion +from cynthion.boards.cynthion_moondancer import CynthionMoondancer +from pygreat.classes.core import CoreAPI + +from facedancer.types import USBDirection, DeviceSpeed +from facedancer.backends.moondancer import InterruptEvent +from facedancer.request import USBControlRequest + +# Based on: MoondancerApp from .venv/lib/python3.12/site-packages/facedancer/backends/moondancer.py +# MoondancerApp is used by the facedancer USBDevice .venv/lib/python3.12/site-packages/facedancer/device.py + +# CynthionPhy will be used by umap2 USBDevice umap2/core/usb_device.py + + + + + + +# Havily based on .venv/lib/python3.12/site-packages/facedancer/backends/moondancer.py aka MoondancerApp +class CynthionPhy(PhyInterface): + # Notes: + # Facedaner setAdress will do: + # request.acknowledge(blocking=True) + # self.set_address(request.value) + # + # We just self.ack_status_stage() + # maybe it is required to set_address + + + + + app_name = "Moondancer" + + # Number of supported USB endpoints. + SUPPORTED_ENDPOINTS = 16 + + + + device_speed : DeviceSpeed = None + device : CynthionMoondancer = None + api: CoreAPI = None + # 1 Create physical + # for device + # device = self.load_device(device_name, phy) + # device.connect() + # phy.connect(device) + # device.run() + # phy.run() + # device.disconnect() + # phy.disconnect() + # phy.disconnect() + + def __init__(self, app): + super(CynthionPhy, self).__init__(app, 'CynthionPhy') + if platform.system() != 'Linux': + raise Exception('CynthionPhy is only supported on Linux') + + + if not self.appropriate_for_environment('cynthion'): + raise Exception('CynthionPhy is not supported on this platform') + + self.device = Cynthion() + + # CynthionMoondancer is a subclass of CynthionBoard which is a subclass of GreatBoard + self.api = self.device.apis.moondancer + + # Initialize a dictionary that will store the last setup + # whether each endpoint is currently stalled. + self.endpoint_stalled = {} + for i in range(self.SUPPORTED_ENDPOINTS): + self.endpoint_stalled[i] = False + + # Assume a max packet size of 64 until configured otherwise. + self.max_packet_size_ep0 = 64 + + # Start off by assuming we're not waiting for an OUT control transfer's + # data stage. # See handle_setup_complete_on_endpoint for details. + self.pending_control_request = None + + # Store a reference to the device's active configuration, + # which we'll use to know which endpoints we'll need to check + # for data transfer readiness. + self.configuration = None + + # Maintain a list of configured endpoints with form: (address, max_packet_size, USBTransferType) + self.configured_endpoints = dict() + + def connect(self, usb_device: USBDevice, device_speed: DeviceSpeed=DeviceSpeed.FULL): + ''' + Connect a USB device to the Cynthion hardware + + :param usb_device: USB device instance to connect + ''' + super(CynthionPhy, self).connect(usb_device) + + # Quirks are not provided anywhere by UMAP2 + quirks = 0 + + if self.device_speed != None: + device_speed=self.device_speed + + if device_speed not in [DeviceSpeed.FULL, DeviceSpeed.HIGH]: + self.warning(f"Moondancer only supports USB Full and High Speed. Ignoring requested speed: {device_speed.name}") + + self.debug(f"moondancer.connect(max_packet_size_ep0:{self.max_packet_size_ep0}, device_speed:{device_speed}, quirks:{quirks})") + + # Override the max packet size if provided by the USBDevice + if usb_device.max_packet_size_ep0 is None: + raise Exception('max_packet_size_ep0 is not set') + + self.api.connect(usb_device.max_packet_size_ep0, device_speed, quirks) + + device_name = f"{type(self.connected_device).__module__}.{type(self.connected_device).__qualname__}" + + self.info(f"Connected {device_speed.name} speed device '{device_name}' to target host.") + + def disconnect(self): + ''' + Disconnect the device from the Cynthion hardware + ''' + self.info("Disconnecting from target host.") + + self.device.comms.release_exclusive_access() + self.api.disconnect() + return super(CynthionPhy, self).disconnect() + + def send_on_endpoint(self, ep_num: int, data: bytes): + ''' + Send data on a specific endpoint + + :param ep_num: number of endpoint + :param data: data to send + ''' + self._send_on_endpoint(ep_num, data) + + def stall_ep0(self): + ''' + Stalls control endpoint (0) + ''' + self._stall_endpoint(0) + + def ack_status_stage(self): + ''' + Acknowledge status stage + ''' + self._ack_status_stage() + + def run(self): + self.service_irqs() + + def service_irqs(self): + ''' + Handle USB requests + ''' + self.stop = False + # Constantly service any events that need to be performed. + while not self.stop: + + self._service_irqs() + + if self.app.should_stop_phy(): + self.stop = True + break + + # Hooks nessesary for umap2 + def pre_handle_request_hook(self, buf): + req = USBDeviceRequest(buf) + + if req.request == 9: + pass + + def post_handle_request_hook(self, buf): + req = USBDeviceRequest(buf) + + if req.request == 5:# handle_set_address_request + # Not sure if the api needs the address, but MoondancerApp does it. + self._set_address(req.value) + + elif req.request == 9:#handle_set_configuration_request + # Runs a version of the MoondancerApp.configured() method. + + + # If we need to issue a configuration command, issue one. + # (If there are no endpoints other than control, this command will be + # empty, and we can skip this.) + endpoint_triplets = [] + + for nr, endpoint in self.connected_device.endpoints.entries(): + self.debug(f"Configuring endpoint: {endpoint}.") + triple = (endpoint.address, endpoint.max_packet_size, endpoint.transfer_type,) + endpoint_triplets.append(triple) + + #for interface in self.configuration.get_interfaces(): + # for endpoint in interface.get_endpoints():# + + # self.debug(f"Configuring endpoint: {endpoint}.") + + # triple = (endpoint.get_address(), endpoint.max_packet_size, endpoint.transfer_type,) + # endpoint_triplets.append(triple) + + if len(endpoint_triplets): + self.api.configure_endpoints(*endpoint_triplets) + for triplet in endpoint_triplets: + self.configured_endpoints[triplet[0]] = triplet + + # If we've just set up endpoints, check to see if any of them + # have NAKs waiting. + nak_status = self.api.get_nak_status() + self._handle_ep_in_nak_status(nak_status) + + self.info("Target host configuration complete.") + + + + # Support for USBControlRequest class from MoondancerApp to answer requests from the host + def control_send(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, *, blocking: bool = False): + """ Queues sending data on the provided control endpoint in + response to a IN control request. + + Args: + endpoint_number : The endpoint number to send data upon. + in_request : The control request being responded to. + data : The data to send. + blocking : If provided and true, this function will block + until the backend indicates the send is complete. + + """ + self._send_on_control_endpoint(endpoint_number, in_request, data, blocking=blocking) + + # Additional methods from MoondancerApp + + def _set_address(self, address: int, defer: bool = False): + """ Updates the device's knowledge of its own address. + + Args: + address : The address to apply. + defer : If true, the address change should be deferred + until the next time a control request ends. Should + be set if we're changing the address before we ack + the relevant transaction. + """ + self.address = address + self.debug(f"moondancer.set_address({address}, {defer})") + + self.api.set_address(address, 1 if defer else 0) + + def _stall_endpoint(self, endpoint_number:int, direction: USBDirection=USBDirection.OUT): + """ + Stalls the provided endpoint, as defined in the USB spec. + + Args: + endpoint_number : The number of the endpoint to be stalled. + """ + + endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number + self.debug(f"Stalling EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})") + + # Mark endpoint number as stalled. + self.endpoint_stalled[endpoint_number] = True + + # Stall endpoint address. + if direction: + self.api.stall_endpoint_in(endpoint_number) + self.debug(f" moondancer.api.stall_endpoint_in({endpoint_number})") + else: + self.api.stall_endpoint_out(endpoint_number) + self.debug(f" moondancer.api.stall_endpoint_out({endpoint_number})") + + def _ack_status_stage(self, direction: USBDirection=USBDirection.OUT, endpoint_number:int =0, blocking: bool=False): + """ + Handles the status stage of a correctly completed control request, + by priming the appropriate endpoint to handle the status phase. + + Args: + direction : Determines if we're ACK'ing an IN or OUT vendor request. + (This should match the direction of the DATA stage.) + endpoint_number : The endpoint number on which the control request + occurred. + blocking : True if we should wait for the ACK to be fully issued + before returning. + """ + + self.debug(f"moondancer.ack_status_stage({direction.name}, {endpoint_number}, {blocking})") + + if direction == USBDirection.OUT: # HOST_TO_DEVICE + # If this was an OUT request, we'll prime the output buffer to + # respond with the ZLP expected during the status stage. + self.api.write_endpoint(endpoint_number, blocking, bytes([])) + + self.debug(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, [])") + + else: # DEVICE_TO_HOST (IN) + # If this was an IN request, we'll need to set up a transfer descriptor + # so the status phase can operate correctly. This effectively reads the + # zero length packet from the STATUS phase. + self.api.ep_out_prime_receive(endpoint_number) + + self.debug(f" moondancer.api.ep_out_prime_receive({endpoint_number})") + + def _send_on_endpoint(self, endpoint_number: int, data: bytes, blocking: bool=True): + """ + Sends a collection of USB data on a given endpoint. + + Args: + endpoint_number : The number of the IN endpoint on which data should be sent. + data : The data to be sent. + blocking : If true, this function will wait for the transfer to complete. + """ + + self.api.write_endpoint(endpoint_number, blocking, bytes(data)) + + self.debug(f"moondancer.send_on_endpoint({endpoint_number}, {len(data)}, {blocking})") + self.debug(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, {len(data)})") + + + def _read_from_endpoint(self, endpoint_number: int) -> bytes: + """ + Reads a block of data from the given endpoint. + + Args: + endpoint_number : The number of the OUT endpoint on which data is to be rx'd. + """ + + self.debug(f"moondancer.read_from_endpoint({endpoint_number})") + + # Read from the given endpoint... + data = self.api.read_endpoint(endpoint_number) + + # Re-enable OUT interface to receive data again... + self.api.ep_out_interface_enable() + + self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)} '{data}'") + + # Finally, return the result. + return data + + def _send_on_control_endpoint(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, blocking: bool=True): + """ + Sends a collection of USB data in response to a IN control request by the host. + + Args: + endpoint_number : The number of the IN endpoint on which data should be sent. + requested_length : The number of bytes requested by the host. + data : The data to be sent. + blocking : If true, this function should wait for the transfer to complete. + """ + requested_length = in_request.length + self.api.write_control_endpoint(endpoint_number, requested_length, blocking, bytes(data)) + + self.debug(f"moondancer.send_on_control_endpoint({endpoint_number}, {requested_length}, {len(data)}, {blocking})") + self.debug(f" moondancer.api.write_control_endpoint({endpoint_number}, {requested_length}, {blocking}, {len(data)})") + + def _clear_halt(self, endpoint_number: int, direction: USBDirection): + """ Clears a halt condition on the provided non-control endpoint. + + Args: + endpoint_number : The endpoint number + direction : The endpoint direction; or OUT if not provided. + """ + + endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number + self.debug(f"Clearing halt EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})") + + self.api.clear_feature_endpoint_halt(endpoint_number, direction) + self.debug(f" moondancer.api.clear_feature_endpoint_halt({endpoint_number}, {direction})") + + def _service_irqs(self): + """ + Core routine of the Facedancer execution/event loop. Continuously monitors the + Moondancer's execution status, and reacts as events occur. + """ + + # Get latest interrupt events + events: List[Tuple[int, int]] = self.api.get_interrupt_events() + + # Handle interrupt events. + if len(events) > 0: + + # gcp doesn't seem to return a nested tuple if it's only one event + if isinstance(events[0], int): + events = [ events ] + + parsed_events = [InterruptEvent(event) for event in events] + + for event in parsed_events: + self.debug(f"MD IRQ => {event}") + if event == InterruptEvent.USB_BUS_RESET: + self._handle_bus_reset() + elif event == InterruptEvent.USB_RECEIVE_CONTROL: + self._handle_receive_control(event.endpoint_number) + elif event == InterruptEvent.USB_RECEIVE_PACKET and event.endpoint_number == 0: + # TODO support endpoints other than EP0 + self._handle_receive_control_packet(event.endpoint_number) + elif event == InterruptEvent.USB_RECEIVE_PACKET: + self._handle_receive_packet(event.endpoint_number) + elif event == InterruptEvent.USB_SEND_COMPLETE: + self._handle_send_complete(event.endpoint_number) + else: + self.error(f"Unhandled interrupt event: {event}") + + # Check EP_IN NAK status for pending data requests + else: + nak_status = self.api.get_nak_status() + if nak_status != 0: + self._handle_ep_in_nak_status(nak_status) + + # - Interrupt event handlers ---------------------------------------------- + + # USB0_BUS_RESET + def _handle_bus_reset(self): + """ + Triggers Moondancer to perform its side of a bus reset. + """ + + #if self.connected_device: + # self.connected_device.handle_bus_reset() + #else: + # self.api.bus_reset() + + # USBDevice from umap2 has no handle_bus_reset method, so we just call the api directly + self.info("Host issued a bus reset; resetting our connection.") + + # Clear our state back to address zero and no configuration. + self.configuration = None + #self.address = 0 + + self.debug(f"moondancer.bus_reset()") + self.api.bus_reset() + + # USB0_RECEIVE_CONTROL + def _handle_receive_control(self, endpoint_number: int): + """ + Handles a known outstanding control event on a given endpoint. + + endpoint_number: The endpoint number for which a control event should be serviced. + """ + + self.debug(f"handle_receive_control({endpoint_number})") + + # HACK: to maintain API compatibility with the existing facedancer API, + # we need to know if a stall happens at any point during our handler. + self.endpoint_stalled[endpoint_number] = False + + # Read the data from the SETUP stage... + data = bytearray(self.api.read_control()) + # !umap2 has no constructor for a request. It just uses the plain bytearray. + #request = self.connected_device.create_request(data) + request = USBControlRequest.from_raw_bytes(data, device=self) + + self.debug(f" moondancer.api.read_control({endpoint_number}) -> {len(data)} '{request}'") + + is_out = request.get_direction() == USBDirection.OUT # HOST_TO_DEVICE + has_data = (request.length > 0) + self.debug(f" is_out:{is_out} has_data:{has_data}") + + # Special case: if this is an OUT request with a data stage, we won't + # handle the request until the data stage has been completed. Instead, + # we'll stash away the data received in the setup stage, prime the + # endpoint for the data stage, and then wait for the data stage to + # complete, triggering a corresponding code path in + # in handle_transfer_complete_on_endpoint. + if is_out and has_data: + self.debug(f" setup packet has data - queueing read") + self.pending_control_request = request + self.api.ep_out_prime_receive(endpoint_number) + return + + # Pass the request to the emulated device for handling. + self.debug(f" connected_device.handle_request({request})") + + # !umap2 required the raw bytes to create its own request object. + self.pre_handle_request_hook(data) + self.connected_device.handle_request(data) + self.post_handle_request_hook(data) + + # If it was an IN request with a data stage we now need to + # prime the endpoint to receive a ZLP from the host + # acknowledging receipt of our response. + if has_data and not is_out and not self.endpoint_stalled[endpoint_number]: + self.debug(f" CONTROL IN -> prime ep to receive zlp") + self.api.ep_out_prime_receive(endpoint_number) + + + # USB0_RECEIVE_PACKET(0) + def _handle_receive_control_packet(self, endpoint_number: int): + self.debug(f"moondancer.handle_receive_control_packet({endpoint_number}) pending:{self.pending_control_request}") + + # Handle packet if we don't have a pending control request + if not self.pending_control_request: + data = self.api.read_endpoint(endpoint_number) + if len(data) == 0: + # It's a zlp following an IN control transfer, re-enable interface for reception on other endpoints. + self.api.ep_out_interface_enable() + else: + self.error(f"Discarding {len(data)} bytes on control endpoint with no pending control request") + return + + # We have a pending control request with a data stage... + # Read the rest of the data from the endpoint, completing the control request. + new_data = self.api.read_endpoint(endpoint_number) + + self.debug(f" handling control data stage: {len(new_data)} bytes") + self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(new_data)}") + + if len(new_data) == 0: + # It's a zlp following a control IN transfer, re-enable interface for reception on other endpoints. + self.api.ep_out_interface_enable() + self.debug(f"ZLP ending Control IN transfer on ep: {endpoint_number}") + return + + # Append our new data to the pending control request. + self.pending_control_request.data.extend(new_data) + + all_data_received = len(self.pending_control_request.data) == self.pending_control_request.length + is_short_packet = len(new_data) < self.max_packet_size_ep0 + + if all_data_received or is_short_packet: + # Handle the completed setup request... + # !Transform self.pending_control_request (USBControlRequest) to raw bytes for umap2's USBDevice own USBDeviceRequest constructor + # self.connected_device.handle_request(self.pending_control_request) + self.pre_handle_request_hook(self.pending_control_request.raw() + self.pending_control_request.data) + self.connected_device.handle_request(self.pending_control_request.raw() + self.pending_control_request.data) + self.post_handle_request_hook(self.pending_control_request.raw() + self.pending_control_request.data) + + # And clear our pending setup data. + self.pending_control_request = None + + # Finally, re-enable interface for reception on other endpoints. + self.api.ep_out_interface_enable() + + return + + # Finally, re-prime our control endpoint to receive the rest of the control data. + self.api.ep_out_prime_receive(endpoint_number) + + # USB0_RECEIVE_PACKET(1...15) + def _handle_receive_packet(self, endpoint_number: int): + """ + Handles a known-completed transfer on a given endpoint. + + Args: + endpoint_number : The endpoint number for which the transfer should be serviced. + """ + + self.debug(f"moondancer.handle_receive_packet({endpoint_number})") + + # Read the data from the endpoint + data = self.api.read_endpoint(endpoint_number) + + self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)}") + + # Ignore it if it's a ZLP ack as Facedancer devices don't handle it. + if len(data) == 0: + # Finally, Prime endpoint to receive again. + self.api.ep_out_interface_enable() + self.debug(f" ZLP ending Bulk IN transfer on ep: {endpoint_number}") + return + + # Pass it to the device's handler + self.connected_device.handle_data_available(endpoint_number, data) + + # Finally, re-enable other OUT endpoints so we can receive on them again. + self.api.ep_out_interface_enable() + + # USB0_SEND_COMPLETE + def _handle_send_complete(self, endpoint_number: int): + self.debug(f"handle_send_complete({endpoint_number})") + pass + + # Handle pending data requests on EP_IN + def _handle_ep_in_nak_status(self, nak_status: int): + nakked_endpoints = [epno for epno in range(self.SUPPORTED_ENDPOINTS) if (nak_status >> epno) & 1] + for endpoint_number in nakked_endpoints: + if endpoint_number != 0: + self.debug(f"Received IN NAK on ep{endpoint_number}") + # !!!not supported by umap2 + #self.connected_device.handle_nak(endpoint_number) + + + + + + + + + + + + + # Static methods + + @classmethod + def appropriate_for_environment(cls, backend_name: str) -> bool: + """ + Determines if the current environment seems appropriate + for using the Moondancer backend. + """ + + # Check: if we have a backend name other than moondancer, + # the user is trying to use something else. Abort! + if backend_name and backend_name != "cynthion": + return False + + # If we're not explicitly trying to use something else, + # see if there's a connected Cynthion. + try: + import cynthion + device = cynthion.Cynthion() + return device.supports_api('moondancer') + except ImportError: + cls.info("Skipping Cynthion-based devices, as the cynthion python module isn't installed.") + return False + except IOError: + cls.warning("Found Cynthion-based device, but could not access it. (Check permissions?)") + return False + except: + return False + + \ No newline at end of file From 728bff3a7d9978eb3bc4a51a977ff48eb0348bad Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Mon, 23 Jun 2025 11:09:32 +0200 Subject: [PATCH 04/10] Migrated some leftover parts from python2 to python3 & Fixed issue with endpoint enumeration --- umap2/dev/keyboard.py | 2 +- umap2/dev/smartcard.py | 4 ++-- umap2/fuzz/templates/hub.py | 4 ++-- umap2/fuzz/templates/smart_card.py | 10 +++++----- umap2/phy/cynthion/cynthion_phy.py | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/umap2/dev/keyboard.py b/umap2/dev/keyboard.py index a9834bf..389f257 100644 --- a/umap2/dev/keyboard.py +++ b/umap2/dev/keyboard.py @@ -181,7 +181,7 @@ def handle_buffer_available(self): if self.keys: letter = self.keys.pop(0) else: - letter = '\x00' + letter = b'\x00' self.type_letter(letter) def type_letter(self, letter, modifiers=0): diff --git a/umap2/dev/smartcard.py b/umap2/dev/smartcard.py index 1704aec..b3c1214 100644 --- a/umap2/dev/smartcard.py +++ b/umap2/dev/smartcard.py @@ -38,7 +38,7 @@ def setup_local_handlers(self): @mutable('get_clock_frequencies_response') def handle_get_clock_frequencies(self, req): - response = '' + response = b'' for frequency in self.interface.clock_frequencies: response += struct.pack(' Date: Mon, 23 Jun 2025 12:17:59 +0200 Subject: [PATCH 05/10] Added a more detailed README entry & Added facedancer license to CynthionPhy file. --- README.rst | 3 + umap2/phy/cynthion/cynthion_phy.py | 116 ++++++++++++----------------- 2 files changed, 49 insertions(+), 70 deletions(-) diff --git a/README.rst b/README.rst index 2cfe31f..19f14d2 100644 --- a/README.rst +++ b/README.rst @@ -75,6 +75,9 @@ Hardware - `Facedancer `_ is the recommended hardware for Umap2. Umap2 was developed based on it, and you'll get the most support with it. +- `Cynthion `_ is supported. + To get started, read the `Using Cynthion with Facedancer `_ + guide to install all the required dependencies. - `Raspdancer ` is supported on RPi - **GadgetFS** is partially supported. This support is very experimental (even more than the rest of Umap2) diff --git a/umap2/phy/cynthion/cynthion_phy.py b/umap2/phy/cynthion/cynthion_phy.py index a1bfeb0..d5d0c30 100644 --- a/umap2/phy/cynthion/cynthion_phy.py +++ b/umap2/phy/cynthion/cynthion_phy.py @@ -1,8 +1,42 @@ +# This file is heavily based on the Moondancer class from https://github.com/greatscottgadgets/facedancer/blob/main/facedancer/backends/moondancer.py +# This class reimplements the MoondancerApp class as CynthionPhy. +# The MoondancerApp class uses internal Facedancer classes like facedancer.device.USBDevice. +# The CynthionPhy implementation adds support for the internal umap2 classes like umap2.core.usb_device.USBDevice but still uses some of the Facedancer classes to reduce the amount of copyed code. + +# The original code is licensed under the BSD-3-Clause license: +# Copyright (c) 2019 Katherine J. Temkin +# Copyright (c) 2018 Dominic Spill +# Copyright (c) 2018 Travis Goodspeed +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation and/or +# other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + from umap2.phy.iphy import PhyInterface -import platform from umap2.core.usb_device import USBDevice, USBDeviceRequest -from typing import List, Tuple +from typing import List, Tuple from cynthion import Cynthion from cynthion.boards.cynthion_moondancer import CynthionMoondancer from pygreat.classes.core import CoreAPI @@ -11,55 +45,17 @@ from facedancer.backends.moondancer import InterruptEvent from facedancer.request import USBControlRequest -# Based on: MoondancerApp from .venv/lib/python3.12/site-packages/facedancer/backends/moondancer.py -# MoondancerApp is used by the facedancer USBDevice .venv/lib/python3.12/site-packages/facedancer/device.py - -# CynthionPhy will be used by umap2 USBDevice umap2/core/usb_device.py - - - - - - -# Havily based on .venv/lib/python3.12/site-packages/facedancer/backends/moondancer.py aka MoondancerApp class CynthionPhy(PhyInterface): - # Notes: - # Facedaner setAdress will do: - # request.acknowledge(blocking=True) - # self.set_address(request.value) - # - # We just self.ack_status_stage() - # maybe it is required to set_address - - - - - app_name = "Moondancer" # Number of supported USB endpoints. - SUPPORTED_ENDPOINTS = 16 + SUPPORTED_ENDPOINTS: int = 16 - - - device_speed : DeviceSpeed = None + device_speed : DeviceSpeed = None device : CynthionMoondancer = None api: CoreAPI = None - # 1 Create physical - # for device - # device = self.load_device(device_name, phy) - # device.connect() - # phy.connect(device) - # device.run() - # phy.run() - # device.disconnect() - # phy.disconnect() - # phy.disconnect() def __init__(self, app): super(CynthionPhy, self).__init__(app, 'CynthionPhy') - if platform.system() != 'Linux': - raise Exception('CynthionPhy is only supported on Linux') - if not self.appropriate_for_environment('cynthion'): raise Exception('CynthionPhy is not supported on this platform') @@ -98,18 +94,18 @@ def connect(self, usb_device: USBDevice, device_speed: DeviceSpeed=DeviceSpeed.F ''' super(CynthionPhy, self).connect(usb_device) - # Quirks are not provided anywhere by UMAP2 + # Quirks are not provided anywhere by umap2 quirks = 0 if self.device_speed != None: - device_speed=self.device_speed + device_speed = self.device_speed if device_speed not in [DeviceSpeed.FULL, DeviceSpeed.HIGH]: self.warning(f"Moondancer only supports USB Full and High Speed. Ignoring requested speed: {device_speed.name}") self.debug(f"moondancer.connect(max_packet_size_ep0:{self.max_packet_size_ep0}, device_speed:{device_speed}, quirks:{quirks})") - # Override the max packet size if provided by the USBDevice + # Check if the USBDevice has the max_packet_size_ep0 set if usb_device.max_packet_size_ep0 is None: raise Exception('max_packet_size_ep0 is not set') @@ -178,7 +174,7 @@ def post_handle_request_hook(self, buf): req = USBDeviceRequest(buf) if req.request == 5:# handle_set_address_request - # Not sure if the api needs the address, but MoondancerApp does it. + # Not sure if the api needs the address, but MoondancerApp does it this way. self._set_address(req.value) elif req.request == 9:#handle_set_configuration_request @@ -195,14 +191,6 @@ def post_handle_request_hook(self, buf): triple = (endpoint.address, endpoint.max_packet_size, endpoint.transfer_type,) endpoint_triplets.append(triple) - #for interface in self.configuration.get_interfaces(): - # for endpoint in interface.get_endpoints():# - - # self.debug(f"Configuring endpoint: {endpoint}.") - - # triple = (endpoint.get_address(), endpoint.max_packet_size, endpoint.transfer_type,) - # endpoint_triplets.append(triple) - if len(endpoint_triplets): self.api.configure_endpoints(*endpoint_triplets) for triplet in endpoint_triplets: @@ -216,7 +204,6 @@ def post_handle_request_hook(self, buf): self.info("Target host configuration complete.") - # Support for USBControlRequest class from MoondancerApp to answer requests from the host def control_send(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, *, blocking: bool = False): """ Queues sending data on the provided control endpoint in @@ -232,8 +219,8 @@ def control_send(self, endpoint_number: int, in_request: USBControlRequest, data """ self._send_on_control_endpoint(endpoint_number, in_request, data, blocking=blocking) - # Additional methods from MoondancerApp + # Additional slightly modified methods from MoondancerApp def _set_address(self, address: int, defer: bool = False): """ Updates the device's knowledge of its own address. @@ -578,20 +565,9 @@ def _handle_ep_in_nak_status(self, nak_status: int): for endpoint_number in nakked_endpoints: if endpoint_number != 0: self.debug(f"Received IN NAK on ep{endpoint_number}") - # !!!not supported by umap2 - #self.connected_device.handle_nak(endpoint_number) - - - - - - - - - - - - + # TODO not supported by umap2!!! + # self.connected_device.handle_nak(endpoint_number) + # Static methods @classmethod From f85cd342573763d1d1382629b8ac95f4582bd332 Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Mon, 23 Jun 2025 15:30:55 +0200 Subject: [PATCH 06/10] Fixed module not found error by adding empty __init__.py file --- umap2/phy/cynthion/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 umap2/phy/cynthion/__init__.py diff --git a/umap2/phy/cynthion/__init__.py b/umap2/phy/cynthion/__init__.py new file mode 100644 index 0000000..e69de29 From a45b566c188bf9780a32d1e1de866108d0c03a3a Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Tue, 24 Jun 2025 09:38:35 +0200 Subject: [PATCH 07/10] Added fuzzer support for python3 by implementing a custom StrEncoder because kitty does not support custom encodings for StrEncodeEncoder like utf_16_le for python >= 3 --- umap2/fuzz/fuzz_engine.py | 6 +++--- umap2/fuzz/templates/audio.py | 4 ++-- umap2/fuzz/templates/cdc.py | 2 +- umap2/fuzz/templates/enum.py | 18 +++++++++++++++--- umap2/fuzz/templates/hid.py | 2 +- umap2/fuzz/templates/hub.py | 2 +- umap2/fuzz/templates/mass_storage.py | 2 +- umap2/fuzz/templates/smart_card.py | 2 +- 8 files changed, 25 insertions(+), 13 deletions(-) diff --git a/umap2/fuzz/fuzz_engine.py b/umap2/fuzz/fuzz_engine.py index 6b47a08..c96885f 100755 --- a/umap2/fuzz/fuzz_engine.py +++ b/umap2/fuzz/fuzz_engine.py @@ -20,10 +20,10 @@ from kitty.model import GraphModel from kitty.model import Template, Meta, String, UInt32 -from . templates import audio, cdc, enum, generic, hid, hub, mass_storage -from templates import smart_card +from .templates import audio, cdc, enum, generic, hid, hub, mass_storage +from .templates import smart_card -from controller import UmapController +from .controller import UmapController def enumerate_templates(module): diff --git a/umap2/fuzz/templates/audio.py b/umap2/fuzz/templates/audio.py index 1e27346..159bd6e 100644 --- a/umap2/fuzz/templates/audio.py +++ b/umap2/fuzz/templates/audio.py @@ -6,8 +6,8 @@ from kitty.model import Template, Repeat, List, Container, ForEach, OneOf from kitty.model import ElementCount, SizeInBytes from kitty.model import ENC_INT_LE -from . hid import GenerateHidReport -from . generic import Descriptor, SizedPt, DynamicInt, SubDescriptor +from .hid import GenerateHidReport +from .generic import Descriptor, SizedPt, DynamicInt, SubDescriptor class _AC_DescriptorSubTypes: # AC Interface Descriptor Subtype diff --git a/umap2/fuzz/templates/cdc.py b/umap2/fuzz/templates/cdc.py index c1d61ef..abc244e 100644 --- a/umap2/fuzz/templates/cdc.py +++ b/umap2/fuzz/templates/cdc.py @@ -8,7 +8,7 @@ from kitty.model import Template, Repeat, List, Container, ForEach, OneOf from kitty.model import ElementCount from kitty.model import MutableField -from . generic import SubDescriptor +from .generic import SubDescriptor cdc_control_interface_descriptor = Template( diff --git a/umap2/fuzz/templates/enum.py b/umap2/fuzz/templates/enum.py index 09dddd1..d214887 100644 --- a/umap2/fuzz/templates/enum.py +++ b/umap2/fuzz/templates/enum.py @@ -9,8 +9,20 @@ # dynamic fields from kitty.model import ElementCount, SizeInBytes # encoders -from kitty.model import StrEncodeEncoder, ENC_INT_LE -from . generic import Descriptor, SubDescriptor +from kitty.model import ENC_INT_LE, StrEncoder +from .generic import Descriptor, SubDescriptor + + +# Custom encoder for UTF-16-LE that works with bytes +class Utf16LeEncoder(StrEncoder): + ''' + Encode bytes to UTF-16-LE + ''' + def encode(self, value): + # value is already bytes, decode to string first, then encode to UTF-16-LE + if isinstance(value, bytes): + value = value.decode('utf-8', errors='ignore') + return super().encode(value.encode('utf_16_le')) # Device descriptor @@ -125,7 +137,7 @@ name='string_descriptor', descriptor_type=DescriptorType.string, fields=[ - String(name='bString', value='hello_kitty', encoder=StrEncodeEncoder('utf_16_le'), max_size=254 / 2) + String(name='bString', value='hello_kitty', encoder=Utf16LeEncoder(), max_size=254 / 2) ]) diff --git a/umap2/fuzz/templates/hid.py b/umap2/fuzz/templates/hid.py index b3eaf9d..b6f69fe 100644 --- a/umap2/fuzz/templates/hid.py +++ b/umap2/fuzz/templates/hid.py @@ -24,7 +24,7 @@ from kitty.model import ENC_INT_LE from kitty.core import KittyException from random import Random -from . generic import DynamicInt, Descriptor +from .generic import DynamicInt, Descriptor opcodes = { diff --git a/umap2/fuzz/templates/hub.py b/umap2/fuzz/templates/hub.py index 59655df..b3c03e5 100644 --- a/umap2/fuzz/templates/hub.py +++ b/umap2/fuzz/templates/hub.py @@ -4,7 +4,7 @@ from umap2.core.usb import DescriptorType from kitty.model import UInt8, LE16, RandomBytes from kitty.model import Size -from generic import Descriptor +from .generic import Descriptor # hub_descriptor diff --git a/umap2/fuzz/templates/mass_storage.py b/umap2/fuzz/templates/mass_storage.py index 5e474ee..77c853d 100644 --- a/umap2/fuzz/templates/mass_storage.py +++ b/umap2/fuzz/templates/mass_storage.py @@ -4,7 +4,7 @@ from kitty.model import Template, Pad from kitty.model import String, UInt8, BE32, BE16, RandomBytes from kitty.model import SizeInBytes -from generic import SizedPt +from .generic import SizedPt # TODO: scsi_test_unit_ready_response (nothing to fuzz! no data returned, besides the csw) # TODO: scsi_send_diagnostic_response diff --git a/umap2/fuzz/templates/smart_card.py b/umap2/fuzz/templates/smart_card.py index 78885c3..a38433a 100644 --- a/umap2/fuzz/templates/smart_card.py +++ b/umap2/fuzz/templates/smart_card.py @@ -6,7 +6,7 @@ from kitty.model import SizeInBytes from kitty.model import ENC_INT_LE from kitty.model import Template, Container -from generic import DynamicInt +from .generic import DynamicInt class R2PParameters(Template): From 64ca809274644cd75419b3460eab07ed38e61e21 Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Tue, 24 Jun 2025 12:38:11 +0200 Subject: [PATCH 08/10] Added cynthion phy to all cli help messages --- umap2/apps/detect_os.py | 1 + umap2/apps/emulate.py | 1 + umap2/apps/fuzz.py | 1 + umap2/apps/makestages.py | 1 + umap2/apps/vsscan.py | 1 + 5 files changed, 5 insertions(+) diff --git a/umap2/apps/detect_os.py b/umap2/apps/detect_os.py index b49cdfa..fae5579 100644 --- a/umap2/apps/detect_os.py +++ b/umap2/apps/detect_os.py @@ -13,6 +13,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion Example: umap2detect -P fd:/dev/ttyUSB0 -q diff --git a/umap2/apps/emulate.py b/umap2/apps/emulate.py index afcf5f0..06aee39 100644 --- a/umap2/apps/emulate.py +++ b/umap2/apps/emulate.py @@ -16,6 +16,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion Examples: emulate keyboard: diff --git a/umap2/apps/fuzz.py b/umap2/apps/fuzz.py index 0574987..255b7e9 100644 --- a/umap2/apps/fuzz.py +++ b/umap2/apps/fuzz.py @@ -18,6 +18,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion Examples: emulate disk-on-key: diff --git a/umap2/apps/makestages.py b/umap2/apps/makestages.py index 0645b55..1a3be98 100644 --- a/umap2/apps/makestages.py +++ b/umap2/apps/makestages.py @@ -17,6 +17,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion ''' import time from umap2.apps.emulate import Umap2EmulationApp diff --git a/umap2/apps/vsscan.py b/umap2/apps/vsscan.py index a25664e..8c2ee52 100644 --- a/umap2/apps/vsscan.py +++ b/umap2/apps/vsscan.py @@ -20,6 +20,7 @@ Physical layer: fd: use facedancer connected to given serial port gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand) + cynthion use cynthion DB_FILE: a python file with a db member which is a list of DBEntry() objects. From cddc09e0044d3b56d9e1407542d3b9f7b5d2b22f Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Wed, 23 Jul 2025 10:55:38 +0200 Subject: [PATCH 09/10] fixed import error by moving import statement into try-catch statement and promt the user to follow the documentation. --- README.rst | 4 ++-- umap2/apps/base.py | 12 ++++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 19f14d2..c5881dd 100644 --- a/README.rst +++ b/README.rst @@ -76,8 +76,8 @@ Hardware is the recommended hardware for Umap2. Umap2 was developed based on it, and you'll get the most support with it. - `Cynthion `_ is supported. - To get started, read the `Using Cynthion with Facedancer `_ - guide to install all the required dependencies. + To get started, read the `Getting started with Cynthion `_ and `Using Cynthion with Facedancer `_ + guides to install all the required dependencies. - `Raspdancer ` is supported on RPi - **GadgetFS** is partially supported. This support is very experimental (even more than the rest of Umap2) diff --git a/umap2/apps/base.py b/umap2/apps/base.py index e9c8da7..eb3dfd0 100644 --- a/umap2/apps/base.py +++ b/umap2/apps/base.py @@ -10,7 +10,7 @@ from umap2.phy.facedancer.max342x_phy import Max342xPhy from umap2.phy.gadgetfs.gadgetfs_phy import GadgetFsPhy -from umap2.phy.cynthion.cynthion_phy import CynthionPhy +# CynthionPhy import moved to load_phy method with error handling from umap2.utils.ulogger import set_default_handler_level @@ -81,9 +81,13 @@ def load_phy(self, phy_string): phy = GadgetFsPhy(self) return phy elif phy_type == 'cynthion': - self.logger.debug('Physical interface is Cynthion') - phy = CynthionPhy(self) - return phy + try: + from umap2.phy.cynthion.cynthion_phy import CynthionPhy + self.logger.debug('Physical interface is Cynthion') + phy = CynthionPhy(self) + return phy + except ImportError: + raise Exception('Cynthion support requires additional dependencies. Please follow the guides linked in the README file.') raise Exception('Phy type not supported: %s' % phy_type) def load_device(self, dev_name, phy): From 62703e7125663d9b5991b8d254f3f7c0be565d1c Mon Sep 17 00:00:00 2001 From: Marius Karstedt Date: Tue, 2 Sep 2025 11:26:02 +0200 Subject: [PATCH 10/10] Migrated print statements from python2 to python3 --- data/vid_pid_db.py | 4 ++-- data/vid_pid_db_from_usb_ids.py | 4 ++-- umap2/utils/dev_generator.py | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/data/vid_pid_db.py b/data/vid_pid_db.py index de16d9a..e4a683c 100644 --- a/data/vid_pid_db.py +++ b/data/vid_pid_db.py @@ -6185,8 +6185,8 @@ history = set() for db_entry in db: if (db_entry.vid, db_entry.pid) in history: - print 'Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid) + print('Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid)) sys.exit(1) else: history.add((db_entry.vid, db_entry.pid)) - print 'DB OK!' + print('DB OK!') diff --git a/data/vid_pid_db_from_usb_ids.py b/data/vid_pid_db_from_usb_ids.py index 6f3b8db..421a2c2 100644 --- a/data/vid_pid_db_from_usb_ids.py +++ b/data/vid_pid_db_from_usb_ids.py @@ -15492,8 +15492,8 @@ history = set() for db_entry in db: if (db_entry.vid, db_entry.pid) in history: - print 'Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid) + print('Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid)) sys.exit(1) else: history.add((db_entry.vid, db_entry.pid)) - print 'DB OK!' + print('DB OK!') diff --git a/umap2/utils/dev_generator.py b/umap2/utils/dev_generator.py index 1d46c28..8b03596 100644 --- a/umap2/utils/dev_generator.py +++ b/umap2/utils/dev_generator.py @@ -41,7 +41,7 @@ def __init__(self, node_type): self.parent = None def get_by_type(self, req_type): - # print 'get_by_type: %02x (%02x)' % (req_type, self.node_type) + # print('get_by_type: %02x (%02x)' % (req_type, self.node_type)) if self.node_type == req_type: return self elif self.parent is None: @@ -58,7 +58,7 @@ def get_dep_by_type(self, def_node): return def_node def get_pre(self): - # print 'to_code(node_type: %02x)' % (self.node_type) + # print('to_code(node_type: %02x)' % (self.node_type)) pre_code = '' available_list_types = set([n.node_type for n in self.deps if isinstance(n, ListNode)]) known_list_types = set(t for t in self.list_names) @@ -134,7 +134,7 @@ def get_text(self): def parse_pfn(desc_type): def parser_wrapper(pfn): def wrapper(self, desc): - # print 'Parsing desc %02x: %s' % (desc_type, hexlify(desc)) + # print('Parsing desc %02x: %s' % (desc_type, hexlify(desc))) self.select_node(desc_type) node = DescriptorNode(desc_type) node.text = pfn(self, desc, node) @@ -353,7 +353,7 @@ def parse_config_descs(self, config_descs): self.parse_config_desc(desc_buff) def emit_output(self): - print self.root_node.to_code() + print(self.root_node.to_code()) def main():