diff --git a/src/rov/rov_led_controller/rov_led_controller/SK6812RGBW.py b/src/rov/rov_led_controller/rov_led_controller/SK6812RGBW.py new file mode 100644 index 0000000..6c67905 --- /dev/null +++ b/src/rov/rov_led_controller/rov_led_controller/SK6812RGBW.py @@ -0,0 +1,100 @@ +import spidev +import sys +import time + +class SPItoSK(): + def __init__(self, numLeds, bus = 1, device = 0): + self.ledCount = numLeds + self.ledBrightness = 1.0 + self.spi = spidev.SpiDev() + self.spi.open(bus, device) + self.spi.max_speed_hz = 2500000 + # string of bits to send to spi + self.binMsg = 0b100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100 + for i in range(numLeds - 1): + # 96 bits per led, 3 bits per real bit + self.binMsg = (self.binMsg << 96) | 0b100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100 + + def __del__(self): + self.spi.close() + print("destructor") + + def set_LED_color(self, ledNum, R, G, B, W): + """ + Adjust self.binMsg to the color desired + :param ledNum: the led to change starting with index 0 + :param R, G, B, W: RGBW values from 0-255 + """ + if (R > 255) or (G > 255) or (B > 255) or (W > 255) or (R < 0) or (G < 0) or (B < 0) or (W < 0): + print("RGBW values must be in the range 0-255") + sys.exit(-1) + msgStartPos = (ledNum * 96) + 2 + # Set G in binary message + self._formatBinMsg(msgStartPos, int(round(G * self.ledBrightness))) + # Set R in binary message + self._formatBinMsg(msgStartPos + 24, int(round(R * self.ledBrightness))) + # Set B in binary message + self._formatBinMsg(msgStartPos + (24 * 2), int(round(B * self.ledBrightness))) + # Set W in binary message + self._formatBinMsg(msgStartPos + (24 * 3), int(round(W * self.ledBrightness))) + + # debug + # print("led_num: %d, R: %d G: %d B: %d W: %d" % (ledNum, round(R * self.ledBrightness), round(G * self.ledBrightness), round(B * self.ledBrightness), round(W * self.ledBrightness))) + + def LED_show(self): + """ + Signals the LEDs + """ + bytesToSend = [] + binMsgCpy = self.binMsg + while binMsgCpy: + bytesToSend.insert(0, binMsgCpy & ((1 << 8) - 1)) # Extract lowest 8 bits + binMsgCpy >>= 8 # Shift right by 8 bits + + self.spi.xfer3(bytesToSend, 2500000, 0, 8) + + # debug + # print(bin(self.binMsg)) + # offString = 100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100 + # print() + # for i in bytesToSend: + # print(bin(i)) + # time.sleep(80e-6) + + def set_brightness(self, brightness): + """ + Set the global brightness of LEDs + :param brightness: Brightness of LEDs 0-1, clamps if values are outside the range + """ + brightness = max(0.0, min(brightness, 1.0)) + self.ledBrightness = brightness + + def LED_OFF_ALL(self): + """ + Turns off LEDs + """ + self.binMsg = 0b100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100 + for i in range(self.ledCount - 1): + # 96 bits per led, 3 bits per real bit + self.binMsg = (self.binMsg << 96) | 0b100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100100 + self.LED_show() + + def _formatBinMsg(self, startPos, colorNum): + number = format(colorNum, '08b') + for i in range(8): + msgIndexPos = startPos + (3 * i) + colorBit = number[i] + if (colorBit == "0") and bin(self.binMsg)[msgIndexPos:msgIndexPos + 3] == "110": + self._flipBit(msgIndexPos) + elif (colorBit == "1") and bin(self.binMsg)[msgIndexPos:msgIndexPos + 3] == "100": + self._flipBit(msgIndexPos) + + def _flipBit(self, position): + mask = 1 << len(bin(self.binMsg)[2:]) - position + self.binMsg = self.binMsg ^ mask + + +if __name__ == "__main__": + ledStrip = SPItoSK(1) + ledStrip.set_LED_color(0, 255, 255, 255, 255) + ledStrip.LED_show() \ No newline at end of file diff --git a/src/rov/rov_led_controller/rov_led_controller/WS2812.py b/src/rov/rov_led_controller/rov_led_controller/WS2812.py new file mode 100644 index 0000000..762ef6d --- /dev/null +++ b/src/rov/rov_led_controller/rov_led_controller/WS2812.py @@ -0,0 +1,74 @@ +# lifted from this lovely repo: https://github.com/seitomatsubara/Jetson-nano-WS2812-LED- + +import spidev +import sys + +class SPItoWS(): + def __init__(self, ledc, bus=1, device=0): + self.led_count = ledc + self.X = '' # X is signal of WS281x + for i in range(self.led_count): + self.X = self.X + "100100100100100100100100100100100100100100100100100100100100100100100100" + self.led_brightness = 1.0 + self.spi = spidev.SpiDev() + self.spi.open(bus, device) + self.spi.max_speed_hz = 2400000 + + def __del__(self): + self.spi.close() + print("destructor") + + def _Bytesto3Bytes(self, num, RGB): # num is number of signal, RGB is 8 bits (1 byte) str + for i in range(8): + if RGB[i] == '0': + self.X = self.X[:num * 3 * 8 + i * 3] + '100' + self.X[num * 3 * 8 + i * 3 + 3:] + elif RGB[i] == '1': + self.X = self.X[:num * 3 * 8 + i * 3] + '110' + self.X[num * 3 * 8 + i * 3 + 3:] + + def _BytesToHex(self, Bytes): + return ''.join(["0x%02X " % x for x in Bytes]).strip() + + def LED_show(self): + Y = [] + for i in range(self.led_count * 9): + Y.append(int(self.X[i*8:(i+1)*8],2)) + WS = self._BytesToHex(Y) + self.spi.xfer3(Y, 2400000,0,8) + + def set_LED_color(self, led_num, R, G, B): + if (R > 255 or G > 255 or B > 255): + print("Invalid Value: RGB is over 255\n") + sys.exit(1) + if (led_num > self.led_count - 1): + print("Invalid Value: The number is over the number of LED") + sys.exit(1) + RR = format(round(R * self.led_brightness), '08b') + GG = format(round(G * self.led_brightness), '08b') + BB = format(round(B * self.led_brightness), '08b') + self._Bytesto3Bytes(led_num * 3, GG) + self._Bytesto3Bytes(led_num * 3 + 1, RR) + self._Bytesto3Bytes(led_num * 3 + 2, BB) + + # debug + # print("led_num: %d, R: %d G: %d B: %d" % (led_num, round(R * self.led_brightness), round(G * self.led_brightness), round(B * self.led_brightness))) + + def LED_OFF_ALL(self): + self.X = '' + for i in range(self.led_count): + self.X = self.X + "100100100100100100100100100100100100100100100100100100100100100100100100" + self.LED_show() + + def set_brightness(self, brightness): + self.led_brightness = brightness + + +if __name__ == "__main__": + import time + LED_COUNT = 3 + sig = SPItoWS(LED_COUNT) + sig.RGBto3Bytes(0, 255, 0, 0) + sig.RGBto3Bytes(1, 0, 255, 0) + sig.RGBto3Bytes(2, 0, 0, 255) + sig.LED_show() + time.sleep(1) + sig.LED_OFF_ALL() \ No newline at end of file diff --git a/src/rov/rov_led_controller/rov_led_controller/led_controller.py b/src/rov/rov_led_controller/rov_led_controller/led_controller.py index a8ca065..4533f57 100644 --- a/src/rov/rov_led_controller/rov_led_controller/led_controller.py +++ b/src/rov/rov_led_controller/rov_led_controller/led_controller.py @@ -1,128 +1,359 @@ -from common.csm_common_interfaces.msg import PinState from std_msgs.msg import String -import Jetson.GPIO as GPIO import rclpy from rclpy.node import Node -from rainbowio import colorwheel - -import board -import neopixel_spi import time import random +import colorsys +import threading +from rov_led_controller.SK6812RGBW import SPItoSK + -NUM_LIGHTS = 60 -LED_PIN = board.SPI()._pins[0] -BLACK = (0,0,0) -WHITE = (255,255,255) -RED = (255, 0, 0) -YELLOW = (255, 150, 0) -GREEN = (0, 255, 0) -CYAN = (0, 255, 255) -BLUE = (0, 0, 255) -PURPLE = (180, 0, 255) -PIXELS = neopixel_spi.NeoPixel_SPI(board.SPI(), NUM_LIGHTS) -global ledState -global ledColor +# Ring leds go first, then strip leds +NUM_RING_LEDS = 24 +NUM_STRIP_LEDS = 48 +RGBW_PIXELS = SPItoSK(NUM_RING_LEDS + NUM_STRIP_LEDS) class LEDControllerNode(Node): def __init__(self): super().__init__(node_name="LEDController") - self.state_subscriber = self.create_subscription(PinState, "set_rov_gpio", self.on_set_rov_gpio, 10) - self.publisher_ = self.create_publisher(String, 'preprogrammed_animations', 10) - self.animations = [ - "color_chase", - "rainbow_cycle", - "pulse", - "solid", - "seizure_disco" - ] - - self.colors = [ - "WHITE", - "BLACK", - "RED", - "YELLOW", - "GREEN", - "CYAN", - "BLUE", - "PURPLE" - ] + self.animations = { + "color_chase": self.color_chase, + "rainbow_cycle": self.rainbow_cycle, + "pulse": self.pulse, + "solid": self.solid, + "seizure_disco": self.seizure_disco, + "off": self.off, + "boot": self.boot + } + + self.colors = { + "WHITE": (255,255,255), + "BLACK": (0,0,0), + "RED": (255, 0, 0), + "YELLOW": (255, 150, 0), + "GREEN": (0, 255, 0), + "CYAN": (0, 255, 255), + "BLUE": (0, 0, 255), + "PURPLE": (180, 0, 255) + } + + # Calculate rainbow RGB values + self.rainbowValues = [] + hueIncrement = 1.0 / 256 + for i in range (1, 257): + rgb = colorsys.hsv_to_rgb(i * hueIncrement, 1, 1) + scaled_rgb = tuple(int(value * 255) for value in rgb) + self.rainbowValues.append(scaled_rgb) + + # color value and current animation + self.ringColor = "WHITE" + self.ringCurrAnim = "off" + self.stripColor = "WHITE" + self.stripCurrAnim = "off" # Listen for requests from the UI self.ui_subscriber = self.create_subscription(String, 'ui_requests', self.ui_request_callback, 10) - PIXELS.fill(BLACK) + # SK6812 Mutex + self.LEDMutex = threading.Lock() + + # Thread management + # Can't believe I need to wrap StopAnim in a list to pass by reference + self.ringStopAnim = [False] + self.stripStopAnim = [False] + self.stripStopMutex = threading.Lock() + + # Set leds to off when starting + self.ringAnimThread = threading.Thread(target=self.animations[self.ringCurrAnim], args=(self.colors[self.ringColor], self._set_ring_LED)) + self.stripAnimThread = threading.Thread(target=self.animations[self.stripCurrAnim], args=(self.colors[self.stripColor], self._set_strip_LED)) + self.ringAnimThread.start() + self.stripAnimThread.start() def ui_request_callback(self, msg): - if msg.data == 'get_animations': + # Split message + split = msg.data.split(",") + msgs = [s.strip() for s in split] + + # Change brightness + if len(msgs) == 3: + try: + # set and clamp brightness between 0 and 1 + brightness = max(0.0, min(float(msgs[2]), 1.0)) + RGBW_PIXELS.set_brightness(brightness) + self.get_logger().info("Changed brightness to: %f" % brightness) + except: + self.get_logger().info("Third argument, brightness, should be a number from 0-1.") + else: + RGBW_PIXELS.set_brightness(0.25) + + # Change animation and colors + if msgs[0] == 'get_animations': # Send the list of preprogrammed animations to the UI animations_msg = String() animations_msg.data = "\n".join(animation for animation in self.animations) - self.publisher_.publish(animations_msg) - self.get_logger().info('Sent preprogrammed animations to UI') - elif msg.data == 'get_colors': + self.get_logger().info(animations_msg) + elif msgs[0] == 'get_colors': colors_msg = String() colors_msg.data = "\n".join(color for color in self.colors) - self.publisher_.publish(colors_msg) - self.get_logger().info('Sent preprogrammed colors to UI') - else: + self.get_logger().info(colors_msg) + elif len(msgs) >= 2: + # Check for which leds the user wants to modify + setStripFunc = self._set_ring_LED + if msgs[0] == "strip": + setStripFunc = self._set_strip_LED + elif msgs[0] != "ring": + self.get_logger().info("First argument must be \"ring\" or \"strip\", defaulting to ring") + msgs[0] = "ring" # Check if the requested animation exists - if msg.data in self.animations: + if msgs[1] in self.animations: + # Stop current animation + if msgs[0] == "ring": + self.ringStopAnim[0] = True + self.ringAnimThread.join() + self.ringStopAnim[0] = False + elif msgs[0] == "strip": + with self.stripStopMutex: + self.stripStopAnim[0] = True + self.stripAnimThread.join() + self.stripStopAnim[0] = False # Call the method corresponding to the requested animation - - if ledState and (lastAnimation != msg.data): - exec(f"self.{msg.data}({ledColor}, 0.1)")() - else: - PIXELS.brightness(0) - lastAnimation = msg.data - elif msg.data in self.colors: - lastColor = ledColor - if ledState and (lastColor != msg.data): - exec(f"self.{lastAnimation}({msg.data}, 0.1)")() - - - def on_set_rov_gpio(self, message: PinState): - if message.pin not in LED_PIN: - pass - ledState = message.state + if msgs[0] == "ring": + self.ringCurrAnim = msgs[1] + self.get_logger().info('New ring animation received: %s' % msgs[1]) + self.ringAnimThread = threading.Thread(target=self.animations[msgs[1]], args=(self.colors[self.ringColor], setStripFunc)) + self.ringAnimThread.start() + elif msgs[0] == "strip": + self.stripCurrAnim = msgs[1] + self.get_logger().info('New strip animation received: %s' % msgs[1]) + self.stripAnimThread = threading.Thread(target=self.animations[msgs[1]], args=(self.colors[self.stripColor], setStripFunc)) + self.stripAnimThread.start() + elif msgs[1] in self.colors: + # Stop current animation + if msgs[0] == "ring": + self.ringStopAnim[0] = True + self.ringAnimThread.join() + self.ringStopAnim[0] = False + elif msgs[0] == "strip": + with self.stripStopMutex: + self.stripStopAnim[0] = True + self.stripAnimThread.join() + self.stripStopAnim[0] = False + # Change the color if message changes colors + if msgs[0] == "ring": + self.ringColor = msgs[1] + self.get_logger().info('New ring color received: %s' % msgs[1]) + self.ringAnimThread = threading.Thread(target=self.animations[self.ringCurrAnim], args=(self.colors[self.ringColor], setStripFunc)) + self.ringAnimThread.start() + elif msgs[0] == "strip": + self.stripColor = msgs[1] + self.get_logger().info('New strip color received: %s' % msgs[1]) + self.stripAnimThread = threading.Thread(target=self.animations[self.stripCurrAnim], args=(self.colors[self.stripColor], setStripFunc)) + self.stripAnimThread.start() + else: + self.get_logger().info("Second argument must be an animation or color. Send \"get_animations\" or \"get_colors\" to view options.") + else: + self.get_logger().info("Request most be formatted: \"ring\"/\"strip\", animation/color, [brightness]. Send \"get_animations\" or \"get_colors\" to view options.") + + - def color_chase(color, wait=0.1): - for i in range(NUM_LIGHTS): - PIXELS[i] = color - time.sleep(wait) - PIXELS.show() - time.sleep(0.5) - - def rainbow_cycle(color, wait=0.1): - for j in range(255): - for i in range(NUM_LIGHTS): - rc_index = (i * 256 // NUM_LIGHTS) + j - PIXELS[i] = colorwheel(rc_index & 255) - PIXELS.show() - time.sleep(wait) - - def pulse(color, wait=0.1): - PIXELS.fill(color) - for i in range(255): - PIXELS.setBrightness(i) - time.sleep(wait) - PIXELS.show() - time.sleep(wait) - - def solid(color, wait=0.1): - PIXELS.fill(color) - PIXELS.show() - - def seizure_disco(color, wait=0.1): - for i in range(NUM_LIGHTS): + def color_chase(self, color, setStripFunc): + """ + Animates a single color across the LED strip + """ + numLEDs = 0 + if setStripFunc == self._set_ring_LED: + numLEDs = NUM_RING_LEDS + stopAnim = self.ringStopAnim + else: + numLEDs = NUM_STRIP_LEDS + stopAnim = self.stripStopAnim + wait = 1/numLEDs + while stopAnim == [False]: + for i in range(numLEDs): + setStripFunc(i, color[0], color[1], color[2]) + time.sleep(wait) + self._LED_show() + if stopAnim == [True]: + break + for i in range(numLEDs): + setStripFunc(i, 0, 0, 0) + time.sleep(wait) + self._LED_show() + if stopAnim == [True]: + break + + + def rainbow_cycle(self, color, setStripFunc): + """ + Cycles a wave of rainbow over the LEDs + """ + numLEDs = 0 + if setStripFunc == self._set_ring_LED: + numLEDs = NUM_RING_LEDS + stopAnim = self.ringStopAnim + else: + numLEDs = NUM_STRIP_LEDS + stopAnim = self.stripStopAnim + wait = 0.05 + while stopAnim == [False]: + for i in range(256): + for j in range (numLEDs): + R = self.rainbowValues[(j + i) % 256][0] + G = self.rainbowValues[(j + i) % 256][1] + B = self.rainbowValues[(j + i) % 256][2] + setStripFunc(j, R, G, B) + self._LED_show() + time.sleep(wait) + if stopAnim == [True]: + break + + + def pulse(self, color, setStripFunc): + """ + Fades a single color in and out + """ + if setStripFunc == self._set_ring_LED: + stopAnim = self.ringStopAnim + else: + stopAnim = self.stripStopAnim + wait = 0.005 + while stopAnim == [False]: + for i in range(255): + R = color[0] * (i / 255.0) + G = color[1] * (i / 255.0) + B = color[2] * (i / 255.0) + self._set_all_pixels((R, G, B)) + time.sleep(wait) + self._LED_show() + if stopAnim == [True]: + break + for i in range(255): + R = color[0] * ((255 - i) / 255.0) + G = color[1] * ((255 - i) / 255.0) + B = color[2] * ((255 - i) / 255.0) + self._set_all_pixels((R, G, B)) + time.sleep(wait) + self._LED_show() + if stopAnim == [True]: + break + + + def solid(self, color, setStripFunc): + """ + Changes all LEDs to solid color + """ + self._set_all_pixels((color[0], color[1], color[2]), setStripFunc) + self._LED_show() + + + def seizure_disco(self, color, setStripFunc): + """ + Changes all LEDs to a random color + """ + if setStripFunc == self._set_ring_LED: + numLEDs = NUM_RING_LEDS + else: + numLEDs = NUM_STRIP_LEDS + for i in range(numLEDs): R = random.randint(0,255) G = random.randint(0,255) B = random.randint(0,255) - PIXELS[i].fill(R, G, B) - PIXELS.show() - time.sleep(wait) + setStripFunc(i, R, G, B) + self._LED_show() + + def off(self, color, setStripFunc): + if setStripFunc == self._set_ring_LED: + numLEDs = NUM_RING_LEDS + else: + numLEDs = NUM_STRIP_LEDS + for i in range(numLEDs): + setStripFunc(i, 0, 0, 0) + self._LED_show() + + def boot(self, color, setStripFunc): + """ + Booting animation + """ + if setStripFunc == self._set_ring_LED: + numLEDs = NUM_RING_LEDS + else: + numLEDs = NUM_STRIP_LEDS + for i in range(numLEDs): + setStripFunc(i, 0, 255, 0) + self._LED_show() + time.sleep(1/numLEDs) + time.sleep(0.3) + time.sleep(0.3) + self._set_all_pixels(self.colors["GREEN"]) + time.sleep(1) + + + def _set_all_pixels(self, color, setStripFunc): + """ + Sets all pixels to a single + """ + if setStripFunc == self._set_ring_LED: + for i in range (NUM_RING_LEDS): + setStripFunc(i, color[0], color[1], color[2]) + elif setStripFunc == self._set_strip_LED: + for i in range (NUM_STRIP_LEDS): + setStripFunc(i, color[0], color[1], color[2]) + + def _set_ring_LED(self, ledNum, R, G, B): + if ledNum <= NUM_RING_LEDS - 1 and ledNum >= 0: + self._set_RGBW_LED(ledNum, R, G, B) + + def _set_strip_LED(self, ledNum, R, G, B): + if ledNum <= NUM_STRIP_LEDS - 1 and ledNum >= 0: + self._set_RGBW_LED(NUM_RING_LEDS + ledNum, R, G, B) + + def _LED_show(self): + with self.LEDMutex: + RGBW_PIXELS.LED_show() + + def _set_RGBW_LED(self, ledNum, R, G, B): + """ + Convert RGB value to RGBW value and set RGBW LEDs to new color + """ + # percentage of RGB in old colorspace + RGBTotal = R + G + B + if RGBTotal == 0: + RGBW_PIXELS.set_LED_color(ledNum, 0, 0, 0, 0) + return + percentR = R/RGBTotal + percentG = G/RGBTotal + percentB = B/RGBTotal + # set white value as lowest RGB value + newW = min(R, G, B) + # To preserve max brightness, set new highest RGB value to corresponding RGBW value + # Then scale the value of the 2 other RGB values to preserve original color percentage + newR = 0 + newG = 0 + newB = 0 + if max(R, G, B) == R: + newR = R + RTotal = newR + newW + RGBWTotal = RTotal / percentR + newG = (RGBWTotal * percentG) - newW + newB = (RGBWTotal * percentB) - newW + elif max(R, G, B) == G: + newG = G + GTotal = newG + newW + RGBWTotal = GTotal / percentG + newR = (RGBWTotal * percentR) - newW + newB = (RGBWTotal * percentB) - newW + elif max(R, G, B) == B: + newB = B + BTotal = newB + newW + RGBWTotal = BTotal / percentB + newR = (RGBWTotal * percentR) - newW + newG = (RGBWTotal * percentG) - newW + with self.LEDMutex: + RGBW_PIXELS.set_LED_color(ledNum, newR, newG, newB, newW) + + def main(args=None): diff --git a/src/rov/rov_led_controller/setup.cfg b/src/rov/rov_led_controller/setup.cfg index 8ac3298..df03f45 100644 --- a/src/rov/rov_led_controller/setup.cfg +++ b/src/rov/rov_led_controller/setup.cfg @@ -1,4 +1,4 @@ [develop] -script_dir=$base/lib/src/rov/rov_led_controller +script_dir=$base/lib/rov_led_controller [install] -install_scripts=$base/lib/src/rov/rov_led_controller +install_scripts=$base/lib/rov_led_controller diff --git a/src/rov/rov_led_controller/setup.py b/src/rov/rov_led_controller/setup.py index 479e134..0d652b1 100644 --- a/src/rov/rov_led_controller/setup.py +++ b/src/rov/rov_led_controller/setup.py @@ -1,6 +1,6 @@ from setuptools import find_packages, setup -package_name = 'src/rov/rov_led_controller' +package_name = 'rov_led_controller' setup( name=package_name, @@ -15,12 +15,12 @@ zip_safe=True, maintainer='catfishjw', maintainer_email='james.wiley@live.com', - description='TODO: Package description', + description='It controls LEDs', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ - 'led_controller = src/rov/rov_led_controller.led_controller:main' + 'led_controller = rov_led_controller.led_controller:main' ], }, ) diff --git a/src/rov/rov_led_controller/src/rov/led_controller/led_controller.py b/src/rov/rov_led_controller/src/rov/led_controller/led_controller.py deleted file mode 100644 index a8ca065..0000000 --- a/src/rov/rov_led_controller/src/rov/led_controller/led_controller.py +++ /dev/null @@ -1,136 +0,0 @@ -from common.csm_common_interfaces.msg import PinState -from std_msgs.msg import String -import Jetson.GPIO as GPIO -import rclpy -from rclpy.node import Node -from rainbowio import colorwheel - -import board -import neopixel_spi -import time -import random - -NUM_LIGHTS = 60 -LED_PIN = board.SPI()._pins[0] -BLACK = (0,0,0) -WHITE = (255,255,255) -RED = (255, 0, 0) -YELLOW = (255, 150, 0) -GREEN = (0, 255, 0) -CYAN = (0, 255, 255) -BLUE = (0, 0, 255) -PURPLE = (180, 0, 255) -PIXELS = neopixel_spi.NeoPixel_SPI(board.SPI(), NUM_LIGHTS) -global ledState -global ledColor - -class LEDControllerNode(Node): - - def __init__(self): - super().__init__(node_name="LEDController") - self.state_subscriber = self.create_subscription(PinState, "set_rov_gpio", self.on_set_rov_gpio, 10) - self.publisher_ = self.create_publisher(String, 'preprogrammed_animations', 10) - - self.animations = [ - "color_chase", - "rainbow_cycle", - "pulse", - "solid", - "seizure_disco" - ] - - self.colors = [ - "WHITE", - "BLACK", - "RED", - "YELLOW", - "GREEN", - "CYAN", - "BLUE", - "PURPLE" - ] - - # Listen for requests from the UI - self.ui_subscriber = self.create_subscription(String, 'ui_requests', self.ui_request_callback, 10) - - PIXELS.fill(BLACK) - - def ui_request_callback(self, msg): - if msg.data == 'get_animations': - # Send the list of preprogrammed animations to the UI - animations_msg = String() - animations_msg.data = "\n".join(animation for animation in self.animations) - self.publisher_.publish(animations_msg) - self.get_logger().info('Sent preprogrammed animations to UI') - elif msg.data == 'get_colors': - colors_msg = String() - colors_msg.data = "\n".join(color for color in self.colors) - self.publisher_.publish(colors_msg) - self.get_logger().info('Sent preprogrammed colors to UI') - else: - # Check if the requested animation exists - if msg.data in self.animations: - # Call the method corresponding to the requested animation - - if ledState and (lastAnimation != msg.data): - exec(f"self.{msg.data}({ledColor}, 0.1)")() - else: - PIXELS.brightness(0) - lastAnimation = msg.data - elif msg.data in self.colors: - lastColor = ledColor - if ledState and (lastColor != msg.data): - exec(f"self.{lastAnimation}({msg.data}, 0.1)")() - - - def on_set_rov_gpio(self, message: PinState): - if message.pin not in LED_PIN: - pass - ledState = message.state - - def color_chase(color, wait=0.1): - for i in range(NUM_LIGHTS): - PIXELS[i] = color - time.sleep(wait) - PIXELS.show() - time.sleep(0.5) - - def rainbow_cycle(color, wait=0.1): - for j in range(255): - for i in range(NUM_LIGHTS): - rc_index = (i * 256 // NUM_LIGHTS) + j - PIXELS[i] = colorwheel(rc_index & 255) - PIXELS.show() - time.sleep(wait) - - def pulse(color, wait=0.1): - PIXELS.fill(color) - for i in range(255): - PIXELS.setBrightness(i) - time.sleep(wait) - PIXELS.show() - time.sleep(wait) - - def solid(color, wait=0.1): - PIXELS.fill(color) - PIXELS.show() - - def seizure_disco(color, wait=0.1): - for i in range(NUM_LIGHTS): - R = random.randint(0,255) - G = random.randint(0,255) - B = random.randint(0,255) - PIXELS[i].fill(R, G, B) - PIXELS.show() - time.sleep(wait) - - -def main(args=None): - rclpy.init(args=args) - led_controller = LEDControllerNode() - rclpy.spin(led_controller) - led_controller.destroy_node() - rclpy.shutdown() - -if __name__ == '__main__': - main()