-
Notifications
You must be signed in to change notification settings - Fork 0
feat: implemented Retained and Will Messages #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements retained and will messages for the MQTT broker, addressing issue #19. The implementation adds comprehensive support for retained message storage, matching, and lifecycle management with automatic cleanup of expired messages.
- Added a new RetainedManager that handles retained message operations with configurable cleanup intervals
- Implemented topic matching functionality with wildcard support (+ and #) for message filtering
- Created a RetainedStore that provides thread-safe storage with expiration handling and context cancellation support
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| topic/subscription.go | Added RetainedMessage struct and imported message package |
| topic/router.go | Integrated RetainedManager into Router with new methods for retained message operations |
| topic/retained.go | Implemented RetainedManager with cleanup loop and configuration options |
| topic/matcher.go | Added TopicMatcher for wildcard topic filtering |
| store/retained.go | Created RetainedStore with thread-safe operations and expiration handling |
| topic/router_retained_test.go | Comprehensive tests for Router retained message functionality |
| topic/retained_test.go | Unit tests for RetainedManager including concurrent operations |
| topic/matcher_test.go | Tests and benchmarks for topic matching functionality |
| store/retained_test.go | Extensive tests for RetainedStore including edge cases |
| topic/retained_bench_test.go | Performance benchmarks for RetainedManager operations |
| store/retained_bench_test.go | Performance benchmarks for RetainedStore operations |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces an implementation for retained messages, a core MQTT feature. It adds an in-memory RetainedStore, a RetainedManager with periodic cleanup of expired messages, and integrates this into the Router. The changes are well-structured and include comprehensive tests and benchmarks.
However, I've found a few critical issues that need to be addressed:
- The topic matching logic for topics starting with
$does not adhere to the MQTT specification, which could lead to incorrect message routing for system topics. - The
RetainedMessagestruct has a redundantTopicfield, which can cause inconsistencies and bugs when setting and getting retained messages. I've provided suggestions to refactor this.
Additionally, the pull request title mentions "Will Messages", but the implementation seems to only cover "Retained Messages". You might want to update the title to accurately reflect the changes.
Once these issues are resolved, this will be a great addition.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a robust implementation for retained messages, a crucial MQTT feature. The changes are well-structured, with clear separation of concerns between the storage layer (RetainedStore) and the management layer (RetainedManager), and good integration into the existing Router. The code is accompanied by a comprehensive set of unit tests and benchmarks. However, I have identified two significant performance issues in the RetainedStore that could affect scalability and latency under load, particularly concerning topic matching and the cleanup of expired messages. Addressing these points will greatly improve the performance and resilience of the retained message handling.
…e management methods
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a comprehensive implementation for handling retained MQTT messages, including storage, matching, and expiration. The implementation is based on a trie data structure for efficient topic handling. The changes are well-structured across store and topic packages, with good test coverage.
However, I've identified several critical issues that need to be addressed:
- A flawed locking strategy in
RetainedStorethat leads to unnecessary complexity and performance overhead. - A memory leak in the expired message cleanup logic.
- A
Matchfunction that incorrectly ignores itsmatcherparameter. - A missing function dependency that will cause a compilation error.
My detailed comments below provide suggestions to resolve these issues.
| return nil | ||
| } | ||
|
|
||
| func (r *RetainedStore) Match(ctx context.Context, topicFilter string, matcher TopicMatcher) ([]*message.Message, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Match function's signature includes a matcher TopicMatcher parameter, but the implementation in matchRecursive completely ignores it, using its own hardcoded MQTT wildcard matching logic. This violates the function's contract, is misleading, and prevents callers from using custom matching logic. To fix this, you should either update the implementation to use the provided matcher, or remove the unused parameter from the function signature across the call chain to make the code's behavior clear.
| func (r *RetainedStore) Match(ctx context.Context, topicFilter string, matcher TopicMatcher) ([]*message.Message, error) { | |
| func (r *RetainedStore) Match(ctx context.Context, topicFilter string) ([]*message.Message, error) { |
| // cleanupExpiredRecursive recursively removes expired messages | ||
| func (r *RetainedStore) cleanupExpiredRecursive(node *retainedTrieNode, now time.Time, count *int) { | ||
| node.mu.Lock() | ||
|
|
||
| if node.message != nil && !node.message.ExpiresAt.IsZero() && now.After(node.message.ExpiresAt) { | ||
| node.message = nil | ||
| *count++ | ||
| r.count-- | ||
| } | ||
|
|
||
| children := make([]*retainedTrieNode, 0, len(node.children)) | ||
| for _, child := range node.children { | ||
| children = append(children, child) | ||
| } | ||
| node.mu.Unlock() | ||
|
|
||
| for _, child := range children { | ||
| r.cleanupExpiredRecursive(child, now, count) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanupExpiredRecursive function does not prune empty trie nodes after a message expires. This will cause a memory leak as the trie will grow indefinitely with empty branches. The function should be modified to perform a post-order traversal, allowing it to identify and remove nodes that become empty after their messages and all their children are gone.
This requires changing the function to return a boolean indicating if it can be pruned. The call site at line 332 will then need to be updated to r.cleanupExpiredRecursive(r.root, now, &count), ignoring the returned boolean, which is correct for the root node.
// cleanupExpiredRecursive recursively removes expired messages and returns true if the node should be pruned.
// This assumes the store-level lock is held and per-node locks are removed.
func (r *RetainedStore) cleanupExpiredRecursive(node *retainedTrieNode, now time.Time, count *int) (shouldPrune bool) {
if node.message != nil && !node.message.ExpiresAt.IsZero() && now.After(node.message.ExpiresAt) {
node.message = nil
*count++
r.count--
}
keysToDelete := []string{}
for key, child := range node.children {
if r.cleanupExpiredRecursive(child, now, count) {
keysToDelete = append(keysToDelete, key)
}
}
for _, key := range keysToDelete {
delete(node.children, key)
}
return node.message == nil && len(node.children) == 0
}| filterLevels := splitTopicLevels(filter) | ||
| topicLevels := splitTopicLevels(topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function matchTopicFilter calls splitTopicLevels on lines 26 and 27. However, splitTopicLevels is not defined within the topic package. It appears to be an unexported function in the store package, which is inaccessible from here. This will result in a compilation error. To resolve this, you should make splitTopicLevels available to this package, for example by duplicating it here or moving it to a shared utility package.
| node.mu.Lock() | ||
| if node.children[level] == nil { | ||
| node.children[level] = newRetainedTrieNode() | ||
| } | ||
| nextNode := node.children[level] | ||
| node.mu.Unlock() | ||
| node = nextNode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current locking strategy is inefficient and overly complex. The RetainedStore methods (like this Set method) acquire a store-level RWMutex, which already guarantees thread safety for the trie's structure. However, the methods then proceed to acquire fine-grained locks on each retainedTrieNode during traversal. This per-node locking is redundant due to the store-level lock and adds unnecessary performance overhead and code complexity.
The implementation should be simplified to use only the store-level lock. This would also make fixing other issues like the memory leak in cleanupExpiredRecursive much simpler. You should remove the mu field from retainedTrieNode and all its usages throughout the file.
| node.mu.Lock() | |
| if node.children[level] == nil { | |
| node.children[level] = newRetainedTrieNode() | |
| } | |
| nextNode := node.children[level] | |
| node.mu.Unlock() | |
| node = nextNode | |
| if node.children[level] == nil { | |
| node.children[level] = newRetainedTrieNode() | |
| } | |
| node = node.children[level] |
Fixes #19