Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scripts/interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

# ------------------------------------------------------ IMPORTS -------------------------------------------------------

from superkey import Interface, InteractiveInterface
from superkey.interface import Interface, InteractiveInterface
from superkey.types import *

# ----------------------------------------------------- PROCEDURES -----------------------------------------------------

Expand Down
2 changes: 2 additions & 0 deletions scripts/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ function declared on the `Interface` class. No instance is required.
>>> set_buzzer_frequency(800)
>>> autokey('cq cq de n0vig n0vig k')
```

The `dir()` function may be used to get a list of available functions.
313 changes: 251 additions & 62 deletions scripts/superkey/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ def __init__(self):
super().__init__("Reply: Invalid payload.")


class InvalidValueError(InterfaceError):
"""Exception indicating that a `REPLY_INVALID_VALUE` reply was received."""
def __init__(self):
"""Initializes a new instance."""
super().__init__('Reply: Invalid value.')


class Interface:
"""
Class encapsulating the serial interface provided by the SuperKey hardware.
Expand Down Expand Up @@ -103,79 +110,177 @@ def autokey(self, string: str):
"""
Sends the `REQUEST_AUTOKEY` command. Queues the specified string to be automatically keyed.
"""
# Assemble packet
payload = bytes(string, encoding='ascii') + b'\x00' # null char is not added by default
size = len(payload)
crc = self.__class__.__crc16(payload)
header = self.__class__.__pack_header(MessageID.REQUEST_AUTOKEY, size, crc)
self.__send_packet(MessageID.REQUEST_AUTOKEY, bytes(string, encoding='ascii') + b'\x00')
self.__check_reply_empty()

# Send packet
self.__send(header)
self.__send(payload)
self.__check_empty_reply()
def get_buzzer_enabled(self) -> bool:
"""
Sends the `REQUEST_GET_BUZZER_ENABLED` command. Returns whether the buzzer is enabled or not.
"""
self.__send_packet(MessageID.REQUEST_GET_BUZZER_ENABLED)
return self.__check_reply('<?')[0]

def get_buzzer_frequency(self) -> int:
"""
Sends the `REQUEST_GET_BUZZER_FREQUENCY` command. Returns the current buzzer frequency, in Hz.
"""
self.__send_packet(MessageID.REQUEST_GET_BUZZER_FREQUENCY)
return self.__check_reply('<H')[0]

def get_invert_paddles(self) -> bool:
"""
Sends the `REQUEST_GET_INVERT_PADDLES` command. Returns whether or not the paddles are inverted.
"""
self.__send_packet(MessageID.REQUEST_GET_INVERT_PADDLES)
return self.__check_reply('<?')[0]

def get_io_polarity(self, pin: IOPin) -> IOPolarity:
"""
Sends the `REQUEST_GET_IO_POLARITY` command. Returns the polarity of the specified I/O pin.
"""
self.__send_packet(MessageID.REQUEST_GET_IO_POLARITY, struct.pack('<B', pin))
return IOPolarity(self.__check_reply('<B')[0])

def get_io_state(self, pin: IOPin) -> IOPin:
"""
Sends the `REQUEST_GET_IO_STATE` command. Returns `true` if the specified input / output pin is active.
"""
self.__send_packet(MessageID.REQUEST_GET_IO_STATE, struct.pack('<B', pin))
return self.__check_reply('<?')[0]

def get_io_state_for_type(self, type: IOType) -> IOType:
"""
Sends the `REQUEST_GET_IO_STATE_FOR_TYPE` command. Returns `true` if any I/O pin with the specified type is on.
"""
self.__send_packet(MessageID.REQUEST_GET_IO_STATE_FOR_TYPE, struct.pack('<B', type))
return self.__check_reply('<?')[0]

def get_io_type(self, pin: IOPin) -> IOType:
"""
Sends the `REQUEST_GET_IO_TYPE` command. Returns the type of the specified I/O pin.
"""
self.__send_packet(MessageID.REQUEST_GET_IO_TYPE, struct.pack('<B', pin))
return IOType(self.__check_reply('<B')[0])

def get_led_enabled(self, led: LED) -> bool:
"""
Sends the `REQUEST_GET_LED_ENABLED` command. Returns whether or not the specified LED is enabled.
"""
self.__send_packet(MessageID.REQUEST_GET_LED_ENABLED, struct.pack('<b', led))
return self.__check_reply('<?')[0]

def get_paddle_mode(self) -> PaddleMode:
"""
Sends the `REQUEST_GET_PADDLE_MODE` command. Returns the currently selected paddle mode.
"""
self.__send_packet(MessageID.REQUEST_GET_PADDLE_MODE)
return PaddleMode(self.__check_reply('<B')[0])

def get_wpm(self) -> float:
"""
Sends the `REQUEST_GET_WPM` command. Returns the current WPM setting.
"""
self.__send_packet(MessageID.REQUEST_GET_WPM)
return self.__check_reply('<f')[0]

def get_wpm_scale(self, element: CodeElement) -> float:
"""
Sends the `REQUEST_GET_WPM_SCALE` command. Returns the current WPM scale for the specified code element.
"""
self.__send_packet(MessageID.REQUEST_GET_WPM_SCALE, struct.pack('<B', element))
return self.__check_reply('<f')[0]

def panic(self):
"""
Sends the `REQUEST_PANIC` command. Immediately and unconditionally stops keying.
"""
# Assemble packet
header = self.__class__.__pack_header(MessageID.REQUEST_PANIC)

# Send packet
self.__send(header)
self.__check_empty_reply()
self.__send_packet(MessageID.REQUEST_PANIC)
self.__check_reply_empty()

def ping(self):
"""
Sends the `REQUEST_PING` command. Keys a short test message.
"""
# Assemble packet
header = self.__class__.__pack_header(MessageID.REQUEST_PING)

# Send packet
self.__send(header)
self.__check_empty_reply()
self.__send_packet(MessageID.REQUEST_PING)
self.__check_reply_empty()

def restore_default_config(self):
"""
Sends the `REQUEST_RESTORE_DEFAULT_CONFIG` command. Restores the device to its default configuration.
"""
# Assemble packet
header = self.__class__.__pack_header(MessageID.REQUEST_RESTORE_DEFAULT_CONFIG)

# Send packet
self.__send(header)
self.__check_empty_reply()
self.__send_packet(MessageID.REQUEST_RESTORE_DEFAULT_CONFIG)
self.__check_reply_empty()

def set_buzzer_enabled(self, enabled: bool):
"""
Sends the `REQUEST_SET_BUZZER_ENABLED` command. Enables or disables the device's built-in buzzer.
"""
# Assemble packet
payload = struct.pack('<?', enabled)
size = len(payload)
crc = self.__class__.__crc16(payload)
header = self.__class__.__pack_header(MessageID.REQUEST_SET_BUZZER_ENABLED, size, crc)

# Send packet
self.__send(header)
self.__send(payload)
self.__check_empty_reply()
self.__send_packet(MessageID.REQUEST_SET_BUZZER_ENABLED, struct.pack('<?', enabled))
self.__check_reply_empty()

def set_buzzer_frequency(self, frequency: int):
"""
Sends the `REQUEST_SET_BUZZER_FREQUENCY` command. Sets the frequency (in Hz) of the device's built-in buzzer.
"""
# Assemble packet
payload = struct.pack('<H', frequency)
size = len(payload)
crc = self.__class__.__crc16(payload)
header = self.__class__.__pack_header(MessageID.REQUEST_SET_BUZZER_FREQUENCY, size, crc)
self.__send_packet(MessageID.REQUEST_SET_BUZZER_FREQUENCY, struct.pack('<H', frequency))
self.__check_reply_empty()

def set_invert_paddles(self, inverted: bool):
"""
Sends the `REQUEST_SET_INVERT_PADDLES` command. Sets whether the paddles are inverted.
"""
self.__send_packet(MessageID.REQUEST_SET_INVERT_PADDLES, struct.pack('<?', inverted))
self.__check_reply_empty()

def set_io_polarity(self, pin: IOPin, polarity: IOPolarity):
"""
Sends the `REQUEST_SET_IO_POLARITY` command. Sets the polarity of the specified I/O pin.
"""
self.__send_packet(MessageID.REQUEST_SET_IO_POLARITY, struct.pack('<BB', pin, polarity))
self.__check_reply_empty()

def set_io_type(self, pin: IOPin, type: IOType):
"""
Sends the `REQUEST_SET_IO_TYPE` command. Sets the type of the specified I/O pin.
"""
self.__send_packet(MessageID.REQUEST_SET_IO_TYPE, struct.pack('<BB', pin, type))
self.__check_reply_empty()

def set_led_enabled(self, led: LED, enabled: bool):
"""
Sends the `REQUEST_SET_LED_ENABLED` command. Sets whether the specified LED is enabled or not.
"""
self.__send_packet(MessageID.REQUEST_SET_LED_ENABLED, struct.pack('<B?', led, enabled))
self.__check_reply_empty()

def set_paddle_mode(self, mode: PaddleMode):
"""
Sends the `REQUEST_SET_PADDLE_MODE` command. Sets the current keyer paddle mode.
"""
self.__send_packet(MessageID.REQUEST_SET_PADDLE_MODE, struct.pack('<B', mode))
self.__check_reply_empty()

def set_wpm(self, wpm: float):
"""
Sends the `REQUEST_SET_WPM` command. Sets the keyer's WPM setting.
"""
self.__send_packet(MessageID.REQUEST_SET_WPM, struct.pack('<f', wpm))
self.__check_reply_empty()

def set_wpm_scale(self, element: CodeElement, scale: float):
"""
Sends the `REQUEST_SET_WPM_SCALE` command. Sets the WPM scale for the specified code element.
"""
self.__send_packet(MessageID.REQUEST_SET_WPM_SCALE, struct.pack('<Bf', element, scale))
self.__check_reply_empty()

# Send packet
def version(self) -> str:
"""
Sends the `REQUEST_VERSION` command. Returns the device's version information.
"""
header = self.__class__.__pack_header(MessageID.REQUEST_VERSION)
self.__send(header)
self.__send(payload)
self.__check_empty_reply()
return self.__check_reply_str()

def __validate_serial(self):
"""
Expand All @@ -184,21 +289,117 @@ def __validate_serial(self):
if self.serial is None:
raise InterfaceError("The serial port is not open.")

def __check_empty_reply(self) -> bool:
def __send(self, buffer: bytes):
"""
Attempts to receive a generic empty reply from the device.
Transmits the specified buffer.
"""
self.__validate_serial()
self.serial.write(buffer)

def __send_packet(self, message: MessageID, payload: Optional[bytes] = None):
"""
Sends a packet with the specified message ID and payload.
"""
# Get header
size = 0
crc = 0
if payload is not None:
size = len(payload)
crc = self.__class__.__crc16(payload)
header = self.__class__.__pack_header(message, size, crc)

# Send data
self.__send(header)
if payload is not None:
self.__send(payload)

def __receive(self, size: int):
"""
Receives the specified number of bytes.
"""
self.__validate_serial()
return self.serial.read(size=size)

def __receive_header(self):
"""
Attempts to receive a header from the serial port.
"""
# Receive reply and verify we got enough data
reply = self.__receive(HEADER_STRUCT_SIZE)
if len(reply) != HEADER_STRUCT_SIZE:
raise InterfaceError('No reply received.')

# Unpack the header and verify the size and CRC are correct
message, size, crc = self.__class__.__unpack_header(reply)
return self.__class__.__unpack_header(reply)

def __check_reply(self, format: str) -> Tuple[any, ...]:
"""
Attempts to receive a reply with a payload from the device.
"""
# Unpack header
message, size, crc = self.__receive_header()

# Receive payload
if size != 0:
payload = self.__receive(size)
if len(payload) != size:
raise InterfaceError('No payload received.')
else:
payload = None

# Check the message ID
self.__check_reply_message_id(message)

# Check CRC
if payload is not None and self.__class__.__crc16(payload) != crc:
raise InterfaceError('Reply: Invalid CRC?')

# Check payload length
if len(payload) != struct.calcsize(format):
raise InterfaceError('Reply: Invalid payload?')

return struct.unpack(format, payload)

def __check_reply_empty(self):
"""
Attempts to receive a generic empty reply from the device.
"""
# Unpack message
message, size, crc = self.__receive_header()
if size != 0 or crc != 0:
raise InterfaceError('Reply: Invalid size / CRC?')

# Check the message ID
self.__check_reply_message_id(message)

def __check_reply_str(self) -> Optional[str]:
"""
Attempts to receive a reply with a string payload from the device.
"""
# Unpack header
message, size, crc = self.__receive_header()

# Receive payload
if size != 0:
payload = self.__receive(size)
if len(payload) != size:
raise InterfaceError('No payload received.')
else:
return None

# Check the message ID
self.__check_reply_message_id(message)

# Check CRC
if self.__class__.__crc16(payload) != crc:
raise InterfaceError('Reply: Invalid CRC?')

return str(payload, encoding='ascii')

def __check_reply_message_id(self, message: MessageID):
"""
Throws an exception if the specified message ID represent a failure.
"""
if message == MessageID.REPLY_SUCCESS:
return # no error
elif message == MessageID.REPLY_INVALID_MESSAGE:
Expand All @@ -209,23 +410,11 @@ def __check_empty_reply(self) -> bool:
raise InvalidCRCError()
elif message == MessageID.REPLY_INVALID_PAYLOAD:
raise InvalidPayloadError()
elif message == MessageID.REPLY_INVALID_VALUE:
raise InvalidValueError()
else:
raise InterfaceError('Reply: Unknown reply?')

def __send(self, buffer: bytes):
"""
Transmits the specified buffer.
"""
self.__validate_serial()
self.serial.write(buffer)

def __receive(self, size: int):
"""
Receives the specified number of bytes.
"""
self.__validate_serial()
return self.serial.read(size=size)

@staticmethod
def __crc16(buffer: bytes, seed: int = 0xFFFF):
"""
Expand Down
Loading