diff --git a/README.rst b/README.rst
index 2cfe31f..c5881dd 100644
--- a/README.rst
+++ b/README.rst
@@ -75,6 +75,9 @@ Hardware
- `Facedancer `_
is the recommended hardware for Umap2.
Umap2 was developed based on it, and you'll get the most support with it.
+- `Cynthion `_ is supported.
+ To get started, read the `Getting started with Cynthion `_ and `Using Cynthion with Facedancer `_
+ guides to install all the required dependencies.
- `Raspdancer ` is supported on RPi
- **GadgetFS** is partially supported.
This support is very experimental (even more than the rest of Umap2)
diff --git a/data/vid_pid_db.py b/data/vid_pid_db.py
index de16d9a..e4a683c 100644
--- a/data/vid_pid_db.py
+++ b/data/vid_pid_db.py
@@ -6185,8 +6185,8 @@
history = set()
for db_entry in db:
if (db_entry.vid, db_entry.pid) in history:
- print 'Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid)
+ print('Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid))
sys.exit(1)
else:
history.add((db_entry.vid, db_entry.pid))
- print 'DB OK!'
+ print('DB OK!')
diff --git a/data/vid_pid_db_from_usb_ids.py b/data/vid_pid_db_from_usb_ids.py
index 6f3b8db..421a2c2 100644
--- a/data/vid_pid_db_from_usb_ids.py
+++ b/data/vid_pid_db_from_usb_ids.py
@@ -15492,8 +15492,8 @@
history = set()
for db_entry in db:
if (db_entry.vid, db_entry.pid) in history:
- print 'Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid)
+ print('Duplicate found: 0x%04x, 0x%04x' % (db_entry.vid, db_entry.pid))
sys.exit(1)
else:
history.add((db_entry.vid, db_entry.pid))
- print 'DB OK!'
+ print('DB OK!')
diff --git a/umap2/apps/base.py b/umap2/apps/base.py
index 85c5712..eb3dfd0 100644
--- a/umap2/apps/base.py
+++ b/umap2/apps/base.py
@@ -10,6 +10,7 @@
from umap2.phy.facedancer.max342x_phy import Max342xPhy
from umap2.phy.gadgetfs.gadgetfs_phy import GadgetFsPhy
+# CynthionPhy import moved to load_phy method with error handling
from umap2.utils.ulogger import set_default_handler_level
@@ -79,6 +80,14 @@ def load_phy(self, phy_string):
self.logger.debug('Physical interface is GadgetFs')
phy = GadgetFsPhy(self)
return phy
+ elif phy_type == 'cynthion':
+ try:
+ from umap2.phy.cynthion.cynthion_phy import CynthionPhy
+ self.logger.debug('Physical interface is Cynthion')
+ phy = CynthionPhy(self)
+ return phy
+ except ImportError:
+ raise Exception('Cynthion support requires additional dependencies. Please follow the guides linked in the README file.')
raise Exception('Phy type not supported: %s' % phy_type)
def load_device(self, dev_name, phy):
diff --git a/umap2/apps/detect_os.py b/umap2/apps/detect_os.py
index b49cdfa..fae5579 100644
--- a/umap2/apps/detect_os.py
+++ b/umap2/apps/detect_os.py
@@ -13,6 +13,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
Example:
umap2detect -P fd:/dev/ttyUSB0 -q
diff --git a/umap2/apps/emulate.py b/umap2/apps/emulate.py
index afcf5f0..06aee39 100644
--- a/umap2/apps/emulate.py
+++ b/umap2/apps/emulate.py
@@ -16,6 +16,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
Examples:
emulate keyboard:
diff --git a/umap2/apps/fuzz.py b/umap2/apps/fuzz.py
index 0574987..255b7e9 100644
--- a/umap2/apps/fuzz.py
+++ b/umap2/apps/fuzz.py
@@ -18,6 +18,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
Examples:
emulate disk-on-key:
diff --git a/umap2/apps/list_classes.py b/umap2/apps/list_classes.py
index cb37d31..80ff896 100644
--- a/umap2/apps/list_classes.py
+++ b/umap2/apps/list_classes.py
@@ -17,8 +17,8 @@ def run(self):
ks = self.umap_classes
verbose = self.options.get('--verbose', False)
if verbose:
- print '%-20s %s' % ('Device', 'Description')
- print '-------------------- ----------------------------------------------------'
+ print('%-20s %s' % ('Device', 'Description'))
+ print('-------------------- ----------------------------------------------------')
for k in ks:
if verbose:
print('%-20s %s' % (k, self.umap_class_dict[k][1]))
diff --git a/umap2/apps/makestages.py b/umap2/apps/makestages.py
index 0645b55..1a3be98 100644
--- a/umap2/apps/makestages.py
+++ b/umap2/apps/makestages.py
@@ -17,6 +17,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
'''
import time
from umap2.apps.emulate import Umap2EmulationApp
diff --git a/umap2/apps/scan.py b/umap2/apps/scan.py
index 564256c..732056a 100644
--- a/umap2/apps/scan.py
+++ b/umap2/apps/scan.py
@@ -12,6 +12,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
Example:
umap2scan -P fd:/dev/ttyUSB0 -q
diff --git a/umap2/apps/vsscan.py b/umap2/apps/vsscan.py
index a25664e..8c2ee52 100644
--- a/umap2/apps/vsscan.py
+++ b/umap2/apps/vsscan.py
@@ -20,6 +20,7 @@
Physical layer:
fd: use facedancer connected to given serial port
gadgetfs use gadgetfs (requires mounting of gadgetfs beforehand)
+ cynthion use cynthion
DB_FILE:
a python file with a db member which is a list of DBEntry() objects.
diff --git a/umap2/dev/audio.py b/umap2/dev/audio.py
index 6897b83..f917c9f 100644
--- a/umap2/dev/audio.py
+++ b/umap2/dev/audio.py
@@ -42,23 +42,23 @@ def setup_local_handlers(self):
}
self._settings = {
# (val, index): [cur, min, max, res, (idle)]
- (0x0100, 0x0001): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'],
+ (0x0100, 0x0001): [b'\x44\xac\x00', b'\x44\xac\x00', b'\x80\xbb\x00', b'\x80\xbb\x00'],
# (0x0100, 0x0002): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'],
- (0x0100, 0x0082): ['\x44\xac\x00', '\x44\xac\x00', '\x80\xbb\x00', '\x80\xbb\x00'],
- (0x0100, 0x0900): ['\x00', '\x00', '\xff', '\x00'],
- (0x0100, 0x0a00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0100, 0x0d00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0101, 0x0f00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0102, 0x0f00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0200, 0x0a00): ['\x00\x00', '\x00\x00', '\x55\x00', '\x30\x00', '\x00\x00'],
- (0x0200, 0x0d00): ['\x80\x22', '\x00\x00', '\xd0\x00', '\x30\x00'],
- (0x0201, 0x0900): ['\x80\x22', '\x20\x00', '\xa0\x00', '\x30\x00'],
- (0x0201, 0x0f00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0202, 0x0900): ['\xcf\x00', '\x00\x00', '\xcf\x00', '\x30\x00'],
- (0x0202, 0x0f00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0301, 0x0f00): ['\x01', '\x00', '\xff', '\x00'],
- (0x0302, 0x0f00): ['\x00\x00', '\x00\x00', '\x00\x00', '\x00\x00'],
- (0x0700, 0x0a00): ['\x01', '\x00', '\xff', '\x00'],
+ (0x0100, 0x0082): [b'\x44\xac\x00', b'\x44\xac\x00', b'\x80\xbb\x00', b'\x80\xbb\x00'],
+ (0x0100, 0x0900): [b'\x00', b'\x00', b'\xff', b'\x00'],
+ (0x0100, 0x0a00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0100, 0x0d00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0101, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0102, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0200, 0x0a00): [b'\x00\x00', b'\x00\x00', b'\x55\x00', b'\x30\x00', b'\x00\x00'],
+ (0x0200, 0x0d00): [b'\x80\x22', b'\x00\x00', b'\xd0\x00', b'\x30\x00'],
+ (0x0201, 0x0900): [b'\x80\x22', b'\x20\x00', b'\xa0\x00', b'\x30\x00'],
+ (0x0201, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0202, 0x0900): [b'\xcf\x00', b'\x00\x00', b'\xcf\x00', b'\x30\x00'],
+ (0x0202, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0301, 0x0f00): [b'\x01', b'\x00', b'\xff', b'\x00'],
+ (0x0302, 0x0f00): [b'\x00\x00', b'\x00\x00', b'\x00\x00', b'\x00\x00'],
+ (0x0700, 0x0a00): [b'\x01', b'\x00', b'\xff', b'\x00'],
}
self._cur = b'\x44\xac\x00'
@@ -205,29 +205,29 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs
app=app, phy=phy, iface_num=0, iface_alt=0, iface_str_idx=0,
cs_ifaces=[
# Class specific AC interface: header (4.3.2)
- USBCSInterface('ACHeader', app, phy, '\x01\x00\x01\x64\x00\x02\x01\x02'),
+ USBCSInterface('ACHeader', app, phy, b'\x01\x00\x01\x64\x00\x02\x01\x02'),
# Class specific AC interface: input terminal (Table 4.3.2.1)
- USBCSInterface('ACInputTerminal0', app, phy, '\x02\x01\x01\x01\x00\x02\x03\x00\x00\x00'),
- USBCSInterface('ACInputTerminal1', app, phy, '\x02\x02\x01\x02\x00\x01\x01\x00\x00\x00'),
+ USBCSInterface('ACInputTerminal0', app, phy, b'\x02\x01\x01\x01\x00\x02\x03\x00\x00\x00'),
+ USBCSInterface('ACInputTerminal1', app, phy, b'\x02\x02\x01\x02\x00\x01\x01\x00\x00\x00'),
# Class specific AC interface: output terminal (Table 4.3.2.2)
- USBCSInterface('ACOutputTerminal0', app, phy, '\x03\x06\x01\x03\x00\x09\x00'),
- USBCSInterface('ACOutputTerminal1', app, phy, '\x03\x07\x01\x01\x00\x08\x00'),
+ USBCSInterface('ACOutputTerminal0', app, phy, b'\x03\x06\x01\x03\x00\x09\x00'),
+ USBCSInterface('ACOutputTerminal1', app, phy, b'\x03\x07\x01\x01\x00\x08\x00'),
# Class specific AC interface: selector unit (Table 4.3.2.4)
- USBCSInterface('ACSelectorUnit', app, phy, '\x05\x08\x01\x0a\x00'),
+ USBCSInterface('ACSelectorUnit', app, phy, b'\x05\x08\x01\x0a\x00'),
# Class specific AC interface: feature unit (Table 4.3.2.5)
- USBCSInterface('ACFeatureUnit0', app, phy, '\x06\x09\x0f\x01\x01\x02\x02\x00'),
- USBCSInterface('ACFeatureUnit1', app, phy, '\x06\x0a\x02\x01\x43\x00\x00'),
- USBCSInterface('ACFeatureUnit2', app, phy, '\x06\x0d\x02\x01\x03\x00\x00'),
+ USBCSInterface('ACFeatureUnit0', app, phy, b'\x06\x09\x0f\x01\x01\x02\x02\x00'),
+ USBCSInterface('ACFeatureUnit1', app, phy, b'\x06\x0a\x02\x01\x43\x00\x00'),
+ USBCSInterface('ACFeatureUnit2', app, phy, b'\x06\x0d\x02\x01\x03\x00\x00'),
# Class specific AC interface: mixer unit (Table 4.3.2.3)
- USBCSInterface('ACMixerUnit', app, phy, '\x04\x0f\x02\x01\x0d\x02\x03\x00\x00\x00\x00'),
+ USBCSInterface('ACMixerUnit', app, phy, b'\x04\x0f\x02\x01\x0d\x02\x03\x00\x00\x00\x00'),
],
usb_class=usb_class
),
USBAudioStreamingInterface(
app=app, phy=phy, iface_num=1, iface_alt=0, iface_str_idx=0,
cs_ifaces=[
- USBCSInterface('ASGeneral', app, phy, '\x01\x01\x01\x01\x00'),
- USBCSInterface('ASFormatType', app, phy, '\x02\x01\x02\x02\x10\x02\x44\xac\x00\x44\xac\x00'),
+ USBCSInterface('ASGeneral', app, phy, b'\x01\x01\x01\x01\x00'),
+ USBCSInterface('ASFormatType', app, phy, b'\x02\x01\x02\x02\x10\x02\x44\xac\x00\x44\xac\x00'),
],
endpoints=[
USBEndpoint(
@@ -240,7 +240,7 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs
interval=1,
handler=audio_streaming.data_available,
cs_endpoints=[
- USBCSEndpoint('ASEndpoint', app, phy, '\x01\x01\x01\x01\x00')
+ USBCSEndpoint('ASEndpoint', app, phy, b'\x01\x01\x01\x01\x00')
],
usb_class=usb_class,
)
@@ -250,8 +250,8 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs
USBAudioStreamingInterface(
app=app, phy=phy, iface_num=2, iface_alt=0, iface_str_idx=0,
cs_ifaces=[
- USBCSInterface('ASGeneral', app, phy, '\x01\x07\x01\x01\x00'),
- USBCSInterface('ASFormatType', app, phy, '\x02\x01\x01\x02\x10\x02\x44\xac\x00\x44\xac\x00'),
+ USBCSInterface('ASGeneral', app, phy, b'\x01\x07\x01\x01\x00'),
+ USBCSInterface('ASFormatType', app, phy, b'\x02\x01\x01\x02\x10\x02\x44\xac\x00\x44\xac\x00'),
],
endpoints=[
USBEndpoint(
@@ -264,7 +264,7 @@ def __init__(self, app, phy, vid=0x0d8c, pid=0x000c, rev=0x0001, *args, **kwargs
interval=1,
handler=audio_streaming.buffer_available,
cs_endpoints=[
- USBCSEndpoint('ASEndpoint', app, phy, '\x01\x01\x00\x00\x00')
+ USBCSEndpoint('ASEndpoint', app, phy, b'\x01\x01\x00\x00\x00')
],
usb_class=usb_class,
)
diff --git a/umap2/dev/cdc.py b/umap2/dev/cdc.py
index 16235aa..9fae8fc 100644
--- a/umap2/dev/cdc.py
+++ b/umap2/dev/cdc.py
@@ -191,7 +191,7 @@ def handle_getter(self, req):
key = (param_id, req.value, req.index)
if key in self.params:
return self.params[key]
- return '\x00' * req.length
+ return b'\x00' * req.length
def handle_clear(self, req):
param_id = self.get_param_id_from_request(req.request)
diff --git a/umap2/dev/cdc_acm.py b/umap2/dev/cdc_acm.py
index ae3f063..712d747 100644
--- a/umap2/dev/cdc_acm.py
+++ b/umap2/dev/cdc_acm.py
@@ -29,7 +29,7 @@ def __init__(self, app, phy, vid=0x2548, pid=0x1001, rev=0x0010, cs_interfaces=N
cdc_cls = self.get_default_class(app, phy)
cs_interfaces = [
# Header Functional Descriptor
- FD(app, phy, FD.Header, '\x01\x01'),
+ FD(app, phy, FD.Header, b'\x01\x01'),
# Call Management Functional Descriptor
FD(app, phy, FD.CM, struct.pack('BB', bmCapabilities, USBCDCDevice.bDataInterface)),
FD(app, phy, FD.ACM, struct.pack('B', bmCapabilities)),
diff --git a/umap2/dev/cdc_dl.py b/umap2/dev/cdc_dl.py
index 5b6af81..c823570 100644
--- a/umap2/dev/cdc_dl.py
+++ b/umap2/dev/cdc_dl.py
@@ -28,7 +28,7 @@ def __init__(self, app, phy, vid=0x2548, pid=0x1001, rev=0x0010, cs_interfaces=N
cdc_cls = self.get_default_class(app, phy)
cs_interfaces = [
# Header Functional Descriptor
- FD(app, phy, FD.Header, '\x01\x01'),
+ FD(app, phy, FD.Header, b'\x01\x01'),
# Call Management Functional Descriptor
FD(app, phy, FD.CM, struct.pack('BB', bmCapabilities, USBCDCDevice.bDataInterface)),
FD(app, phy, FD.DLM, struct.pack('B', bmCapabilities)),
diff --git a/umap2/dev/hub.py b/umap2/dev/hub.py
index 5397dbd..47dc4c7 100644
--- a/umap2/dev/hub.py
+++ b/umap2/dev/hub.py
@@ -54,8 +54,8 @@ def handle_get_descriptor(self, req):
num_bytes = self.num_ports // 7
if self.num_ports % 7 != 0:
num_bytes += 1
- d += '\x00' * num_bytes
- d += '\xff' * num_bytes
+ d += b'\x00' * num_bytes
+ d += b'\xff' * num_bytes
d = struct.pack('B', len(d) + 1) + d
return d
diff --git a/umap2/dev/keyboard.py b/umap2/dev/keyboard.py
index a9834bf..389f257 100644
--- a/umap2/dev/keyboard.py
+++ b/umap2/dev/keyboard.py
@@ -181,7 +181,7 @@ def handle_buffer_available(self):
if self.keys:
letter = self.keys.pop(0)
else:
- letter = '\x00'
+ letter = b'\x00'
self.type_letter(letter)
def type_letter(self, letter, modifiers=0):
diff --git a/umap2/dev/mass_storage.py b/umap2/dev/mass_storage.py
index 26b09c1..cf04eec 100644
--- a/umap2/dev/mass_storage.py
+++ b/umap2/dev/mass_storage.py
@@ -129,7 +129,7 @@ def put_sector_data(self, address, data):
block_end = (address + 1) * self.block_size # slices are NON-inclusive
pad_len = (self.block_size - (len(data) % self.block_size)) % self.block_size
- data += '\x00' * pad_len
+ data += b'\x00' * pad_len
self.image[block_start:block_end] = data[:self.block_size]
self.image.flush()
@@ -391,7 +391,7 @@ def handle_scsi_mode_sense(self, mode_type, page, subpage, alloc_len, ctrl, with
if report is None:
# default behaviour, taken from previous implementation
# this should probably be changed ...
- report = '\x07\x00\x00\x00\x00\x00\x00\x00'
+ report = b'\x07\x00\x00\x00\x00\x00\x00\x00'
if with_header:
self.debug('SCSI mode sense (%d) - adding header' % (mode_type))
report = self._report_header(mode_type, len(report)) + report
diff --git a/umap2/dev/smartcard.py b/umap2/dev/smartcard.py
index 1704aec..b3c1214 100644
--- a/umap2/dev/smartcard.py
+++ b/umap2/dev/smartcard.py
@@ -38,7 +38,7 @@ def setup_local_handlers(self):
@mutable('get_clock_frequencies_response')
def handle_get_clock_frequencies(self, req):
- response = ''
+ response = b''
for frequency in self.interface.clock_frequencies:
response += struct.pack('= len(report_str):
raise KittyException('Not enough bytes in hid report for last opcode')
@@ -134,7 +134,7 @@ def GenerateHidReport(report_str, name=None):
fields.append(UInt8(opcode, name=cur_name))
else:
args = report_str[index:index + num_args]
- value = sum(ord(args[i]) << (i * 8) for i in range(len(args))) # little endian...
+ value = sum(args[i] << (i * 8) for i in range(len(args))) # little endian...
fields.append(Container(
name=cur_name,
fields=[
@@ -179,6 +179,6 @@ def GenerateHidReport(report_str, name=None):
hid_report_descriptor = Template(
name='hid_report_descriptor',
fields=GenerateHidReport(
- '05010906A101050719E029E7150025017501950881029501750881011900296515002565750895018100C0'.decode('hex')
+ bytes.fromhex('05010906A101050719E029E7150025017501950881029501750881011900296515002565750895018100C0')
)
)
diff --git a/umap2/fuzz/templates/hub.py b/umap2/fuzz/templates/hub.py
index fb6f30c..b3c03e5 100644
--- a/umap2/fuzz/templates/hub.py
+++ b/umap2/fuzz/templates/hub.py
@@ -4,7 +4,7 @@
from umap2.core.usb import DescriptorType
from kitty.model import UInt8, LE16, RandomBytes
from kitty.model import Size
-from generic import Descriptor
+from .generic import Descriptor
# hub_descriptor
@@ -21,8 +21,8 @@
num_bytes = self.num_ports // 7
if self.num_ports % 7 != 0:
num_bytes += 1
- d += '\x00' * num_bytes
- d += '\xff' * num_bytes
+ d += b'\x00' * num_bytes
+ d += b'\xff' * num_bytes
d = struct.pack('B', len(d) + 1) + d
return d
'''
diff --git a/umap2/fuzz/templates/mass_storage.py b/umap2/fuzz/templates/mass_storage.py
index 5e474ee..77c853d 100644
--- a/umap2/fuzz/templates/mass_storage.py
+++ b/umap2/fuzz/templates/mass_storage.py
@@ -4,7 +4,7 @@
from kitty.model import Template, Pad
from kitty.model import String, UInt8, BE32, BE16, RandomBytes
from kitty.model import SizeInBytes
-from generic import SizedPt
+from .generic import SizedPt
# TODO: scsi_test_unit_ready_response (nothing to fuzz! no data returned, besides the csw)
# TODO: scsi_send_diagnostic_response
diff --git a/umap2/fuzz/templates/smart_card.py b/umap2/fuzz/templates/smart_card.py
index 8223a3f..a38433a 100644
--- a/umap2/fuzz/templates/smart_card.py
+++ b/umap2/fuzz/templates/smart_card.py
@@ -6,7 +6,7 @@
from kitty.model import SizeInBytes
from kitty.model import ENC_INT_LE
from kitty.model import Template, Container
-from generic import DynamicInt
+from .generic import DynamicInt
class R2PParameters(Template):
@@ -30,7 +30,7 @@ def __init__(self, name, status, error, proto, ab_data, fuzzable=True):
status=0x00,
error=0x80,
proto=0,
- ab_data=RandomBytes(name='data', value='\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
+ ab_data=RandomBytes(name='data', value=b'\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
)
@@ -39,7 +39,7 @@ def __init__(self, name, status, error, proto, ab_data, fuzzable=True):
status=0x00,
error=0x80,
proto=0,
- ab_data=RandomBytes(name='data', value='\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
+ ab_data=RandomBytes(name='data', value=b'\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
)
smartcard_SetParameters_response = R2PParameters(
@@ -47,7 +47,7 @@ def __init__(self, name, status, error, proto, ab_data, fuzzable=True):
status=0x00,
error=0x80,
proto=0,
- ab_data=RandomBytes(name='data', value='\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
+ ab_data=RandomBytes(name='data', value=b'\x11\x00\x00\x0a\x00', min_length=0, max_length=150),
)
@@ -71,7 +71,7 @@ def __init__(self, name, status, error, chain_param, ab_data, fuzzable=True):
status=0x00,
error=0x80,
chain_param=0x00,
- ab_data=RandomBytes(name='data', value='\x3b\x6e\x00\x00\x80\x31\x80\x66\xb0\x84\x12\x01\x6e\x01\x83\x00\x90\x00', min_length=0, max_length=150),
+ ab_data=RandomBytes(name='data', value=b'\x3b\x6e\x00\x00\x80\x31\x80\x66\xb0\x84\x12\x01\x6e\x01\x83\x00\x90\x00', min_length=0, max_length=150),
)
smartcard_XfrBlock_response = R2PDataBlock(
@@ -79,7 +79,7 @@ def __init__(self, name, status, error, chain_param, ab_data, fuzzable=True):
status=0x00,
error=0x80,
chain_param=0x00,
- ab_data=RandomBytes(name='data', value='\x6a\x82', min_length=0, max_length=150),
+ ab_data=RandomBytes(name='data', value=b'\x6a\x82', min_length=0, max_length=150),
)
diff --git a/umap2/phy/cynthion/__init__.py b/umap2/phy/cynthion/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/umap2/phy/cynthion/cynthion_phy.py b/umap2/phy/cynthion/cynthion_phy.py
new file mode 100644
index 0000000..d5d0c30
--- /dev/null
+++ b/umap2/phy/cynthion/cynthion_phy.py
@@ -0,0 +1,600 @@
+# This file is heavily based on the Moondancer class from https://github.com/greatscottgadgets/facedancer/blob/main/facedancer/backends/moondancer.py
+# This class reimplements the MoondancerApp class as CynthionPhy.
+# The MoondancerApp class uses internal Facedancer classes like facedancer.device.USBDevice.
+# The CynthionPhy implementation adds support for the internal umap2 classes like umap2.core.usb_device.USBDevice but still uses some of the Facedancer classes to reduce the amount of copyed code.
+
+# The original code is licensed under the BSD-3-Clause license:
+# Copyright (c) 2019 Katherine J. Temkin
+# Copyright (c) 2018 Dominic Spill
+# Copyright (c) 2018 Travis Goodspeed
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation and/or
+# other materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its contributors
+# may be used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from umap2.phy.iphy import PhyInterface
+from umap2.core.usb_device import USBDevice, USBDeviceRequest
+
+from typing import List, Tuple
+from cynthion import Cynthion
+from cynthion.boards.cynthion_moondancer import CynthionMoondancer
+from pygreat.classes.core import CoreAPI
+
+from facedancer.types import USBDirection, DeviceSpeed
+from facedancer.backends.moondancer import InterruptEvent
+from facedancer.request import USBControlRequest
+
+class CynthionPhy(PhyInterface):
+
+ # Number of supported USB endpoints.
+ SUPPORTED_ENDPOINTS: int = 16
+
+ device_speed : DeviceSpeed = None
+ device : CynthionMoondancer = None
+ api: CoreAPI = None
+
+ def __init__(self, app):
+ super(CynthionPhy, self).__init__(app, 'CynthionPhy')
+
+ if not self.appropriate_for_environment('cynthion'):
+ raise Exception('CynthionPhy is not supported on this platform')
+
+ self.device = Cynthion()
+
+ # CynthionMoondancer is a subclass of CynthionBoard which is a subclass of GreatBoard
+ self.api = self.device.apis.moondancer
+
+ # Initialize a dictionary that will store the last setup
+ # whether each endpoint is currently stalled.
+ self.endpoint_stalled = {}
+ for i in range(self.SUPPORTED_ENDPOINTS):
+ self.endpoint_stalled[i] = False
+
+ # Assume a max packet size of 64 until configured otherwise.
+ self.max_packet_size_ep0 = 64
+
+ # Start off by assuming we're not waiting for an OUT control transfer's
+ # data stage. # See handle_setup_complete_on_endpoint for details.
+ self.pending_control_request = None
+
+ # Store a reference to the device's active configuration,
+ # which we'll use to know which endpoints we'll need to check
+ # for data transfer readiness.
+ self.configuration = None
+
+ # Maintain a list of configured endpoints with form: (address, max_packet_size, USBTransferType)
+ self.configured_endpoints = dict()
+
+ def connect(self, usb_device: USBDevice, device_speed: DeviceSpeed=DeviceSpeed.FULL):
+ '''
+ Connect a USB device to the Cynthion hardware
+
+ :param usb_device: USB device instance to connect
+ '''
+ super(CynthionPhy, self).connect(usb_device)
+
+ # Quirks are not provided anywhere by umap2
+ quirks = 0
+
+ if self.device_speed != None:
+ device_speed = self.device_speed
+
+ if device_speed not in [DeviceSpeed.FULL, DeviceSpeed.HIGH]:
+ self.warning(f"Moondancer only supports USB Full and High Speed. Ignoring requested speed: {device_speed.name}")
+
+ self.debug(f"moondancer.connect(max_packet_size_ep0:{self.max_packet_size_ep0}, device_speed:{device_speed}, quirks:{quirks})")
+
+ # Check if the USBDevice has the max_packet_size_ep0 set
+ if usb_device.max_packet_size_ep0 is None:
+ raise Exception('max_packet_size_ep0 is not set')
+
+ self.api.connect(usb_device.max_packet_size_ep0, device_speed, quirks)
+
+ device_name = f"{type(self.connected_device).__module__}.{type(self.connected_device).__qualname__}"
+
+ self.info(f"Connected {device_speed.name} speed device '{device_name}' to target host.")
+
+ def disconnect(self):
+ '''
+ Disconnect the device from the Cynthion hardware
+ '''
+ self.info("Disconnecting from target host.")
+
+ self.device.comms.release_exclusive_access()
+ self.api.disconnect()
+ return super(CynthionPhy, self).disconnect()
+
+ def send_on_endpoint(self, ep_num: int, data: bytes):
+ '''
+ Send data on a specific endpoint
+
+ :param ep_num: number of endpoint
+ :param data: data to send
+ '''
+ self._send_on_endpoint(ep_num, data)
+
+ def stall_ep0(self):
+ '''
+ Stalls control endpoint (0)
+ '''
+ self._stall_endpoint(0)
+
+ def ack_status_stage(self):
+ '''
+ Acknowledge status stage
+ '''
+ self._ack_status_stage()
+
+ def run(self):
+ self.service_irqs()
+
+ def service_irqs(self):
+ '''
+ Handle USB requests
+ '''
+ self.stop = False
+ # Constantly service any events that need to be performed.
+ while not self.stop:
+
+ self._service_irqs()
+
+ if self.app.should_stop_phy():
+ self.stop = True
+ break
+
+ # Hooks nessesary for umap2
+ def pre_handle_request_hook(self, buf):
+ req = USBDeviceRequest(buf)
+
+ if req.request == 9:
+ pass
+
+ def post_handle_request_hook(self, buf):
+ req = USBDeviceRequest(buf)
+
+ if req.request == 5:# handle_set_address_request
+ # Not sure if the api needs the address, but MoondancerApp does it this way.
+ self._set_address(req.value)
+
+ elif req.request == 9:#handle_set_configuration_request
+ # Runs a version of the MoondancerApp.configured() method.
+
+
+ # If we need to issue a configuration command, issue one.
+ # (If there are no endpoints other than control, this command will be
+ # empty, and we can skip this.)
+ endpoint_triplets = []
+
+ for nr, endpoint in self.connected_device.endpoints.items():
+ self.debug(f"Configuring endpoint: {endpoint}.")
+ triple = (endpoint.address, endpoint.max_packet_size, endpoint.transfer_type,)
+ endpoint_triplets.append(triple)
+
+ if len(endpoint_triplets):
+ self.api.configure_endpoints(*endpoint_triplets)
+ for triplet in endpoint_triplets:
+ self.configured_endpoints[triplet[0]] = triplet
+
+ # If we've just set up endpoints, check to see if any of them
+ # have NAKs waiting.
+ nak_status = self.api.get_nak_status()
+ self._handle_ep_in_nak_status(nak_status)
+
+ self.info("Target host configuration complete.")
+
+
+ # Support for USBControlRequest class from MoondancerApp to answer requests from the host
+ def control_send(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, *, blocking: bool = False):
+ """ Queues sending data on the provided control endpoint in
+ response to a IN control request.
+
+ Args:
+ endpoint_number : The endpoint number to send data upon.
+ in_request : The control request being responded to.
+ data : The data to send.
+ blocking : If provided and true, this function will block
+ until the backend indicates the send is complete.
+
+ """
+ self._send_on_control_endpoint(endpoint_number, in_request, data, blocking=blocking)
+
+
+ # Additional slightly modified methods from MoondancerApp
+ def _set_address(self, address: int, defer: bool = False):
+ """ Updates the device's knowledge of its own address.
+
+ Args:
+ address : The address to apply.
+ defer : If true, the address change should be deferred
+ until the next time a control request ends. Should
+ be set if we're changing the address before we ack
+ the relevant transaction.
+ """
+ self.address = address
+ self.debug(f"moondancer.set_address({address}, {defer})")
+
+ self.api.set_address(address, 1 if defer else 0)
+
+ def _stall_endpoint(self, endpoint_number:int, direction: USBDirection=USBDirection.OUT):
+ """
+ Stalls the provided endpoint, as defined in the USB spec.
+
+ Args:
+ endpoint_number : The number of the endpoint to be stalled.
+ """
+
+ endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number
+ self.debug(f"Stalling EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})")
+
+ # Mark endpoint number as stalled.
+ self.endpoint_stalled[endpoint_number] = True
+
+ # Stall endpoint address.
+ if direction:
+ self.api.stall_endpoint_in(endpoint_number)
+ self.debug(f" moondancer.api.stall_endpoint_in({endpoint_number})")
+ else:
+ self.api.stall_endpoint_out(endpoint_number)
+ self.debug(f" moondancer.api.stall_endpoint_out({endpoint_number})")
+
+ def _ack_status_stage(self, direction: USBDirection=USBDirection.OUT, endpoint_number:int =0, blocking: bool=False):
+ """
+ Handles the status stage of a correctly completed control request,
+ by priming the appropriate endpoint to handle the status phase.
+
+ Args:
+ direction : Determines if we're ACK'ing an IN or OUT vendor request.
+ (This should match the direction of the DATA stage.)
+ endpoint_number : The endpoint number on which the control request
+ occurred.
+ blocking : True if we should wait for the ACK to be fully issued
+ before returning.
+ """
+
+ self.debug(f"moondancer.ack_status_stage({direction.name}, {endpoint_number}, {blocking})")
+
+ if direction == USBDirection.OUT: # HOST_TO_DEVICE
+ # If this was an OUT request, we'll prime the output buffer to
+ # respond with the ZLP expected during the status stage.
+ self.api.write_endpoint(endpoint_number, blocking, bytes([]))
+
+ self.debug(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, [])")
+
+ else: # DEVICE_TO_HOST (IN)
+ # If this was an IN request, we'll need to set up a transfer descriptor
+ # so the status phase can operate correctly. This effectively reads the
+ # zero length packet from the STATUS phase.
+ self.api.ep_out_prime_receive(endpoint_number)
+
+ self.debug(f" moondancer.api.ep_out_prime_receive({endpoint_number})")
+
+ def _send_on_endpoint(self, endpoint_number: int, data: bytes, blocking: bool=True):
+ """
+ Sends a collection of USB data on a given endpoint.
+
+ Args:
+ endpoint_number : The number of the IN endpoint on which data should be sent.
+ data : The data to be sent.
+ blocking : If true, this function will wait for the transfer to complete.
+ """
+
+ self.api.write_endpoint(endpoint_number, blocking, bytes(data))
+
+ self.debug(f"moondancer.send_on_endpoint({endpoint_number}, {len(data)}, {blocking})")
+ self.debug(f" moondancer.api.write_endpoint({endpoint_number}, {blocking}, {len(data)})")
+
+
+ def _read_from_endpoint(self, endpoint_number: int) -> bytes:
+ """
+ Reads a block of data from the given endpoint.
+
+ Args:
+ endpoint_number : The number of the OUT endpoint on which data is to be rx'd.
+ """
+
+ self.debug(f"moondancer.read_from_endpoint({endpoint_number})")
+
+ # Read from the given endpoint...
+ data = self.api.read_endpoint(endpoint_number)
+
+ # Re-enable OUT interface to receive data again...
+ self.api.ep_out_interface_enable()
+
+ self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)} '{data}'")
+
+ # Finally, return the result.
+ return data
+
+ def _send_on_control_endpoint(self, endpoint_number: int, in_request: USBControlRequest, data: bytes, blocking: bool=True):
+ """
+ Sends a collection of USB data in response to a IN control request by the host.
+
+ Args:
+ endpoint_number : The number of the IN endpoint on which data should be sent.
+ requested_length : The number of bytes requested by the host.
+ data : The data to be sent.
+ blocking : If true, this function should wait for the transfer to complete.
+ """
+ requested_length = in_request.length
+ self.api.write_control_endpoint(endpoint_number, requested_length, blocking, bytes(data))
+
+ self.debug(f"moondancer.send_on_control_endpoint({endpoint_number}, {requested_length}, {len(data)}, {blocking})")
+ self.debug(f" moondancer.api.write_control_endpoint({endpoint_number}, {requested_length}, {blocking}, {len(data)})")
+
+ def _clear_halt(self, endpoint_number: int, direction: USBDirection):
+ """ Clears a halt condition on the provided non-control endpoint.
+
+ Args:
+ endpoint_number : The endpoint number
+ direction : The endpoint direction; or OUT if not provided.
+ """
+
+ endpoint_address = (endpoint_number | 0x80) if direction else endpoint_number
+ self.debug(f"Clearing halt EP{endpoint_number} {USBDirection(direction).name} (0x{endpoint_address:x})")
+
+ self.api.clear_feature_endpoint_halt(endpoint_number, direction)
+ self.debug(f" moondancer.api.clear_feature_endpoint_halt({endpoint_number}, {direction})")
+
+ def _service_irqs(self):
+ """
+ Core routine of the Facedancer execution/event loop. Continuously monitors the
+ Moondancer's execution status, and reacts as events occur.
+ """
+
+ # Get latest interrupt events
+ events: List[Tuple[int, int]] = self.api.get_interrupt_events()
+
+ # Handle interrupt events.
+ if len(events) > 0:
+
+ # gcp doesn't seem to return a nested tuple if it's only one event
+ if isinstance(events[0], int):
+ events = [ events ]
+
+ parsed_events = [InterruptEvent(event) for event in events]
+
+ for event in parsed_events:
+ self.debug(f"MD IRQ => {event}")
+ if event == InterruptEvent.USB_BUS_RESET:
+ self._handle_bus_reset()
+ elif event == InterruptEvent.USB_RECEIVE_CONTROL:
+ self._handle_receive_control(event.endpoint_number)
+ elif event == InterruptEvent.USB_RECEIVE_PACKET and event.endpoint_number == 0:
+ # TODO support endpoints other than EP0
+ self._handle_receive_control_packet(event.endpoint_number)
+ elif event == InterruptEvent.USB_RECEIVE_PACKET:
+ self._handle_receive_packet(event.endpoint_number)
+ elif event == InterruptEvent.USB_SEND_COMPLETE:
+ self._handle_send_complete(event.endpoint_number)
+ else:
+ self.error(f"Unhandled interrupt event: {event}")
+
+ # Check EP_IN NAK status for pending data requests
+ else:
+ nak_status = self.api.get_nak_status()
+ if nak_status != 0:
+ self._handle_ep_in_nak_status(nak_status)
+
+ # - Interrupt event handlers ----------------------------------------------
+
+ # USB0_BUS_RESET
+ def _handle_bus_reset(self):
+ """
+ Triggers Moondancer to perform its side of a bus reset.
+ """
+
+ #if self.connected_device:
+ # self.connected_device.handle_bus_reset()
+ #else:
+ # self.api.bus_reset()
+
+ # USBDevice from umap2 has no handle_bus_reset method, so we just call the api directly
+ self.info("Host issued a bus reset; resetting our connection.")
+
+ # Clear our state back to address zero and no configuration.
+ self.configuration = None
+ #self.address = 0
+
+ self.debug(f"moondancer.bus_reset()")
+ self.api.bus_reset()
+
+ # USB0_RECEIVE_CONTROL
+ def _handle_receive_control(self, endpoint_number: int):
+ """
+ Handles a known outstanding control event on a given endpoint.
+
+ endpoint_number: The endpoint number for which a control event should be serviced.
+ """
+
+ self.debug(f"handle_receive_control({endpoint_number})")
+
+ # HACK: to maintain API compatibility with the existing facedancer API,
+ # we need to know if a stall happens at any point during our handler.
+ self.endpoint_stalled[endpoint_number] = False
+
+ # Read the data from the SETUP stage...
+ data = bytearray(self.api.read_control())
+ # !umap2 has no constructor for a request. It just uses the plain bytearray.
+ #request = self.connected_device.create_request(data)
+ request = USBControlRequest.from_raw_bytes(data, device=self)
+
+ self.debug(f" moondancer.api.read_control({endpoint_number}) -> {len(data)} '{request}'")
+
+ is_out = request.get_direction() == USBDirection.OUT # HOST_TO_DEVICE
+ has_data = (request.length > 0)
+ self.debug(f" is_out:{is_out} has_data:{has_data}")
+
+ # Special case: if this is an OUT request with a data stage, we won't
+ # handle the request until the data stage has been completed. Instead,
+ # we'll stash away the data received in the setup stage, prime the
+ # endpoint for the data stage, and then wait for the data stage to
+ # complete, triggering a corresponding code path in
+ # in handle_transfer_complete_on_endpoint.
+ if is_out and has_data:
+ self.debug(f" setup packet has data - queueing read")
+ self.pending_control_request = request
+ self.api.ep_out_prime_receive(endpoint_number)
+ return
+
+ # Pass the request to the emulated device for handling.
+ self.debug(f" connected_device.handle_request({request})")
+
+ # !umap2 required the raw bytes to create its own request object.
+ self.pre_handle_request_hook(data)
+ self.connected_device.handle_request(data)
+ self.post_handle_request_hook(data)
+
+ # If it was an IN request with a data stage we now need to
+ # prime the endpoint to receive a ZLP from the host
+ # acknowledging receipt of our response.
+ if has_data and not is_out and not self.endpoint_stalled[endpoint_number]:
+ self.debug(f" CONTROL IN -> prime ep to receive zlp")
+ self.api.ep_out_prime_receive(endpoint_number)
+
+
+ # USB0_RECEIVE_PACKET(0)
+ def _handle_receive_control_packet(self, endpoint_number: int):
+ self.debug(f"moondancer.handle_receive_control_packet({endpoint_number}) pending:{self.pending_control_request}")
+
+ # Handle packet if we don't have a pending control request
+ if not self.pending_control_request:
+ data = self.api.read_endpoint(endpoint_number)
+ if len(data) == 0:
+ # It's a zlp following an IN control transfer, re-enable interface for reception on other endpoints.
+ self.api.ep_out_interface_enable()
+ else:
+ self.error(f"Discarding {len(data)} bytes on control endpoint with no pending control request")
+ return
+
+ # We have a pending control request with a data stage...
+ # Read the rest of the data from the endpoint, completing the control request.
+ new_data = self.api.read_endpoint(endpoint_number)
+
+ self.debug(f" handling control data stage: {len(new_data)} bytes")
+ self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(new_data)}")
+
+ if len(new_data) == 0:
+ # It's a zlp following a control IN transfer, re-enable interface for reception on other endpoints.
+ self.api.ep_out_interface_enable()
+ self.debug(f"ZLP ending Control IN transfer on ep: {endpoint_number}")
+ return
+
+ # Append our new data to the pending control request.
+ self.pending_control_request.data.extend(new_data)
+
+ all_data_received = len(self.pending_control_request.data) == self.pending_control_request.length
+ is_short_packet = len(new_data) < self.max_packet_size_ep0
+
+ if all_data_received or is_short_packet:
+ # Handle the completed setup request...
+ # !Transform self.pending_control_request (USBControlRequest) to raw bytes for umap2's USBDevice own USBDeviceRequest constructor
+ # self.connected_device.handle_request(self.pending_control_request)
+ self.pre_handle_request_hook(self.pending_control_request.raw() + self.pending_control_request.data)
+ self.connected_device.handle_request(self.pending_control_request.raw() + self.pending_control_request.data)
+ self.post_handle_request_hook(self.pending_control_request.raw() + self.pending_control_request.data)
+
+ # And clear our pending setup data.
+ self.pending_control_request = None
+
+ # Finally, re-enable interface for reception on other endpoints.
+ self.api.ep_out_interface_enable()
+
+ return
+
+ # Finally, re-prime our control endpoint to receive the rest of the control data.
+ self.api.ep_out_prime_receive(endpoint_number)
+
+ # USB0_RECEIVE_PACKET(1...15)
+ def _handle_receive_packet(self, endpoint_number: int):
+ """
+ Handles a known-completed transfer on a given endpoint.
+
+ Args:
+ endpoint_number : The endpoint number for which the transfer should be serviced.
+ """
+
+ self.debug(f"moondancer.handle_receive_packet({endpoint_number})")
+
+ # Read the data from the endpoint
+ data = self.api.read_endpoint(endpoint_number)
+
+ self.debug(f" moondancer.api.read_endpoint({endpoint_number}) -> {len(data)}")
+
+ # Ignore it if it's a ZLP ack as Facedancer devices don't handle it.
+ if len(data) == 0:
+ # Finally, Prime endpoint to receive again.
+ self.api.ep_out_interface_enable()
+ self.debug(f" ZLP ending Bulk IN transfer on ep: {endpoint_number}")
+ return
+
+ # Pass it to the device's handler
+ self.connected_device.handle_data_available(endpoint_number, data)
+
+ # Finally, re-enable other OUT endpoints so we can receive on them again.
+ self.api.ep_out_interface_enable()
+
+ # USB0_SEND_COMPLETE
+ def _handle_send_complete(self, endpoint_number: int):
+ self.debug(f"handle_send_complete({endpoint_number})")
+ pass
+
+ # Handle pending data requests on EP_IN
+ def _handle_ep_in_nak_status(self, nak_status: int):
+ nakked_endpoints = [epno for epno in range(self.SUPPORTED_ENDPOINTS) if (nak_status >> epno) & 1]
+ for endpoint_number in nakked_endpoints:
+ if endpoint_number != 0:
+ self.debug(f"Received IN NAK on ep{endpoint_number}")
+ # TODO not supported by umap2!!!
+ # self.connected_device.handle_nak(endpoint_number)
+
+ # Static methods
+
+ @classmethod
+ def appropriate_for_environment(cls, backend_name: str) -> bool:
+ """
+ Determines if the current environment seems appropriate
+ for using the Moondancer backend.
+ """
+
+ # Check: if we have a backend name other than moondancer,
+ # the user is trying to use something else. Abort!
+ if backend_name and backend_name != "cynthion":
+ return False
+
+ # If we're not explicitly trying to use something else,
+ # see if there's a connected Cynthion.
+ try:
+ import cynthion
+ device = cynthion.Cynthion()
+ return device.supports_api('moondancer')
+ except ImportError:
+ cls.info("Skipping Cynthion-based devices, as the cynthion python module isn't installed.")
+ return False
+ except IOError:
+ cls.warning("Found Cynthion-based device, but could not access it. (Check permissions?)")
+ return False
+ except:
+ return False
+
+
\ No newline at end of file
diff --git a/umap2/phy/facedancer/max342x_phy.py b/umap2/phy/facedancer/max342x_phy.py
index 31ee3d6..1aa9ed9 100644
--- a/umap2/phy/facedancer/max342x_phy.py
+++ b/umap2/phy/facedancer/max342x_phy.py
@@ -210,7 +210,7 @@ def service_irqs(self):
self.clear_irq_bit(Regs.endpoint_irq, PINCTL.setup_data_avail)
b = self.read_bytes(Regs.setup_data_fifo, 8)
- if (irq & PINCTL.out0_data_avail) and (ord(b[0]) & 0x80 == 0x00):
+ if (irq & PINCTL.out0_data_avail) and (b[0] & 0x80 == 0x00):
data_bytes_len = struct.unpack('