Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mpf/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
__version__ = '0.57.5.dev1' # Also consider whether MPF-MC pyproject.toml should be updated
'''The full version of MPF.'''

__short_version__ = '0.57'
__short_version__ = '0.58'
'''The major.minor version of MPF.'''

__bcp_version__ = '1.1'
Expand Down
8 changes: 4 additions & 4 deletions mpf/config_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -668,11 +668,11 @@ fast_exp_board:
fast_breakout:
port: single|enum(1,2,3)|
model: single|str|
led_ports: list|subconfig(fast_led_port)|None
fast_led_port:
port: single|str|
type: single|enum(ws2812,apa-102)|ws2812
leds: single|int|32
port: single|int|
type: single|enum(ws2812,apa-102,sk6812,mixed)|ws2812
count: single|int|32
rgbw_numbers: list|int|None # only used with the "mixed" type
fast_aud:
port: list|str|auto
baud: single|int|230400
Expand Down
322 changes: 165 additions & 157 deletions mpf/devices/light.py

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions mpf/platforms/fast/communicators/exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, platform, processor, config):
self.active_board = None

self.message_processors['BR:'] = self._process_br
self.message_processors['ER:'] = self._process_er

async def init(self):
"""Query the expansion boards."""
Expand Down Expand Up @@ -91,6 +92,10 @@ def _process_br(self, msg):
self.active_board = None
self.done_processing_msg_response()

def _process_er(self, msg):
del msg
self.done_processing_msg_response()

def set_led_fade_rate(self, board_address: str, rate: int) -> None:
"""Sets the hardware LED fade rate for an EXP board.

Expand Down
250 changes: 149 additions & 101 deletions mpf/platforms/fast/fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,10 @@ def configure_switch(self, number: str, config: SwitchConfig, platform_config: d
raise AssertionError("Switch needs a number")

if not self.serial_connections['net']:
raise AssertionError("A request was made to configure a FAST "
"switch, but no connection to a NET processor"
"is available")
raise AssertionError("A request was made to configure a FAST switch, "
"but no connection to a NET processor is available")

if self.is_retro:
# translate switch num to FAST switch
if self.is_retro: # translate switch num to FAST switch
try:
number = fast_defines.RETRO_SWITCH_MAP[str(number).upper()]
except KeyError:
Expand All @@ -584,7 +582,6 @@ def configure_switch(self, number: str, config: SwitchConfig, platform_config: d

return switch

# pylint: disable-msg=too-many-locals
def configure_light(self, number, subtype, config, platform_settings) -> LightPlatformInterface:
"""Configure light in platform."""
del platform_settings
Expand All @@ -601,82 +598,123 @@ def configure_light(self, number, subtype, config, platform_settings) -> LightPl
# split into board name, breakout, port, led
parts = parts.split('-')

if parts[0] in self.exp_boards_by_name:
# this is an expansion board LED in config file format
exp_board = self.exp_boards_by_name[parts[0]]
if parts[0] in self.exp_boards_by_name: # this is an expansion board LED in config file format
return self._add_exp_led_with_config_format(parts, channel, config.name)

try:
_, port, led = parts
breakout = '0'
except ValueError:
_, breakout, port, led = parts
breakout = breakout.strip('b')
if int(parts[0]) > 255: # EXP LED in int form, which is how "previous:" values are calculated
return self._add_exp_led_with_int_format(parts, channel)

# ports are always 1-4, but some EXP boards have more which are labeled 5-8
# Those are really 1-4 of the next breakout board, so if we get a port > 4
# then sort it out to the real internal values
if int(port) > 4:
# assume 4 LED ports per breakout, could change to a lookup
breakout = str((int(port) - 1) // 4)
port = str((int(port) - 1) % 4 + 1)
# else it's a Nano LED
return self._add_nano_led(parts, channel)

try:
brk_board = exp_board.breakouts[breakout]
except KeyError:
# TODO change to mpf config exception
raise AssertionError(f'Board {exp_board} does not have a config entry for Breakout {breakout}')
raise AssertionError(f"Unknown light subtype {subtype}")

index = self.port_idx_to_hex(port, led, 32, config.name)
this_led_number = f'{brk_board.address}{index}'
# pylint: disable-msg=too-many-locals
def _add_exp_led_with_config_format(self, parts, channel, name):
exp_board = self.exp_boards_by_name[parts[0]]

# this code runs once for each channel, so it will be called 3x per LED which
# is why we check this here
if this_led_number not in self.fast_exp_leds:
self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number,
exp_board.config['led_fade_time'], self)
try:
_, port, led = parts
breakout = '0'
except ValueError:
_, breakout, port, led = parts
breakout = breakout.strip('b')

lights_before_port = 0
light_count_on_port = exp_board.light_count_on_port(port)
port_number = int(port)
range_end = port_number
if port_number <= 4:
range_start = 1
else:
range_start = 5
idx = range_start
while idx < range_end:
lights_before_port += exp_board.light_count_on_port(idx)
idx += 1

# ports are always 1-4, but some EXP boards have more which are labeled 5-8
# Those are really 1-4 of the next breakout board, so if we get a port > 4
# then sort it out to the real internal values
if int(port) > 4:
# assume 4 LED ports per breakout, could change to a lookup
breakout = str((int(port) - 1) // 4)
port = str((int(port) - 1) % 4 + 1)

fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel)
self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel)
try:
brk_board = exp_board.breakouts[breakout]
except KeyError:
# TODO change to mpf config exception
raise AssertionError(f'Board {exp_board} does not have a config entry for Breakout {breakout}')

elif int(parts[0]) > 255:
# EXP LED in int form, which is how "previous:" values are calculated
index = self.port_idx_to_hex(port, led, light_count_on_port, lights_before_port, name)
this_led_number = f'{brk_board.address}{index}'

raw_hex_string = hex(int(parts[0]))[2:] # lowercase with 0x prefix stripped"
this_led_number = Util.normalize_hex_string(raw_hex_string, len(raw_hex_string))
# this code runs once for each channel, so it will be called 3x per LED which
# is why we check this here
if this_led_number not in self.fast_exp_leds:
self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number,
exp_board.config['led_fade_time'], self)

exp_board = self.exp_boards_by_address[this_led_number[:2]]
fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel)
self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel)
return fast_led_channel

if this_led_number not in self.fast_exp_leds:
# RGBW LEDs could span multiple FAST LEDs, so make sure it exists
self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number,
exp_board.config['led_fade_time'], self)
def _add_exp_led_with_int_format(self, parts, channel):
raw_hex_string = hex(int(parts[0]))[2:] # lowercase with 0x prefix stripped"
this_led_number = Util.normalize_hex_string(raw_hex_string, len(raw_hex_string))

fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel)
self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel)
exp_board = self.exp_boards_by_address[this_led_number[:2]]

else:
# Nano LED
if this_led_number not in self.fast_exp_leds:
# RGBW LEDs could span multiple FAST LEDs, so make sure it exists
self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number,
exp_board.config['led_fade_time'], self)

fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel)
self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel)
return fast_led_channel

try:
number = self.port_idx_to_hex(parts[0], parts[1], 64)
except IndexError:
# this is a legacy LED number as an int
number = f'{int(parts[0]):02X}'
def _add_nano_led(self, parts, channel):
try:
number = self.nano_port_idx_to_hex(parts[0], parts[1])
except IndexError:
# this is a legacy LED number as an int
number = f'{int(parts[0]):02X}'

if number not in self.fast_rgb_leds:
try:
self.fast_rgb_leds[number] = FASTRGBLED(number, self)
except KeyError:
# This number is not valid
raise ConfigFileError(f"Invalid LED number: {'_'.join(parts)}", 3, self.log.name)
if number not in self.fast_rgb_leds:
try:
self.fast_rgb_leds[number] = FASTRGBLED(number, self)
except KeyError:
# This number is not valid
raise ConfigFileError(f"Invalid LED number: {'_'.join(parts)}", 3, self.log.name)

fast_led_channel = FASTLEDChannel(self.fast_rgb_leds[number], channel)
self.fast_rgb_leds[number].add_channel(int(channel), fast_led_channel)
fast_led_channel = FASTLEDChannel(self.fast_rgb_leds[number], channel)
self.fast_rgb_leds[number].add_channel(int(channel), fast_led_channel)
return fast_led_channel

return fast_led_channel
raise AssertionError(f"Unknown light subtype {subtype}")
def nano_port_idx_to_hex(self, port, device_num):
"""Converts port number and LED index into the Nano FAST hex number.

def port_idx_to_hex(self, port, device_num, devices_per_port, name=None):
port: the LED port number printed on the board. First port is 1. No zeros.
device_num: LED position in the change, First LED is 1. No zeros.

Returns: FAST hex string for the LED
"""
port = int(port)
device_num = int(device_num)
if device_num < 1:
raise AssertionError(f"Device number {device_num} is not valid for Nano light device. "
"The first device in the chain should be 1, not 0")
if port < 1:
raise AssertionError(f"Port {port} is not valid for device {device_num}")
if device_num > 64:
raise AssertionError(f"Device number {device_num} exceeds the number of devices on port (64)")

return f'{(64 * (port - 1) + device_num - 1):02X}'

# pylint: disable-msg=too-many-arguments
def port_idx_to_hex(self, port, device_num, devices_on_port, devices_on_previous_ports, name=None):
"""Converts port number and LED index into the proper FAST hex number.

port: the LED port number printed on the board. First port is 1. No zeros.
Expand All @@ -696,17 +734,15 @@ def port_idx_to_hex(self, port, device_num, devices_per_port, name=None):
if port < 1:
raise AssertionError(f"Port {port} is not valid for device {device_num}")

if device_num > devices_per_port:
if device_num > devices_on_port:
if name:
self.raise_config_error(f"Device number {device_num} exceeds the number of devices per port "
f"({devices_per_port}) for LED {name}", 9)
self.raise_config_error(f"Device number {device_num} exceeds the number of devices on port "
f"({devices_on_port}) for LED {name}", 9)
else:
raise AssertionError(f"Device number {device_num} exceeds the number of devices per port "
f"({devices_per_port})")
raise AssertionError(f"Device number {device_num} exceeds the number of devices on port "
f"({devices_on_port})")

port_offset = (port - 1) * devices_per_port
device_num = device_num - 1
return f'{(port_offset + device_num):02X}'
return f'{(devices_on_previous_ports + device_num - 1):02X}'

def parse_light_number_to_channels(self, number: str, subtype: str = "led"):
"""Transform an MPF config light number to a FAST channel.
Expand Down Expand Up @@ -737,37 +773,49 @@ def _parse_led_light_number(self, number):

if parts[0] in self.exp_boards_by_name:
# This is an expansion board LED
if not parts[1].startswith('b'):
# No breakout specified, so we insert a b0
parts.insert(1, 'b0')

if len(parts) == 4:
# No channel specified, so we return 3 channels 0,1,2
return [{'number': '-'.join(parts) + f'-{i}'} for i in range(3)]

if len(parts) == 5:
# We have a channel specified
channel = int(parts[4])
if 0 <= channel <= 2:
result = []
for i in range(3):
working_parts = parts.copy()
if i + channel > 2:
# Channel rolls over, increment the LED number
working_parts[3] = str(int(working_parts[3]) + 1)
working_parts[4] = str((channel + i) % 3)
else:
working_parts[4] = str(channel + i)
result.append({'number': '-'.join(working_parts)})
return result
raise AssertionError(f"Invalid LED channel: {channel}")
raise AssertionError(f"Invalid LED number: {number}")

# This is a Nano LED
return self._parse_expansion_board_light_number(number, parts)

# Nano LEDs do not have expansion board names
return self._parse_nano_light_number(number, parts)

def _parse_expansion_board_light_number(self, number, parts):
if not parts[1].startswith('b'):
# No breakout specified, so we insert a b0
parts.insert(1, 'b0')

if len(parts) == 4:
# No channel specified, so we return 3 channels 0,1,2
return [{'number': '-'.join(parts) + f'-{i}'} for i in range(3)]

if len(parts) == 5:
# We have a channel specified
channel = int(parts[4])
if 0 <= channel <= 2:
result = []
for i in range(3):
working_parts = parts.copy()
absolute_channel = channel + i
if absolute_channel > 2:
# Channel rolls over, increment the LED number
working_parts[3] = str(int(working_parts[3]) + absolute_channel // 3)
working_parts[4] = str(absolute_channel % 3)
else:
working_parts[4] = str(absolute_channel)
result.append({'number': '-'.join(working_parts)})

return result

# channel out of bounds
raise AssertionError(f"Invalid LED channel: {channel}")

# wrong number of parts
raise AssertionError(f"Invalid LED number: {number}")

def _parse_nano_light_number(self, number, parts):
if '-' in str(number):
# num = list(map(int, str(number).split('-')))
# index = num[0] * 64 + num[1]
index = int(self.port_idx_to_hex(parts[0], parts[1], 64), 16)
index = int(self.nano_port_idx_to_hex(parts[0], parts[1]), 16)
else:
index = int(number)

Expand Down
Loading
Loading