Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions spec/broker_sync_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
require "./spec_helper"

module PlaceOS::Source
describe "Broker State Sync" do
it "callback is invoked when new broker is added after startup" do
# Create initial broker
test_broker

# Setup MQTT broker manager
mqtt_manager = MqttBrokerManager.new

# Track if callback was invoked
callback_invoked = false
callback_broker_id = ""

mqtt_manager.on_broker_ready = ->(broker_id : String) {
callback_invoked = true
callback_broker_id = broker_id
}

# Start the manager to mark startup as finished
mqtt_manager.start

# Wait for startup to complete
sleep 200.milliseconds

# Create a new broker after startup
new_broker = PlaceOS::Model::Broker.new(
name: "new-broker-#{Time.utc.to_unix}",
host: ENV["MQTT_HOST"]?.presence || "mqtt",
port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
auth_type: :no_auth,
).save!

# Trigger the broker creation event
event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker)
mqtt_manager.@event_channel.send(event)

# Wait for broker to be processed
sleep 300.milliseconds

# Verify the broker was created successfully
mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil

# Verify the callback was invoked
callback_invoked.should be_true
callback_broker_id.should eq new_broker.id.as(String)

# Cleanup
mqtt_manager.stop
new_broker.destroy
end

it "callback is not invoked for brokers created during startup" do
# Setup MQTT broker manager
mqtt_manager = MqttBrokerManager.new

# Track if callback was invoked
callback_invoked = false

mqtt_manager.on_broker_ready = ->(_broker_id : String) {
callback_invoked = true
}

# Create broker before starting (simulating existing broker)
startup_broker = test_broker

# Start the manager (this will load existing brokers)
mqtt_manager.start

# Wait for startup to complete
sleep 200.milliseconds

# Verify the broker was loaded
mqtt_manager.@publishers[startup_broker.id.as(String)]?.should_not be_nil

# Verify the callback was NOT invoked during startup
callback_invoked.should be_false

# Cleanup
mqtt_manager.stop
end

it "resync_state only runs after initial sync completes" do
mock_mappings_state = mock_state(module_id: "mod-test")
mock_mappings = Mappings.new(mock_mappings_state)
mock_publisher = MockManager.new

status_events = StatusEvents.new(mock_mappings, [mock_publisher] of PublisherManager)

# Before initial sync, resync should not run
status_events.resync_state
mock_publisher.messages.size.should eq 0

# Start to trigger initial sync
spawn { status_events.start }

# Wait for initial sync
sleep 300.milliseconds

# Clear messages from initial sync
mock_publisher.messages.clear

# Now resync should work
status_events.resync_state

# Wait for resync to process
sleep 200.milliseconds

# Cleanup
status_events.stop
end

it "full integration: new broker receives state via resync" do
# Create initial broker
test_broker

# Setup mock publisher to track messages
mock_publisher = MockManager.new
publisher_managers = [mock_publisher] of PublisherManager

# Add MQTT broker manager
mqtt_manager = MqttBrokerManager.new
publisher_managers << mqtt_manager

# Mock data with a module that has proper mappings
module_id = "mod-integration-test"
status_key = "power"
mock_mappings_state = mock_state(module_id: module_id)
mock_mappings = Mappings.new(mock_mappings_state)

# Start application manager
manager = Manager.new(publisher_managers, mock_mappings)
manager.start

# Wait for initial sync to complete
sleep 300.milliseconds

# Store module state in Redis
Redis.open(url: REDIS_URL) do |client|
client.set("status/#{module_id}/#{status_key}", "on".to_json)
end

# Clear any messages from initial sync
mock_publisher.messages.clear

# Create a new broker after startup
new_broker = PlaceOS::Model::Broker.new(
name: "integration-broker-#{Time.utc.to_unix}",
host: ENV["MQTT_HOST"]?.presence || "mqtt",
port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
auth_type: :no_auth,
).save!

# Trigger the broker creation event
event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker)
mqtt_manager.@event_channel.send(event)

# Wait for broker to be processed and state resync to occur
sleep 500.milliseconds

# Verify the broker was created
mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil

# Verify the callback was wired up by the manager
mqtt_manager.on_broker_ready.should_not be_nil

# Cleanup
manager.stop
new_broker.destroy
end
end
end
11 changes: 11 additions & 0 deletions src/source/manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module PlaceOS::Source
return if started?
@started = true

# Start publisher managers first to establish connections
Log.info { "registering Publishers" }
publisher_managers.each(&.start)

Expand All @@ -53,6 +54,16 @@ module PlaceOS::Source
Log.info { "starting Driver router" }
driver_router.start

# Setup callback for new broker connections to trigger state resync
publisher_managers.each do |manager|
if manager.is_a?(MqttBrokerManager)
manager.on_broker_ready = ->(broker_id : String) {
Log.info { "triggering state resync for new Broker<#{broker_id}>" }
spawn { status_events.resync_state }
}
end
end

Log.info { "listening for Module state events" }
spawn { status_events.start }

Expand Down
9 changes: 9 additions & 0 deletions src/source/publishing/mqtt_broker_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ module PlaceOS::Source

Log = ::Log.for(self)

# Callback to trigger state sync when new brokers are added
property on_broker_ready : Proc(String, Nil)?

class_getter instance : self { new }

# Broadcast a message to each MQTT Broker
Expand Down Expand Up @@ -61,6 +64,12 @@ module PlaceOS::Source

publisher.start

# Trigger state sync callback if this is a new broker after startup
if startup_finished?
Log.info { "new broker connected after startup, triggering state sync for Broker<#{broker_id}>" }
on_broker_ready.try &.call(broker_id)
end

Resource::Result::Success
end

Expand Down
3 changes: 3 additions & 0 deletions src/source/publishing/mqtt_publisher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ module PlaceOS::Source
password: broker.password,
)

# Small delay to ensure connection is fully established
sleep 100.milliseconds

close_channel = Channel(Nil).new(1)

repeating_task = Tasker.every((keep_alive // 3).seconds) do
Expand Down
41 changes: 41 additions & 0 deletions src/source/status_events.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module PlaceOS::Source
private getter publisher_managers : Array(PublisherManager)

private property? stopped : Bool = true
private property? initial_sync_complete : Bool = false

private getter sync_lock = Mutex.new(:reentrant)

Expand Down Expand Up @@ -95,6 +96,46 @@ module PlaceOS::Source
modules: mods_mapped.to_s,
values: status_updated.to_s,
} }
self.initial_sync_complete = true
end

# Trigger a state resync - useful when new brokers are added
def resync_state
return unless initial_sync_complete?

Log.info { "resyncing state for new broker connection" }

mods_mapped = 0_u64
status_updated = 0_u64
pattern = "broker_resync"

PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
modules.each do |mod|
next unless mod
mods_mapped += 1_u64
module_id = mod.id.to_s
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
end

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
end
rescue error
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
end
end

Log.info { {
message: "state resync complete",
modules: mods_mapped.to_s,
values: status_updated.to_s,
} }
end

protected def handle_pevent(pattern : String, channel : String, payload : String)
Expand Down
Loading