-
Notifications
You must be signed in to change notification settings - Fork 0
Communication Interface
The following page assumes knowledge of the Python programming language.
Simply put, a communication interface provides a way to take a message, format it, and send it to a connected device.
A communication interface enables the user to control almost any device using any protocol using FreeRGB's interface.
Using a communication interface, it is possible to add support for a range of devices, from communicating with devices connected over serial, to sending messages to MQTT devices on a network and even existing products such as Yeelight.
Each communication interface is created as a class with at least 5 methods:
- An
__init__constructor. - A static
runmethod. - A static
writemethod. - A static
readmethod. - A class
getBoardInfomethod*.
Note for the getBoardInfo method, either it or a connected method must be a class method that registers the communication interface. More on this later.
Ensure within this module you have imported __globals__:
from src import __globals__This method is called whenever communication needs to begin with a new device i.e. when a device that accepts a specific communication is selected, that communication interface's constructor will be called.
Inside this method, communication with the selected device must be initialised, after which the class' run method must be called with 'getBoardInfo' as an argument (more on this later).
This method must be a static method, and must accept parameters target_method, *args, **kwargs.
Essentially, this method takes a string input target_method, and will call its class' method by that string using getattr.
This addition of this method allows setting up something each each time before calling any write or read methods or similar.
One use case for this is to create a new thread for communication, allowing the main window to stay alive while communication between devices occurs.
This method must be a static method, and must accept parameters message, out_type=None, *args, **kwargs.
It may optionally accept additional parameters.
The purpose of this method is to receive a message, apply formatting on the message based on out_type, and send this message to a connected device.
This method must be a static method, and must accept parameters *args, **kwargs.
It may optionally accept additional parameters.
The purpose of this method is to receive data from a connected device.
This method should return a string if reading is successful.
This method must be either a static method or class method, and must accept parameters *args, **kwargs.
It may optionally accept additional parameters.
There are two options for this method:
- The method can immediately read from a connected device. In this case, the method will not call another method, and must be a classmethod that registers itself as the global communication module. This may not always be a good idea since you may need to wait for the message to receive, such as waiting for serial input before reading it.
- The method waits for input to be available before reading. In this case, the method will be a staticmethod that registers a callback classmethod that runs when input is available. This classmethod must register the global communication module.
If a classmethod, this method must also accept parameter cls.
This method should set __globals__.board_data to a dictionary with relevant data (see examples).
This method should also update the current position of the brightness slider. To do this, ensure you are receiving brightness data from the device, then
__globals__.parent.ui.slider_brightness.setValue(brightness)where brightness is some integer.
If the communication interface supports multiple LED strips in one device, you'll need to register your strips menu.
from src.ui.strips_menu import initStripsMenu
__globals__.strips_menu.clicked.connect(lambda: initStripsMenu(data))Before the communication interface can be used, it has to be registered.
Within the selectDevice method inside src/ui/button_actions.py, you must add the interface to available communication objects.
To do this:
- Import your communication module inside the
selectDevicemethod. - Add your object to the
comm_objectsdictionary. The key should be the name of the communication type (defined by the device finder), and the value should be your communication class.
Upon successfully establishing a connection with a device and obtaining data through a board data getter classmethod, you must update the current global communication type for future communications with this device.
Within the classmethod which accepts parameter cls, add the following line after relevant data integrity checks:
__globals__.comm_module = [__name__, cls.__name__]To be able to connect to a device using your communication interface, you'll need to create a static connectSerial method that accepts parameters *args, **kwargs. This method should be placed in src/ui/button_actions.py.
Additional parameters are supported.
This method needs to create a new object for your communication interface and run the constructor with relevant arguments.
This method is tied to an accompanying finder.
If your communication interface needs to store an object in memory, e.g. serial stores a QSerial object in memory, you'll need to register this.
In src/__globals__.py, you'll need to add a variable, initialised as None, to store the object.
Within src/ui/button_actions.py, you'll need to add the name of this variable as a string inside the comm_globals list.
Inside your communication class constructor is where you'll initialise this object.
This example is adapted from FreeRGB's serialio.
Firstly, we're going to create a new class and its constructor method. This method can take any parameters.
class SerialIO():
def __init__(self, port, baudrate=None):
__globals__.serial = QSerialPort(port, baudRate=baudrate)
try:
if not __globals__.serial.isOpen():
__globals__.serial.open(QIODevice.ReadWrite)
except:
pass
SerialIO.run('getBoardInfo')The first part of the constructor creates a new serial object and stores it inside __globals__.
This allows methods in any module to access this serial object.
The second part is simply testing if the serial object is busy. This can be adapted for any other communication interface, where checking for existing communications should be accomplished prior to attempting to start a new communication.
In the final part of the constructor, we call the class' run method with 'getBoardInfo' as an argument.
We don't need to initialise an object here as we're running a staticmethod.
This must be present in your constructor.
In this next part, we define the run method.
Some specifics are glossed over in this example, so if you'd like to see the full implementation, please visit the source file.
@staticmethod
def run(target_method, *args, **kwargs):
if not __globals__.serial_thread.isRunning():
...
method = getattr(SerialIO, target_method)
worker.ready.connect(lambda: method(serial, *args, **kwargs))
worker.moveToThread(thread)
worker.finished.connect(thread.quit)
thread.started.connect(worker.procCounter)
__globals__.serial_thread = thread
thread.start()All run methods in communication interfaces must accept target_method, *args, and **kwargs as parameters.
A check is made to ensure the current serial thread is not running. This may not apply to all communication interfaces;
if using a form of threading, using python's built-in threading module is preferred over using QThreads due to its lowered complexity. QThreads are used here to allow moving a QSerial object across threads; if not using PyQt specific objects, you shouldn't need to use QThreads. In the case you are using a QThread for communication, you may need to add a __globals__ variable to prevent the thread from destruction when its caller is destroyed.
target_method is a string that instructs getattr which method to call, and this is stored as method.
This method is moved to the serial worker. In the case of using Python's threading, you would just need to set the target to this method.
moveToThread is used to move the thread worker to the new thread, and along with finished.connect and started.connect are QThread specific instructions for connecting start and finish signals to the thread. __globals__.serial_thread = thread stores the QThread object in memory. These steps are not necessary when using Python's threading.
After all this, the thread is started.
Next, we need to make a write method.
Some specifics are glossed over in this example, so if you'd like to see the full implementation, please visit the source file.
@staticmethod
def write(serial, message, out_type=None, *args, **kwargs):
if out_type is None:
serial.write(message)
elif out_type == 'Integer':
...
serial.write(input_array)
elif out_type == 'Integer-Char':
...
serial.write(bytearray(output_bytes))
elif out_type == 'ArduRGB':
...
serial.write(bytearray(output_bytes))
elif out_type == 'String':
serial.write(message.encode())
elif settings.do_logs:
__globals__.logger.warn(
f'Message output type {out_type} not defined')Here we take care of 2 things.
First; we need to format the message according to rules based on input types.
Second; we call a 'write' method for our underlying communication architecture. In the case of this example, we're using a QSerial object stored as our variable serial, and this object has an underlying method write, so this what is called here.
Remember, the write method must accept parameters message and out_type, as this is how button data is fed to the communication interface.
We now need to create a read method
@staticmethod
def read(serial, *args, **kwargs):
data = data = serial.readAll().data()
try:
data = data.decode()
return data
except:
if settings.do_logs:
__globals__.logger.warn(
f'Could not decode serial input {data}')Here we need to get data from a connected device.
In this example, a QSerial object has an underlying method readAll, and so this is used to obtain the data.
If no errors are met, this method should return the data read to its caller.
With these next example, we'll be implementing the getBoardInfo method.
Some specifics are glossed over in this example, so if you'd like to see the full implementation, please visit the source file.
@staticmethod
def getBoardInfo(serial, *args, **kwargs):
serial.write(bytearray(b'\xfe\x00\tboardinfo\xff'))
...
serial.readyRead.connect(SerialIO.setBoardInfo)
@classmethod
def setBoardInfo(cls):
...
__globals__.serial.readyRead.disconnect(SerialIO.setBoardInfo)
serial_data = __globals__.board_data_buffer
serial_data = serial_data.strip('\r\n').split(',')
try:
if len(serial_data) == int(serial_data[0]) + 1:
board_data = {'name': serial_data[1],
'type': serial_data[2],
'version': serial_data[3],
'physical_strips': serial_data[4],
'virtual_strips': serial_data[5],
'default_brightness': serial_data[6],
'port': __globals__.serial.portName()}
__globals__.board_data = board_data
__globals__.comm_module = [__name__, cls.__name__]
command = {'type': 'serial',
'payload': __globals__.serial.portName()}
JsonIO('devices.json').writeEntry('known_devices', 'devices',
serial_data[1], serial_data[1], command, sort_keys=True)
__globals__.parent.ui.slider_brightness.setValue(
int(serial_data[6]))
__globals__.strips_menu.clicked.connect(lambda: initStripsMenu(int(serial_data[5])))In this example we're using a 'ready read' approach, where the call to getBoardInfo will only register a callback when a message is received.
This approach is not always necessary.
getBoardInfo simply sends a request for data to a connected device, and registers a callback to read any data received once the device responds to the request.
The important part of this is the classmethod; only when the actual data received from the device can be verified should any communication interface be registered. The classmethod is responsible for registering this.
The first part of setBoardInfo deregisters the callback to prevent additional calls during normal communications. This is important if using this 'ready read' approach.
The reset of this method simply reads data received from the connected device in lines and adds it to a total buffer, after which it creates a board_data dictionary.
This dictionary is required to be made for all communication interfaces, however any data can be stored in there.
The global board data is then stored for other modules to access, and the communication module is also registered.
The next step is to create a command that is called when the now added device is used. The type entry relates to the key used in the comm_objects dictionary in the selectDevice method in src/ui/button_actions.py.
The payload entry is used as a way to connect to the device as a method argument. In the case of this example, the serial port is used, so when communication is established with this device, it connects through this port.
This data is then written to the device entry inside known_devices in preferences/devices.json.
After this, the brightness slider value is updated with values obtained from device communication.
As a last step in this method, the led strips menu is connected to the current data obtained from the device. This is only required if your communication interface supports multiple led strips.
Finally, we'll register a connector method inside src/ui/button_actions.py.
This is called from our communication finder.
@staticmethod
def connectSerial(port, *args, **kwargs):
from src.serial.serialio import SerialIO
SerialIO(port).run('getBoardInfo')This method must accept *args, **kwargs parameters, but can also accept any other parameters.
All this method does is creates a new communication interface object and runs the getBoardInfo method through the run method.
FreeRGB | License | Code of Conduct | Contributing | Wiki | Code