From 3cb95c2689d6ed1246e0d706632223c0d77d16be Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Fri, 2 Jan 2026 18:44:54 -0800 Subject: [PATCH 1/7] refactor and document commands in fast EXP test --- mpf/tests/test_Fast_Exp.py | 199 ++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 90 deletions(-) diff --git a/mpf/tests/test_Fast_Exp.py b/mpf/tests/test_Fast_Exp.py index 3ed26ef53..249f3cbc6 100644 --- a/mpf/tests/test_Fast_Exp.py +++ b/mpf/tests/test_Fast_Exp.py @@ -17,29 +17,49 @@ def create_expected_commands(self): # These are all the defaults based on the config file for this test. # Individual tests can override / add as needed - self.serial_connections['exp'].expected_commands = {'RA@880:000000': '', - 'RA@881:000000': '', - 'RA@882:000000': '', - 'RA@890:000000': '', - 'RA@892:000000': '', - 'RA@B40:000000': '', - 'RA@840:000000': '', - 'RA@841:000000': '', - 'RA@480:000000': '', - 'RA@481:000000': '', - 'RA@482:000000': '', - 'RF@89:5DC': '', - 'EM@B40:0,1,7D0,1F4,9C4,5DC': '', - 'EM@B40:1,1,7D0,3E8,7D0,5DC': '', - 'EM@882:7,1,7D0,3E8,7D0,5DC': '', - 'MP@B40:0,7F,7D0': '', - 'MP@B40:1,7F,7D0': '', - 'MP@882:7,7F,7D0': '',} + self.serial_connections['exp'].expected_commands = { + # set all lights off on 0091 + 'RA@880:000000': '', + 'RA@881:000000': '', + 'RA@882:000000': '', + + # set all lights off on secondary 0091 + 'RA@890:000000': '', + 'RA@892:000000': '', + + # set all lights off on 0081 + 'RA@B40:000000': '', + 'RA@840:000000': '', + 'RA@841:000000': '', + + # set all lights off on neuron + 'RA@480:000000': '', + 'RA@481:000000': '', + 'RA@482:000000': '', + + # set fade rate on secondary 0091 + 'RF@89:5DC': '', + + # configure motors on 0081 + 'EM@B40:0,1,7D0,1F4,9C4,5DC': '', + 'EM@B40:1,1,7D0,3E8,7D0,5DC': '', + + # configure motor on 0091 + 'EM@882:7,1,7D0,3E8,7D0,5DC': '', + + # set motor position on 0071 + 'MP@B40:0,7F,7D0': '', + 'MP@B40:1,7F,7D0': '', + + # set motor position on 0091 + 'MP@882:7,7F,7D0': '', + } def test_servo(self): # go to min position self.exp_cpu.expected_commands = { - "MP@B40:0,00,7D0": "" # MP:,, + # MP:,, + "MP@B40:0,00,7D0": "" # motor position on 0071 } self.machine.servos["servo1"].go_to_position(0) self.advance_time_and_run(1) @@ -47,14 +67,13 @@ def test_servo(self): # go to max position self.exp_cpu.expected_commands = { - "MP@B40:0,FF,7D0": "" + "MP@B40:0,FF,7D0": "" # motor position on 0071 } self.machine.servos["servo1"].go_to_position(1) self.advance_time_and_run(.1) self.assertFalse(self.exp_cpu.expected_commands) def test_leds(self): - # create local references to all the lights so they can be accessed like `led1.on()` for led_name, led_obj in self.machine.lights.items(): setattr(self, led_name, led_obj) @@ -71,7 +90,6 @@ def test_leds(self): self._test_lew_hardware_fade() def _test_led_internals(self): - # Make sure the internal LED map is correct self.assertIn("88100", self.fast_exp_leds) self.assertIn("88001", self.fast_exp_leds) @@ -81,61 +99,39 @@ def _test_led_internals(self): self.assertIn("89200", self.fast_exp_leds) # Make sure explicit offset declarations work - self.assertEqual(self.led4.hw_drivers['red'][0].number, '88120-0') - self.assertEqual(self.led4.hw_drivers['green'][0].number, '88120-1') - self.assertEqual(self.led4.hw_drivers['blue'][0].number, '88120-2') - self.assertEqual(self.led5.hw_drivers['red'][0].number, '88121-1') - self.assertEqual(self.led5.hw_drivers['green'][0].number, '88121-2') - self.assertEqual(self.led5.hw_drivers['blue'][0].number, '88122-0') - self.assertEqual(self.led6.hw_drivers['red'][0].number, '89200-2') - self.assertEqual(self.led6.hw_drivers['green'][0].number, '89201-0') - self.assertEqual(self.led6.hw_drivers['blue'][0].number, '89201-1') - + self._test_led_drivers(self.led4, '88120-0', '88120-1', '88120-2', None) + self._test_led_drivers(self.led5, '88121-1', '88121-2', '88122-0', None) + self._test_led_drivers(self.led6, '89200-2', '89201-0', '89201-1', None) # Make sure all the RGBW, channels, previous, and start_channels are working - self.assertEqual(self.led22.hw_drivers['red'][0].number, '48002-0') - self.assertEqual(self.led22.hw_drivers['green'][0].number, '48002-1') - self.assertEqual(self.led22.hw_drivers['blue'][0].number, '48002-2') - self.assertEqual(self.led23.hw_drivers['red'][0].number, '48003-0') - self.assertEqual(self.led23.hw_drivers['green'][0].number, '48003-1') - self.assertEqual(self.led23.hw_drivers['blue'][0].number, '48003-2') - self.assertEqual(self.led24.hw_drivers['red'][0].number, '48004-0') - self.assertEqual(self.led24.hw_drivers['green'][0].number, '48004-1') - self.assertEqual(self.led24.hw_drivers['blue'][0].number, '48004-2') - self.assertEqual(self.led24.hw_drivers['white'][0].number, '48005-0') - self.assertEqual(self.led25.hw_drivers['red'][0].number, '48005-1') - self.assertEqual(self.led25.hw_drivers['green'][0].number, '48005-2') - self.assertEqual(self.led25.hw_drivers['blue'][0].number, '48006-0') - self.assertEqual(self.led25.hw_drivers['white'][0].number, '48006-1') - self.assertEqual(self.led26.hw_drivers['red'][0].number, '48006-2') - self.assertEqual(self.led26.hw_drivers['green'][0].number, '48007-0') - self.assertEqual(self.led26.hw_drivers['blue'][0].number, '48007-1') - self.assertEqual(self.led26.hw_drivers['white'][0].number, '48007-2') - self.assertEqual(self.led27.hw_drivers['red'][0].number, '48008-0') - self.assertEqual(self.led27.hw_drivers['green'][0].number, '48008-1') - self.assertEqual(self.led27.hw_drivers['blue'][0].number, '48008-2') - self.assertEqual(self.led28.hw_drivers['red'][0].number, '88222-0') - self.assertEqual(self.led28.hw_drivers['green'][0].number, '88222-1') - self.assertEqual(self.led28.hw_drivers['blue'][0].number, '88222-2') - self.assertEqual(self.led28.hw_drivers['white'][0].number, '88223-0') - self.assertEqual(self.led29.hw_drivers['red'][0].number, '48009-0') - self.assertEqual(self.led29.hw_drivers['green'][0].number, '48009-2') - self.assertEqual(self.led29.hw_drivers['blue'][0].number, '48009-1') - self.assertEqual(self.led29.hw_drivers['white'][0].number, '4800A-2') - self.assertEqual(self.led30.hw_drivers['red'][0].number, 'B406B-0') - self.assertEqual(self.led30.hw_drivers['green'][0].number, 'B406B-1') - self.assertEqual(self.led30.hw_drivers['blue'][0].number, 'B406B-2') - self.assertEqual(self.led31.hw_drivers['red'][0].number, 'B406C-0') - self.assertEqual(self.led31.hw_drivers['green'][0].number, 'B406C-1') - self.assertEqual(self.led31.hw_drivers['blue'][0].number, 'B406C-2') - + self._test_led_drivers(self.led22, '48002-0', '48002-1', '48002-2', None) + self._test_led_drivers(self.led23, '48003-0', '48003-1', '48003-2', None) + self._test_led_drivers(self.led24, '48004-0', '48004-1', '48004-2', '48005-0') + self._test_led_drivers(self.led25, '48005-1', '48005-2', '48006-0', '48006-1') + self._test_led_drivers(self.led26, '48006-2', '48007-0', '48007-1', '48007-2') + self._test_led_drivers(self.led27, '48008-0', '48008-1', '48008-2', None) + self._test_led_drivers(self.led28, '88222-0', '88222-1', '88222-2', '88223-0') + self._test_led_drivers(self.led29, '48009-0', '48009-2', '48009-1', '4800A-2') + self._test_led_drivers(self.led30, 'B406B-0', 'B406B-1', 'B406B-2', None) + self._test_led_drivers(self.led31, 'B406C-0', 'B406C-1', 'B406C-2', None) + + def _test_led_drivers(self, led, red, green, blue, white): + drivers = led.hw_drivers + if red: + self.assertEqual(drivers['red'][0].number, red) + if green: + self.assertEqual(drivers['green'][0].number, green) + if blue: + self.assertEqual(drivers['blue'][0].number, blue) + if white: + self.assertEqual(drivers['white'][0].number, white) def _test_led_colors(self): - self.exp_cpu.expected_commands = { - 'RD@880:0201ff123402121212': '', - 'RD@881:0100ffffff': '', - 'RD@841:0160ffffff': ','} + 'RD@880:0201ff123402121212': '', # set individual lights on 0091 + 'RD@881:0100ffffff': '', # set individual lights on 0091 + 'RD@841:0160ffffff': ',' # set individual lights on 0081 + } self.led1.on() self.led2.color("ff1234") @@ -149,19 +145,25 @@ def _test_led_colors(self): self.assertFalse(self.exp_cpu.expected_commands) # turn on a LED on a different board that has a hex index too - self.exp_cpu.expected_commands = {'RD@B40:016affffff': '',} + self.exp_cpu.expected_commands = { + 'RD@B40:016affffff': '', # set individual lights on 0071 + } self.led18.on() self.advance_time_and_run() self.assertEqual("FFFFFF", self.exp_cpu.leds['led18']) # # test led off - self.exp_cpu.expected_commands = {'RD@881:0100000000': '',} + self.exp_cpu.expected_commands = { + 'RD@881:0100000000': '', # set individual lights on 0091 + } self.led1.off() self.advance_time_and_run() self.assertEqual("000000", self.exp_cpu.leds['led1']) # # test led color - self.exp_cpu.expected_commands = {'RD@890:010002172a': '',} + self.exp_cpu.expected_commands = { + 'RD@890:010002172a': '', # set individual lights on second 0091 + } self.led7.color(RGBColor((2, 23, 42))) self.advance_time_and_run(1) self.assertEqual("02172A", self.exp_cpu.leds['led7']) @@ -170,35 +172,44 @@ def _test_exp_board_reset(self): # verify a board reset turns off the LEDs only on the board addresses self.exp_cpu.expected_commands = { + # set individual lights on 0091 'RD@881:0100ff1234': '', 'RD@880:0102467fff': '', - 'RD@B40:016a6a6a6a': '',} + # set individual lights on 0071 + 'RD@B40:016a6a6a6a': '', + } self.led1.color("ff1234") self.led3.color("467fff") self.led18.color("6a6a6a") self.advance_time_and_run() - self.exp_cpu.write(b'BR@B40:') + self.exp_cpu.write(b'BR@B40:') # board reset command on 0071 self.advance_time_and_run() self.assertEqual("000000", self.exp_cpu.leds['led18']) # this is on the active board and should be off - self.assertEqual("FF1234", self.exp_cpu.leds['led1']) # this is on a non-active board ans should still be on + self.assertEqual("FF1234", self.exp_cpu.leds['led1']) # this is on a non-active board ans should still be on self.assertEqual("467FFF", self.exp_cpu.leds['led3']) def _test_grb_led(self): # test led10 grb - self.exp_cpu.expected_commands = {'RD@B40:014212ff34': '',} + self.exp_cpu.expected_commands = { + 'RD@B40:014212ff34': '', # set leds on 0071 + } self.led10.color("ff1234") self.advance_time_and_run() self.assertEqual("12FF34", self.exp_cpu.leds['led10']) # ensure the hardware received the colors in RGB order def _test_rgbw_leds(self): - self.exp_cpu.expected_commands = {'RD@480:02050000000600ff00': '',} + self.exp_cpu.expected_commands = { + 'RD@480:02050000000600ff00': '', # set leds on neuron + } self.led25.color("ffffff") self.advance_time_and_run() - self.exp_cpu.expected_commands = {'RD@882:022200112223110000': '',} + self.exp_cpu.expected_commands = { + 'RD@882:022200112223110000': '', # set leds on 0091 + } self.led28.color("112233") self.advance_time_and_run() @@ -208,21 +219,27 @@ def _test_led_channels(self): # white = 00 00 00 FF which becomes # 09 [00 00 00] 0A [00 00 FF] - self.exp_cpu.expected_commands = {'RD@480:02090000000a0000ff': '',} + self.exp_cpu.expected_commands = { + 'RD@480:02090000000a0000ff': '', # set LED on neuron + } self.led29.color("ffffff") self.advance_time_and_run() - self.exp_cpu.expected_commands = {'RD@480:02090022110a000022': '',} + self.exp_cpu.expected_commands = { + 'RD@480:02090022110a000022': '', # set LED on neuron + } self.led29.color("223344") # -> 00112222 self.advance_time_and_run() def _test_led_software_fade(self): - - self.exp_cpu.expected_commands = {'RD@B40:0169151515': '', - 'RD@B40:01692b2b2b': '', - 'RD@B40:0169424242': '', - 'RD@B40:0169585858': '', - 'RD@B40:0169646464': '',} + self.exp_cpu.expected_commands = { + # set LEDs on 0071 + 'RD@B40:0169151515': '', + 'RD@B40:01692b2b2b': '', + 'RD@B40:0169424242': '', + 'RD@B40:0169585858': '', + 'RD@B40:0169646464': '', + } self.led17.color(RGBColor((100, 100, 100)), fade_ms=150) self.advance_time_and_run(.04) @@ -236,6 +253,8 @@ def _test_led_software_fade(self): def _test_lew_hardware_fade(self): # This is also tested via the config file and the expected commands - self.exp_cpu.expected_commands = {'RF@88:3E8': '',} + self.exp_cpu.expected_commands = { + 'RF@88:3E8': '', # set fade rate on 0091 + } self.machine.default_platform.exp_boards_by_name["brian"].set_led_fade(1000) self.advance_time_and_run() \ No newline at end of file From e912e9442443762b8d3e45c06025c3553634d2eb Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 13:19:23 -0800 Subject: [PATCH 2/7] fast light channel generation properly handles channel offsetting with > 1 carry value though this should never happen in practice since we only call loop range(3) times, so +0, +1, and +2. Even a 4-channel light could only carry once, so really this should only matter in the 5+ channel case (or perhaps if you can start with an offset of +3, which really should be a carry+1 with 0 offset --- mpf/platforms/fast/fast.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/mpf/platforms/fast/fast.py b/mpf/platforms/fast/fast.py index 6e7dc0a79..aa5a738cc 100644 --- a/mpf/platforms/fast/fast.py +++ b/mpf/platforms/fast/fast.py @@ -752,12 +752,13 @@ def _parse_led_light_number(self, number): result = [] for i in range(3): working_parts = parts.copy() - if i + channel > 2: + absolute_channel = channel + i + if absolute_channel > 2: # Channel rolls over, increment the LED number - working_parts[3] = str(int(working_parts[3]) + 1) - working_parts[4] = str((channel + i) % 3) + working_parts[3] = str(int(working_parts[3]) + absolute_channel // 3) + working_parts[4] = str(absolute_channel % 3) else: - working_parts[4] = str(channel + i) + working_parts[4] = str(absolute_channel) result.append({'number': '-'.join(working_parts)}) return result raise AssertionError(f"Invalid LED channel: {channel}") From 7dddc78cded0b6eb14d6431b338d3267058880a0 Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 15:23:35 -0800 Subject: [PATCH 3/7] split FAST.configure_light into separate private methods --- mpf/platforms/fast/fast.py | 131 +++++++++++++++++++------------------ 1 file changed, 67 insertions(+), 64 deletions(-) diff --git a/mpf/platforms/fast/fast.py b/mpf/platforms/fast/fast.py index aa5a738cc..61bdb7f50 100644 --- a/mpf/platforms/fast/fast.py +++ b/mpf/platforms/fast/fast.py @@ -562,12 +562,10 @@ def configure_switch(self, number: str, config: SwitchConfig, platform_config: d raise AssertionError("Switch needs a number") if not self.serial_connections['net']: - raise AssertionError("A request was made to configure a FAST " - "switch, but no connection to a NET processor" - "is available") + raise AssertionError("A request was made to configure a FAST switch, " + "but no connection to a NET processor is available") - if self.is_retro: - # translate switch num to FAST switch + if self.is_retro: # translate switch num to FAST switch try: number = fast_defines.RETRO_SWITCH_MAP[str(number).upper()] except KeyError: @@ -584,7 +582,6 @@ def configure_switch(self, number: str, config: SwitchConfig, platform_config: d return switch - # pylint: disable-msg=too-many-locals def configure_light(self, number, subtype, config, platform_settings) -> LightPlatformInterface: """Configure light in platform.""" del platform_settings @@ -601,80 +598,86 @@ def configure_light(self, number, subtype, config, platform_settings) -> LightPl # split into board name, breakout, port, led parts = parts.split('-') - if parts[0] in self.exp_boards_by_name: - # this is an expansion board LED in config file format - exp_board = self.exp_boards_by_name[parts[0]] + if parts[0] in self.exp_boards_by_name: # this is an expansion board LED in config file format + return self._add_exp_led_with_config_format(parts, channel, config.name) - try: - _, port, led = parts - breakout = '0' - except ValueError: - _, breakout, port, led = parts - breakout = breakout.strip('b') + if int(parts[0]) > 255: # EXP LED in int form, which is how "previous:" values are calculated + return self._add_exp_led_with_int_format(parts, channel) - # ports are always 1-4, but some EXP boards have more which are labeled 5-8 - # Those are really 1-4 of the next breakout board, so if we get a port > 4 - # then sort it out to the real internal values - if int(port) > 4: - # assume 4 LED ports per breakout, could change to a lookup - breakout = str((int(port) - 1) // 4) - port = str((int(port) - 1) % 4 + 1) + # else it's a Nano LED + return self._add_nano_led(parts, channel) - try: - brk_board = exp_board.breakouts[breakout] - except KeyError: - # TODO change to mpf config exception - raise AssertionError(f'Board {exp_board} does not have a config entry for Breakout {breakout}') + raise AssertionError(f"Unknown light subtype {subtype}") - index = self.port_idx_to_hex(port, led, 32, config.name) - this_led_number = f'{brk_board.address}{index}' + def _add_exp_led_with_config_format(self, parts, channel, name): + exp_board = self.exp_boards_by_name[parts[0]] + + try: + _, port, led = parts + breakout = '0' + except ValueError: + _, breakout, port, led = parts + breakout = breakout.strip('b') - # this code runs once for each channel, so it will be called 3x per LED which - # is why we check this here - if this_led_number not in self.fast_exp_leds: - self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number, - exp_board.config['led_fade_time'], self) + # ports are always 1-4, but some EXP boards have more which are labeled 5-8 + # Those are really 1-4 of the next breakout board, so if we get a port > 4 + # then sort it out to the real internal values + if int(port) > 4: + # assume 4 LED ports per breakout, could change to a lookup + breakout = str((int(port) - 1) // 4) + port = str((int(port) - 1) % 4 + 1) - fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel) - self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel) + try: + brk_board = exp_board.breakouts[breakout] + except KeyError: + # TODO change to mpf config exception + raise AssertionError(f'Board {exp_board} does not have a config entry for Breakout {breakout}') - elif int(parts[0]) > 255: - # EXP LED in int form, which is how "previous:" values are calculated + index = self.port_idx_to_hex(port, led, 32, name) + this_led_number = f'{brk_board.address}{index}' - raw_hex_string = hex(int(parts[0]))[2:] # lowercase with 0x prefix stripped" - this_led_number = Util.normalize_hex_string(raw_hex_string, len(raw_hex_string)) + # this code runs once for each channel, so it will be called 3x per LED which + # is why we check this here + if this_led_number not in self.fast_exp_leds: + self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number, + exp_board.config['led_fade_time'], self) - exp_board = self.exp_boards_by_address[this_led_number[:2]] + fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel) + self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel) + return fast_led_channel - if this_led_number not in self.fast_exp_leds: - # RGBW LEDs could span multiple FAST LEDs, so make sure it exists - self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number, - exp_board.config['led_fade_time'], self) + def _add_exp_led_with_int_format(self, parts, channel): + raw_hex_string = hex(int(parts[0]))[2:] # lowercase with 0x prefix stripped" + this_led_number = Util.normalize_hex_string(raw_hex_string, len(raw_hex_string)) - fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel) - self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel) + exp_board = self.exp_boards_by_address[this_led_number[:2]] - else: - # Nano LED + if this_led_number not in self.fast_exp_leds: + # RGBW LEDs could span multiple FAST LEDs, so make sure it exists + self.fast_exp_leds[this_led_number] = FASTExpLED(this_led_number, + exp_board.config['led_fade_time'], self) - try: - number = self.port_idx_to_hex(parts[0], parts[1], 64) - except IndexError: - # this is a legacy LED number as an int - number = f'{int(parts[0]):02X}' + fast_led_channel = FASTLEDChannel(self.fast_exp_leds[this_led_number], channel) + self.fast_exp_leds[this_led_number].add_channel(int(channel), fast_led_channel) + return fast_led_channel - if number not in self.fast_rgb_leds: - try: - self.fast_rgb_leds[number] = FASTRGBLED(number, self) - except KeyError: - # This number is not valid - raise ConfigFileError(f"Invalid LED number: {'_'.join(parts)}", 3, self.log.name) + def _add_nano_led(self, parts, channel): + try: + number = self.port_idx_to_hex(parts[0], parts[1], 64) + except IndexError: + # this is a legacy LED number as an int + number = f'{int(parts[0]):02X}' - fast_led_channel = FASTLEDChannel(self.fast_rgb_leds[number], channel) - self.fast_rgb_leds[number].add_channel(int(channel), fast_led_channel) + if number not in self.fast_rgb_leds: + try: + self.fast_rgb_leds[number] = FASTRGBLED(number, self) + except KeyError: + # This number is not valid + raise ConfigFileError(f"Invalid LED number: {'_'.join(parts)}", 3, self.log.name) - return fast_led_channel - raise AssertionError(f"Unknown light subtype {subtype}") + fast_led_channel = FASTLEDChannel(self.fast_rgb_leds[number], channel) + self.fast_rgb_leds[number].add_channel(int(channel), fast_led_channel) + return fast_led_channel def port_idx_to_hex(self, port, device_num, devices_per_port, name=None): """Converts port number and LED index into the proper FAST hex number. From f64a9c9e4775d1b3df6dcde58dd37278145a65ff Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 20:59:55 -0800 Subject: [PATCH 4/7] extract private helpers for various generic light config steps for readability. The config params are generally pulled out inside the initialize in these, rather than from the self config reference within the function -- this is intentional to try and keep config reading inside initialize itself --- mpf/devices/light.py | 129 ++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 63 deletions(-) diff --git a/mpf/devices/light.py b/mpf/devices/light.py index 7362af559..3b17a6aa6 100644 --- a/mpf/devices/light.py +++ b/mpf/devices/light.py @@ -275,10 +275,7 @@ def _load_hw_driver(self, channel, color): if not platform.features['allow_empty_numbers'] and channel['number'] is None: self.raise_config_error("Light must have a number.", 1) - config = LightConfig( - name=self.name, - color=LightConfigColors[color.upper()] - ) + config = LightConfig(name=self.name, color=LightConfigColors[color.upper()]) try: return platform.configure_light(channel['number'], channel['subtype'], config, channel['platform_settings']) @@ -290,22 +287,7 @@ async def _initialize(self): await super()._initialize() try: if self.config['previous']: - if self.config['previous'].name == self.name: - self.raise_config_error( - "Failed to configure light {} in platform. 'previous' value cannot refer to itself.". - format(self.name), 8) - - # If we are in development mode, do a robust tree traversal to catch infinite light loops - if not self.machine.options['production']: - tree = [self.name] - prev = self.config['previous'] - while prev: - tree.append(prev.name) - prev = prev.config.get('previous') - if prev is not None and prev.name in tree: - tree.append(prev.name) - self.raise_config_error("Cyclical light chain found: {}".format(" -> ".join(tree)), 9) - + self._detect_previous_reference_loop() await self.config['previous'].wait_for_loaded() start_channel = self.config['previous'].get_successor_number() self._load_hw_driver_sequentially(start_channel) @@ -313,48 +295,63 @@ async def _initialize(self): self._load_hw_driver_sequentially(self.config['start_channel']) else: self._load_hw_drivers() - self._drivers_loaded.set_result(True) + self._drivers_loaded.set_result(True) self.config['default_on_color'] = RGBColor(self.config['default_on_color']) - if self.config['color_correction_profile'] is not None: - profile_name = self.config['color_correction_profile'] - elif 'light_settings' in self.machine.config and \ - self.machine.config['light_settings']['default_color_correction_profile'] is not None: - profile_name = self.machine.config['light_settings']['default_color_correction_profile'] - else: - profile_name = None - - if profile_name: - if profile_name in self.machine.light_controller.light_color_correction_profiles: - profile = self.machine.light_controller.light_color_correction_profiles[profile_name] - - if profile is not None: - self._set_color_correction_profile(profile) - else: # pragma: no cover - error = "Color correction profile '{}' was specified for light '{}'"\ - " but the color correction profile does not exist."\ - .format(profile_name, self.name) - self.error_log(error) - raise ValueError(error) - - if self.config['fade_ms'] is not None: - self.default_fade_ms = self.config['fade_ms'] - else: - self.default_fade_ms = (self.machine.config['light_settings'] - ['default_fade_ms']) - - if len(self.hw_drivers) == 4 and all(channel in self.hw_drivers - for channel in ['red', 'green', 'blue', 'white']): - self._rbgw_style = self.machine.config['mpf']['rgbw_white_behavior'] + self._apply_color_correction_profile(self.config['color_correction_profile']) + self._apply_fade(self.config['fade_ms']) + self._apply_rgbw_style() - self.debug_log("Initializing Light. CC Profile: %s, " - "Default fade: %sms", self._color_correction_profile, + self.debug_log("Initializing Light. CC Profile: %s, Default fade: %sms", + self._color_correction_profile, self.default_fade_ms) except Exception: self._drivers_loaded.cancel() raise + def _detect_previous_reference_loop(self): + if self.config['previous'].name == self.name: + self.raise_config_error( + "Failed to configure light {} in platform. 'previous' value cannot refer to itself.". + format(self.name), 8) + + # If we are in development mode, do a robust tree traversal to catch infinite light loops + if not self.machine.options['production']: + tree = [self.name] + prev = self.config['previous'] + while prev: + tree.append(prev.name) + prev = prev.config.get('previous') + if prev is not None and prev.name in tree: + tree.append(prev.name) + self.raise_config_error("Cyclical light chain found: {}".format(" -> ".join(tree)), 9) + + def _apply_color_correction_profile(self, profile_value): + profile_name = self._determine_color_correction_profile(profile_value) + if profile_name: + if profile_name in self.machine.light_controller.light_color_correction_profiles: + profile = self.machine.light_controller.light_color_correction_profiles[profile_name] + + if profile is not None: + self._set_color_correction_profile(profile) + else: # pragma: no cover + error = "Color correction profile '{}' was specified for light '{}'"\ + " but the color correction profile does not exist."\ + .format(profile_name, self.name) + self.error_log(error) + raise ValueError(error) + + def _determine_color_correction_profile(self, profile_value): + if profile_value is not None: + return profile_value + + if 'light_settings' in self.machine.config and \ + self.machine.config['light_settings']['default_color_correction_profile'] is not None: + return self.machine.config['light_settings']['default_color_correction_profile'] + + return None + def _set_color_correction_profile(self, profile): """Apply a color correction profile to this light. @@ -365,6 +362,19 @@ def _set_color_correction_profile(self, profile): """ self._color_correction_profile = profile + def _apply_fade(self, fade_value): + if fade_value is not None: + self.default_fade_ms = fade_value + else: + self.default_fade_ms = self.machine.config['light_settings']['default_fade_ms'] + + def _apply_rgbw_style(self): + rgbw_all_present = all(channel in self.hw_drivers + for channel in ['red', 'green', 'blue', 'white']) + + if len(self.hw_drivers) == 4 and rgbw_all_present: + self._rbgw_style = self.machine.config['mpf']['rgbw_white_behavior'] + # pylint: disable-msg=too-many-arguments def color(self, color, fade_ms=None, priority=0, key=None, start_time=None): """Add or update a color entry in this light's stack. @@ -392,8 +402,7 @@ def color(self, color, fade_ms=None, priority=0, key=None, start_time=None): """ if self._debug: self.debug_log("Received color() command. color: %s, fade_ms: %s " - "priority: %s, key: %s", color, fade_ms, priority, - key) + "priority: %s, key: %s", color, fade_ms, priority, key) if isinstance(color, str) and color == "on": color = self.config['default_on_color'] @@ -440,8 +449,7 @@ def off(self, fade_ms=None, priority=0, key=None, **kwargs): fade_ms: duration of fade """ del kwargs - self.color(color=self._off_color, fade_ms=fade_ms, priority=priority, - key=key) + self.color(color=self._off_color, fade_ms=fade_ms, priority=priority, key=key) # pylint: disable-msg=too-many-arguments def _add_to_stack(self, color, fade_ms, priority, key, start_time): @@ -473,12 +481,7 @@ def _add_to_stack(self, color, fade_ms, priority, key, start_time): if self.stack: self._remove_from_stack_by_key(key) - self.stack.append(LightStackEntry(priority, - key, - start_time, - color_below, - dest_time, - color)) + self.stack.append(LightStackEntry(priority, key, start_time, color_below, dest_time, color)) if len(self.stack) > 1: self.stack.sort(reverse=True) From 3663c604778dfe13aa5ed3faaba1bf0bdffe6f8c Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 22:16:40 -0800 Subject: [PATCH 5/7] extract more helper functions in light.py and reorder functions into rought execution order for readability --- mpf/devices/light.py | 232 +++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 116 deletions(-) diff --git a/mpf/devices/light.py b/mpf/devices/light.py index 3b17a6aa6..79a004b5a 100644 --- a/mpf/devices/light.py +++ b/mpf/devices/light.py @@ -138,47 +138,21 @@ def _check_duplicate_light_numbers(machine: MachineController, **kwargs): check_set.add(key) - def _map_channels_to_colors(self, channel_list) -> dict: - if self.config['type']: - color_channels = self.config['type'] - else: - if len(channel_list) == 1: - # for one channel default to a white channel - color_channels = "w" - elif len(channel_list) == 3: - # for three channels default to RGB - color_channels = "rgb" - else: - self.raise_config_error("Please provide a type for light {}. No default for channels {}.". - format(self.name, channel_list), 11) + def _color_letter_to_name(self, letter): + if letter == 'r': + return 'red' - if len(channel_list) != len(color_channels): - self.raise_config_error("Type {} does not match channels {} for light {}".format( - color_channels, channel_list, self.name), 12) + if letter == 'g': + return 'green' - channels = {} # type: Dict[str, List[Any]] - for color_name in color_channels: - # red channel - if color_name == 'r': - full_color_name = "red" - # green channel - elif color_name == 'g': - full_color_name = "green" - # blue channel - elif color_name == 'b': - full_color_name = "blue" - # simple white channel - elif color_name == 'w': - full_color_name = "white" - else: - self.raise_config_error("Invalid element {} in type {} of light {}".format( - color_name, self.config['type'], self.name), 13) + if letter == 'b': + return 'blue' - if full_color_name not in channels: - channels[full_color_name] = [] - channels[full_color_name].append(channel_list.pop(0)) + if letter == 'w': + return 'white' - return channels + self.raise_config_error("Invalid element {} in type {} of light {}".format( + letter, self.config['type'], self.name), 14) def wait_for_loaded(self): """Return future.""" @@ -195,6 +169,50 @@ def get_successor_number(self): sorted_channels = sorted(all_drivers) return sorted_channels[-1].get_successor_number() + async def _initialize(self): + await super()._initialize() + try: + if self.config['previous']: + self._detect_previous_reference_loop() + await self.config['previous'].wait_for_loaded() + start_channel = self.config['previous'].get_successor_number() + self._load_hw_driver_sequentially(start_channel) + elif self.config['start_channel']: + self._load_hw_driver_sequentially(self.config['start_channel']) + else: + self._load_hw_drivers() + + self._drivers_loaded.set_result(True) + self.config['default_on_color'] = RGBColor(self.config['default_on_color']) + + self._apply_color_correction_profile(self.config['color_correction_profile']) + self._apply_fade(self.config['fade_ms']) + self._apply_rgbw_style() + + self.debug_log("Initializing Light. CC Profile: %s, Default fade: %sms", + self._color_correction_profile, + self.default_fade_ms) + except Exception: + self._drivers_loaded.cancel() + raise + + def _detect_previous_reference_loop(self): + if self.config['previous'].name == self.name: + self.raise_config_error( + "Failed to configure light {} in platform. 'previous' value cannot refer to itself.". + format(self.name), 8) + + # If we are in development mode, do a robust tree traversal to catch infinite light loops + if not self.machine.options['production']: + tree = [self.name] + prev = self.config['previous'] + while prev: + tree.append(prev.name) + prev = prev.config.get('previous') + if prev is not None and prev.name in tree: + tree.append(prev.name) + self.raise_config_error("Cyclical light chain found: {}".format(" -> ".join(tree)), 9) + def _load_hw_driver_sequentially(self, next_channel): if self.config['number'] or self.config['channels']: self.raise_config_error("Cannot use start_channel/previous and number or channels.", 3) @@ -202,22 +220,8 @@ def _load_hw_driver_sequentially(self, next_channel): self.raise_config_error("Cannot use previous or start_channel without type. " "Add a type setting to your light.", 2) - for color_name in self.config['type']: - # red channel - if color_name == 'r': - full_color_name = "red" - # green channel - elif color_name == 'g': - full_color_name = "green" - # blue channel - elif color_name == 'b': - full_color_name = "blue" - # simple white channel - elif color_name == 'w': - full_color_name = "white" - else: - self.raise_config_error("Invalid element {} in type {} of light {}".format( - color_name, self.config['type'], self.name), 14) + for color_letter in self.config['type']: + full_color_name = self._color_letter_to_name(color_letter) if full_color_name not in self.hw_drivers: self.hw_drivers[full_color_name] = [] @@ -230,32 +234,9 @@ def _load_hw_driver_sequentially(self, next_channel): def _load_hw_drivers(self): if not self.config['channels']: - # get channels from number + platform - platform = self.machine.get_platform_sections('lights', self.config['platform']) - platform.assert_has_feature("lights") - try: - channel_list = platform.parse_light_number_to_channels(self.config['number'], self.config['subtype']) - except AssertionError as e: - self.raise_config_error("Failed to parse light number {} in platform. See error above". - format(self.name), 4, source_exception=e) - - # copy platform and platform_settings to all channels - for channel, _ in enumerate(channel_list): - channel_list[channel]['subtype'] = self.config['subtype'] - channel_list[channel]['platform'] = self.config['platform'] - channel_list[channel]['platform_settings'] = self.config['platform_settings'] - # map channels to colors - channels = self._map_channels_to_colors(channel_list) + channels = self._derive_platform_channels() else: - if self.config['number'] or self.config['platform'] or self.config['platform_settings']: - self.raise_config_error("Light {} cannot contain platform/platform_settings/number and channels". - format(self.name), 5) - # alternatively use channels from config - channels = self.config['channels'] - # ensure that we got lists - for channel in channels: - if not isinstance(channels[channel], list): - channels[channel] = [channels[channel]] + channels = self._prepare_config_channels() if not channels: self.raise_config_error("Light {} has no channels.".format(self.name), 6) @@ -283,49 +264,68 @@ def _load_hw_driver(self, channel, color): self.raise_config_error("Failed to configure light {} in platform. See error above".format(self.name), 7, source_exception=e) - async def _initialize(self): - await super()._initialize() + def _prepare_config_channels(self): + if self.config['number'] or self.config['platform'] or self.config['platform_settings']: + self.raise_config_error("Light {} cannot contain platform/platform_settings/number and channels". + format(self.name), 5) + channels = self.config['channels'] + + # ensure that each color's channels are a list, not single value + for channel in channels: + if not isinstance(channels[channel], list): + channels[channel] = [channels[channel]] + + return channels + + def _derive_platform_channels(self): + # get channels from number + platform + platform = self.machine.get_platform_sections('lights', self.config['platform']) + platform.assert_has_feature("lights") + try: - if self.config['previous']: - self._detect_previous_reference_loop() - await self.config['previous'].wait_for_loaded() - start_channel = self.config['previous'].get_successor_number() - self._load_hw_driver_sequentially(start_channel) - elif self.config['start_channel']: - self._load_hw_driver_sequentially(self.config['start_channel']) - else: - self._load_hw_drivers() + channel_list = platform.parse_light_number_to_channels(self.config['number'], self.config['subtype']) + except AssertionError as e: + self.raise_config_error("Failed to parse light number {} in platform. See error above". + format(self.name), 4, source_exception=e) - self._drivers_loaded.set_result(True) - self.config['default_on_color'] = RGBColor(self.config['default_on_color']) + # copy platform and platform_settings to all channels + for channel, _ in enumerate(channel_list): + channel_list[channel]['subtype'] = self.config['subtype'] + channel_list[channel]['platform'] = self.config['platform'] + channel_list[channel]['platform_settings'] = self.config['platform_settings'] + # map channels to colors + return self._map_channels_to_colors(channel_list) - self._apply_color_correction_profile(self.config['color_correction_profile']) - self._apply_fade(self.config['fade_ms']) - self._apply_rgbw_style() + def _map_channels_to_colors(self, channel_list) -> dict: + if self.config['type']: + color_channels = self.config['type'] + else: + if len(channel_list) == 1: + # for one channel default to a white channel + color_channels = "w" + elif len(channel_list) == 3: + # TODO this seems like a bug waiting to happen -- what if the channel list ISNT R G B ordered? + # -- the blind .pop(0) down below combined with the "for letter in 'rgb'" seems risky - self.debug_log("Initializing Light. CC Profile: %s, Default fade: %sms", - self._color_correction_profile, - self.default_fade_ms) - except Exception: - self._drivers_loaded.cancel() - raise + # for three channels default to RGB + color_channels = "rgb" + else: + self.raise_config_error("Please provide a type for light {}. No default for channels {}.". + format(self.name, channel_list), 11) - def _detect_previous_reference_loop(self): - if self.config['previous'].name == self.name: - self.raise_config_error( - "Failed to configure light {} in platform. 'previous' value cannot refer to itself.". - format(self.name), 8) + if len(channel_list) != len(color_channels): + self.raise_config_error("Type {} does not match channels {} for light {}".format( + color_channels, channel_list, self.name), 12) - # If we are in development mode, do a robust tree traversal to catch infinite light loops - if not self.machine.options['production']: - tree = [self.name] - prev = self.config['previous'] - while prev: - tree.append(prev.name) - prev = prev.config.get('previous') - if prev is not None and prev.name in tree: - tree.append(prev.name) - self.raise_config_error("Cyclical light chain found: {}".format(" -> ".join(tree)), 9) + channels = {} # type: Dict[str, List[Any]] + for color_letter in color_channels: + full_color_name = self._color_letter_to_name(color_letter) + if full_color_name not in channels: + channels[full_color_name] = [] + + channels[full_color_name].append(channel_list.pop(0)) + + return channels def _apply_color_correction_profile(self, profile_value): profile_name = self._determine_color_correction_profile(profile_value) @@ -370,7 +370,7 @@ def _apply_fade(self, fade_value): def _apply_rgbw_style(self): rgbw_all_present = all(channel in self.hw_drivers - for channel in ['red', 'green', 'blue', 'white']) + for channel in ['red', 'green', 'blue', 'white']) if len(self.hw_drivers) == 4 and rgbw_all_present: self._rbgw_style = self.machine.config['mpf']['rgbw_white_behavior'] From 5fb08f78519a7e096ee3a2df444e6266e7261963 Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 22:36:53 -0800 Subject: [PATCH 6/7] refactor lights and fast light-related private function to better orient the seams with the data structures and relevancy --- mpf/devices/light.py | 6 ++-- mpf/platforms/fast/fast.py | 67 ++++++++++++++++++++++---------------- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/mpf/devices/light.py b/mpf/devices/light.py index 79a004b5a..0e0461296 100644 --- a/mpf/devices/light.py +++ b/mpf/devices/light.py @@ -293,10 +293,8 @@ def _derive_platform_channels(self): channel_list[channel]['subtype'] = self.config['subtype'] channel_list[channel]['platform'] = self.config['platform'] channel_list[channel]['platform_settings'] = self.config['platform_settings'] - # map channels to colors - return self._map_channels_to_colors(channel_list) - def _map_channels_to_colors(self, channel_list) -> dict: + # map channels to colors if self.config['type']: color_channels = self.config['type'] else: @@ -306,6 +304,8 @@ def _map_channels_to_colors(self, channel_list) -> dict: elif len(channel_list) == 3: # TODO this seems like a bug waiting to happen -- what if the channel list ISNT R G B ordered? # -- the blind .pop(0) down below combined with the "for letter in 'rgb'" seems risky + # and the channel list order is up to the platform to provide, which doesnt seem clear in the + # parse_light_number_to_channels interface definition at all # for three channels default to RGB color_channels = "rgb" diff --git a/mpf/platforms/fast/fast.py b/mpf/platforms/fast/fast.py index 61bdb7f50..b4b78219f 100644 --- a/mpf/platforms/fast/fast.py +++ b/mpf/platforms/fast/fast.py @@ -740,34 +740,45 @@ def _parse_led_light_number(self, number): if parts[0] in self.exp_boards_by_name: # This is an expansion board LED - if not parts[1].startswith('b'): - # No breakout specified, so we insert a b0 - parts.insert(1, 'b0') - - if len(parts) == 4: - # No channel specified, so we return 3 channels 0,1,2 - return [{'number': '-'.join(parts) + f'-{i}'} for i in range(3)] - - if len(parts) == 5: - # We have a channel specified - channel = int(parts[4]) - if 0 <= channel <= 2: - result = [] - for i in range(3): - working_parts = parts.copy() - absolute_channel = channel + i - if absolute_channel > 2: - # Channel rolls over, increment the LED number - working_parts[3] = str(int(working_parts[3]) + absolute_channel // 3) - working_parts[4] = str(absolute_channel % 3) - else: - working_parts[4] = str(absolute_channel) - result.append({'number': '-'.join(working_parts)}) - return result - raise AssertionError(f"Invalid LED channel: {channel}") - raise AssertionError(f"Invalid LED number: {number}") - - # This is a Nano LED + return self._parse_expansion_board_light_number(number, parts) + + # Nano LEDs do not have expansion board names + return self._parse_nano_light_number(number, parts) + + def _parse_expansion_board_light_number(self, number, parts): + if not parts[1].startswith('b'): + # No breakout specified, so we insert a b0 + parts.insert(1, 'b0') + + if len(parts) == 4: + # No channel specified, so we return 3 channels 0,1,2 + return [{'number': '-'.join(parts) + f'-{i}'} for i in range(3)] + + if len(parts) == 5: + # We have a channel specified + channel = int(parts[4]) + if 0 <= channel <= 2: + result = [] + for i in range(3): + working_parts = parts.copy() + absolute_channel = channel + i + if absolute_channel > 2: + # Channel rolls over, increment the LED number + working_parts[3] = str(int(working_parts[3]) + absolute_channel // 3) + working_parts[4] = str(absolute_channel % 3) + else: + working_parts[4] = str(absolute_channel) + result.append({'number': '-'.join(working_parts)}) + + return result + + # channel out of bounds + raise AssertionError(f"Invalid LED channel: {channel}") + + # wrong number of parts + raise AssertionError(f"Invalid LED number: {number}") + + def _parse_nano_light_number(self, number, parts): if '-' in str(number): # num = list(map(int, str(number).split('-'))) # index = num[0] * 64 + num[1] From dd67090926bcec44d400f3566c432c3883f18203 Mon Sep 17 00:00:00 2001 From: Alex Lobascio Date: Thu, 1 Jan 2026 22:59:22 -0800 Subject: [PATCH 7/7] add absolute_channel helper to FASTLEDChannel and continue lights fixup --- mpf/devices/light.py | 13 +++++++++---- mpf/platforms/fast/fast_led.py | 6 +++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/mpf/devices/light.py b/mpf/devices/light.py index 0e0461296..03b5a1a82 100644 --- a/mpf/devices/light.py +++ b/mpf/devices/light.py @@ -213,13 +213,18 @@ def _detect_previous_reference_loop(self): tree.append(prev.name) self.raise_config_error("Cyclical light chain found: {}".format(" -> ".join(tree)), 9) - def _load_hw_driver_sequentially(self, next_channel): - if self.config['number'] or self.config['channels']: - self.raise_config_error("Cannot use start_channel/previous and number or channels.", 3) + def _load_hw_driver_sequentially(self, first_channel): + if self.config['number']: + self.raise_config_error("Cannot use number with start_channel/previous.", 15) + + if self.config['channels']: + self.raise_config_error("Cannot use channels list with start_channel/previous.", 16) + if not self.config['type']: self.raise_config_error("Cannot use previous or start_channel without type. " "Add a type setting to your light.", 2) + next_channel = first_channel for color_letter in self.config['type']: full_color_name = self._color_letter_to_name(color_letter) @@ -229,7 +234,7 @@ def _load_hw_driver_sequentially(self, next_channel): 'platform_settings': self.config['platform_settings'], 'number': next_channel} channel = self.machine.config_validator.validate_config("light_channels", channel) driver = self._load_hw_driver(channel, full_color_name) - next_channel = driver.get_successor_number() + next_channel = driver.get_successor_number() # increment for the next loop self.hw_drivers[full_color_name].append(driver) def _load_hw_drivers(self): diff --git a/mpf/platforms/fast/fast_led.py b/mpf/platforms/fast/fast_led.py index b14503721..df0394bb4 100644 --- a/mpf/platforms/fast/fast_led.py +++ b/mpf/platforms/fast/fast_led.py @@ -143,7 +143,11 @@ def get_board_name(self): def is_successor_of(self, other): """Return true if the other light has the previous number.""" - return self.led.number_int * 3 + self.channel == other.led.number_int * 3 + other.channel + 1 + return self.absolute_channel() == other.absolute_channel() + 1 + + def absolute_channel(self): + """Returns channel offset from 0-0 as a single number.""" + return self.led.number_int * 3 + self.channel def get_successor_number(self): """Return next number. We want this in the config format."""