diff --git a/spec/broker_sync_spec.cr b/spec/broker_sync_spec.cr new file mode 100644 index 0000000..87eae3d --- /dev/null +++ b/spec/broker_sync_spec.cr @@ -0,0 +1,173 @@ +require "./spec_helper" + +module PlaceOS::Source + describe "Broker State Sync" do + it "callback is invoked when new broker is added after startup" do + # Create initial broker + test_broker + + # Setup MQTT broker manager + mqtt_manager = MqttBrokerManager.new + + # Track if callback was invoked + callback_invoked = false + callback_broker_id = "" + + mqtt_manager.on_broker_ready = ->(broker_id : String) { + callback_invoked = true + callback_broker_id = broker_id + } + + # Start the manager to mark startup as finished + mqtt_manager.start + + # Wait for startup to complete + sleep 200.milliseconds + + # Create a new broker after startup + new_broker = PlaceOS::Model::Broker.new( + name: "new-broker-#{Time.utc.to_unix}", + host: ENV["MQTT_HOST"]?.presence || "mqtt", + port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883, + auth_type: :no_auth, + ).save! + + # Trigger the broker creation event + event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker) + mqtt_manager.@event_channel.send(event) + + # Wait for broker to be processed + sleep 300.milliseconds + + # Verify the broker was created successfully + mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil + + # Verify the callback was invoked + callback_invoked.should be_true + callback_broker_id.should eq new_broker.id.as(String) + + # Cleanup + mqtt_manager.stop + new_broker.destroy + end + + it "callback is not invoked for brokers created during startup" do + # Setup MQTT broker manager + mqtt_manager = MqttBrokerManager.new + + # Track if callback was invoked + callback_invoked = false + + mqtt_manager.on_broker_ready = ->(_broker_id : String) { + callback_invoked = true + } + + # Create broker before starting (simulating existing broker) + startup_broker = test_broker + + # Start the manager (this will load existing brokers) + mqtt_manager.start + + # Wait for startup to complete + sleep 200.milliseconds + + # Verify the broker was loaded + mqtt_manager.@publishers[startup_broker.id.as(String)]?.should_not be_nil + + # Verify the callback was NOT invoked during startup + callback_invoked.should be_false + + # Cleanup + mqtt_manager.stop + end + + it "resync_state only runs after initial sync completes" do + mock_mappings_state = mock_state(module_id: "mod-test") + mock_mappings = Mappings.new(mock_mappings_state) + mock_publisher = MockManager.new + + status_events = StatusEvents.new(mock_mappings, [mock_publisher] of PublisherManager) + + # Before initial sync, resync should not run + status_events.resync_state + mock_publisher.messages.size.should eq 0 + + # Start to trigger initial sync + spawn { status_events.start } + + # Wait for initial sync + sleep 300.milliseconds + + # Clear messages from initial sync + mock_publisher.messages.clear + + # Now resync should work + status_events.resync_state + + # Wait for resync to process + sleep 200.milliseconds + + # Cleanup + status_events.stop + end + + it "full integration: new broker receives state via resync" do + # Create initial broker + test_broker + + # Setup mock publisher to track messages + mock_publisher = MockManager.new + publisher_managers = [mock_publisher] of PublisherManager + + # Add MQTT broker manager + mqtt_manager = MqttBrokerManager.new + publisher_managers << mqtt_manager + + # Mock data with a module that has proper mappings + module_id = "mod-integration-test" + status_key = "power" + mock_mappings_state = mock_state(module_id: module_id) + mock_mappings = Mappings.new(mock_mappings_state) + + # Start application manager + manager = Manager.new(publisher_managers, mock_mappings) + manager.start + + # Wait for initial sync to complete + sleep 300.milliseconds + + # Store module state in Redis + Redis.open(url: REDIS_URL) do |client| + client.set("status/#{module_id}/#{status_key}", "on".to_json) + end + + # Clear any messages from initial sync + mock_publisher.messages.clear + + # Create a new broker after startup + new_broker = PlaceOS::Model::Broker.new( + name: "integration-broker-#{Time.utc.to_unix}", + host: ENV["MQTT_HOST"]?.presence || "mqtt", + port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883, + auth_type: :no_auth, + ).save! + + # Trigger the broker creation event + event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker) + mqtt_manager.@event_channel.send(event) + + # Wait for broker to be processed and state resync to occur + sleep 500.milliseconds + + # Verify the broker was created + mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil + + # Verify the callback was wired up by the manager + mqtt_manager.on_broker_ready.should_not be_nil + + # Cleanup + manager.stop + new_broker.destroy + end + end +end diff --git a/src/source/manager.cr b/src/source/manager.cr index c35e7f9..014ffcb 100644 --- a/src/source/manager.cr +++ b/src/source/manager.cr @@ -34,6 +34,7 @@ module PlaceOS::Source return if started? @started = true + # Start publisher managers first to establish connections Log.info { "registering Publishers" } publisher_managers.each(&.start) @@ -53,6 +54,16 @@ module PlaceOS::Source Log.info { "starting Driver router" } driver_router.start + # Setup callback for new broker connections to trigger state resync + publisher_managers.each do |manager| + if manager.is_a?(MqttBrokerManager) + manager.on_broker_ready = ->(broker_id : String) { + Log.info { "triggering state resync for new Broker<#{broker_id}>" } + spawn { status_events.resync_state } + } + end + end + Log.info { "listening for Module state events" } spawn { status_events.start } diff --git a/src/source/publishing/mqtt_broker_manager.cr b/src/source/publishing/mqtt_broker_manager.cr index 871ca97..d4f8521 100644 --- a/src/source/publishing/mqtt_broker_manager.cr +++ b/src/source/publishing/mqtt_broker_manager.cr @@ -11,6 +11,9 @@ module PlaceOS::Source Log = ::Log.for(self) + # Callback to trigger state sync when new brokers are added + property on_broker_ready : Proc(String, Nil)? + class_getter instance : self { new } # Broadcast a message to each MQTT Broker @@ -61,6 +64,12 @@ module PlaceOS::Source publisher.start + # Trigger state sync callback if this is a new broker after startup + if startup_finished? + Log.info { "new broker connected after startup, triggering state sync for Broker<#{broker_id}>" } + on_broker_ready.try &.call(broker_id) + end + Resource::Result::Success end diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 73cce99..466838c 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -85,6 +85,9 @@ module PlaceOS::Source password: broker.password, ) + # Small delay to ensure connection is fully established + sleep 100.milliseconds + close_channel = Channel(Nil).new(1) repeating_task = Tasker.every((keep_alive // 3).seconds) do diff --git a/src/source/status_events.cr b/src/source/status_events.cr index c8abe0e..83fdaba 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -22,6 +22,7 @@ module PlaceOS::Source private getter publisher_managers : Array(PublisherManager) private property? stopped : Bool = true + private property? initial_sync_complete : Bool = false private getter sync_lock = Mutex.new(:reentrant) @@ -95,6 +96,46 @@ module PlaceOS::Source modules: mods_mapped.to_s, values: status_updated.to_s, } } + self.initial_sync_complete = true + end + + # Trigger a state resync - useful when new brokers are added + def resync_state + return unless initial_sync_complete? + + Log.info { "resyncing state for new broker connection" } + + mods_mapped = 0_u64 + status_updated = 0_u64 + pattern = "broker_resync" + + PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules| + modules.each do |mod| + next unless mod + mods_mapped += 1_u64 + module_id = mod.id.to_s + store = PlaceOS::Driver::RedisStorage.new(module_id) + store.each do |key, value| + status_updated += 1_u64 + add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc}) + end + + # Backpressure if event container is growing too fast + if event_container.size >= MAX_CONTAINER_SIZE / 2 + until event_container.size < MAX_CONTAINER_SIZE / 4 + sleep 10.milliseconds + end + end + rescue error + Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" } + end + end + + Log.info { { + message: "state resync complete", + modules: mods_mapped.to_s, + values: status_updated.to_s, + } } end protected def handle_pevent(pattern : String, channel : String, payload : String)