-
Notifications
You must be signed in to change notification settings - Fork 1
MQTT producer app and IPC context cancellation #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
feat: mqtt command line arguments and packet support
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements an MQTT producer application that reads telemetry data from GSW via shared memory and publishes it to an MQTT broker. The changes also refactor the IPC Reader interface to support context-based cancellation for graceful shutdowns, and separate service configurations into a dedicated docker-compose file for more flexible deployment options.
Key changes:
- Added context support to the IPC Reader interface enabling cancellation and graceful shutdown
- Implemented a new MQTT producer app that publishes telemetry to topic patterns
gsw/[packet]/[measurement] - Split Docker Compose configuration to allow running services independently from the main GSW application
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| cmd/mqtt_producer/main.go | New MQTT producer app that reads from shared memory and publishes telemetry to MQTT broker with per-measurement topics |
| lib/ipc/ipc.go | Updated Reader interface to accept context parameter for cancellation support |
| lib/ipc/shm.go | Implemented context-aware wait() and Read() methods with futex wake on cancellation |
| lib/ipc/futex.go | Added timeout parameter to futexWait and improved error wrapping with %w |
| cmd/telem_view/telem_view.go | Updated to pass context.TODO() to new Read() signature |
| cmd/mem_view/mem_view.go | Updated to pass context.TODO() to new Read() signature |
| cmd/ipc_benchmark/reader.go | Updated to pass context to Read() for proper cancellation support |
| cmd/grafana_live/grafana_live.go | Updated to pass context.TODO() to new Read() signature in two locations |
| compose.yaml | Moved service definitions to separate file and added include statement |
| compose-services.yaml | New file containing Grafana, InfluxDB, and Mosquitto service definitions |
| data/mosquitto/mosquitto.conf | Configuration file for Mosquitto broker with anonymous authentication |
| data/mosquitto/Containerfile | Dockerfile for building custom Mosquitto container with config |
| cmd/Containerfile | Added mqtt_producer to the list of binaries to build |
| go.mod | Added paho.mqtt.golang dependency and updated Go version to 1.24.0 with toolchain 1.24.9 |
| go.sum | Updated checksums for new and updated dependencies |
| README.md | Updated documentation to mention Mosquitto and new compose-services.yaml usage |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I believe this is a slight performance regression due to the extra context checks. |
I wouldn't worry too much about it. You have to remember that most of our use cases is testing backplane (~120 pps over Ethernet) and like a packet every few seconds (radio). There will be cases where we may need to deal with higher rates (i.e. DAQ at either 8 khz or 43 khz), but also this is for producing visualization apps where we cannot perceive changes that fast. As long as we can log most of the data meaningfully which is done by the service we should be fine. Optimization is good, but lets not fixate on it unnecessarily (obviously GSW shouldnt be railing the CPU though lol) |
AarC10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good! Thanks for consulting and taking my context solution instead of doing polling lol. Im ok with this code, but I think mqtt_producer could see a few improvements down the line
| continue | ||
| } | ||
| // qos=0 delivery not guaranteed | ||
| client.Publish(fmt.Sprintf("%s/%s/%s", *topicPrefix, packet.Name, name), 0, false, jsonStr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we handle the Token return type here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nvm read the QOS comment
| val, err := tlm.InterpretMeasurementValue(meas, data[offset:offset+meas.Size]) | ||
| if err != nil { | ||
| pLog.Printf("error interpreting measurement: %v\n", err) | ||
| continue | ||
| } | ||
| jsonStr, err := json.Marshal(val) | ||
| if err != nil { | ||
| pLog.Printf("error marshaling measurement: %v\n", err) | ||
| continue | ||
| } | ||
| // qos=0 delivery not guaranteed | ||
| client.Publish(fmt.Sprintf("%s/%s/%s", *topicPrefix, packet.Name, name), 0, false, jsonStr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im ok with fleshing this out for now, but I think that measurements should be grouped since this would scale poorly. We should avoid having to JSON serializin gand TCP sending to a different topic for each single measurement. This would get expensive fast with more measurements and higher rates
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeahhh- I still want to use different topics for every measurement because it's easiest to consume down the line (with most stuff it will just work ™️. I do want to make better serialization for measurements that isn't running them through JSON- maybe like a generic .String() that will be implemented for every measurement type.
| topicPrefix = flag.String("topic_prefix", "gsw", "mqtt topic prefix") | ||
| ) | ||
|
|
||
| func main() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should document this is a best effort bridge
| client.Disconnect(250) | ||
| } | ||
|
|
||
| func packetWriter(ctx context.Context, packet tlm.TelemetryPacket, client mqtt.Client) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing I would suggest is adding a way to configure downsampling in the future. If this is mainly for sending to visualization apps that read over MQTT we may not want to be publishing certain things at an insane rate that we can't perceive fast enough or to avoid nuking a computer trying to render millions of data points on a graph
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep! that's the plan for when I end up adding that API.
closes #43 and #52
Description
Implements a simple MQTT producer app that reads from GSW and publishes telemetry in the
gsw/[packet]/[measurement]topic. This also separates service configuration into a separate docker-compose file to be able to run services in docker without running gsw_service in docker.How Has This Been Tested?
Fake data generated using
ipc_benchmarkand mosquitto'smosquitto_subutility. To test websockets, https://www.hivemq.com/demos/websocket-client/ was used.TODO