From 8c8058e752d13a2dd26463bbc4faba5a0a5225a6 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Fri, 27 Jun 2025 16:10:58 -0500 Subject: [PATCH 01/13] Update selection algo to follow spec --- .../common/ControllerSelector.cpp | 60 +++++++++++++++---- .../common/ControllerSelector.h | 6 +- .../common/TimerHandler.h | 7 +++ 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/tactical-microgrid-standard/common/ControllerSelector.cpp b/tactical-microgrid-standard/common/ControllerSelector.cpp index dbab885..be9b21b 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.cpp +++ b/tactical-microgrid-standard/common/ControllerSelector.cpp @@ -2,6 +2,8 @@ void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) { + std::cout << "================== ControllerSelector::got_heartbeat..." << std::endl; + std::cout << "Received heartbeat from MC: " << hb.deviceId() << std::endl; Guard g(lock_); auto it = all_controllers_.find(hb.deviceId()); if (it != all_controllers_.end()) { @@ -9,19 +11,25 @@ void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) cancel(); if (selected_.empty()) { + std::cout << "No Active MC selected yet." << std::endl; if (!this->get_timer()->active()) { - schedule_once(NewController{hb.deviceId()}, new_controller_delay); + std::cout << "Scheduling a New MC timer to decide a new active MC..." << std::endl; + schedule_once(NewController{hb.deviceId()}, new_active_controller_delay); } } else if (is_selected(hb.deviceId())) { + std::cout << "Received heartbeat is from the current active MC" << std::endl; cancel(); if (this->get_timer()->active()) { + std::cout << "A MissedController timer was scheduled and active. Rescheduling it..." << std::endl; reschedule(); } else { + std::cout << "Schedule a new MissedController" << std::endl; // MissedController was triggered, so we need to schedule it again. - schedule_once(MissedController{}, missed_controller_delay); + schedule_once(MissedController{}, heartbeat_deadline); } } } + std::cout << "..... ======================================\n" << std::endl; } void ControllerSelector::got_device_info(const tms::DeviceInfo& di) @@ -39,19 +47,49 @@ void ControllerSelector::got_device_info(const tms::DeviceInfo& di) void ControllerSelector::timer_fired(Timer& timer) { Guard g(lock_); - const auto& id = timer.arg.id; + const auto& mc_id = timer.arg.id; ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(NewController): " - "\"%C\" -> \"%C\"\n", selected_.c_str(), id.c_str())); - select(id); + "\"%C\" -> \"%C\"\n", selected_.c_str(), mc_id.c_str())); + + // The TMS spec isn't clear to whether the device needs to verify that the last + // heartbeat of this controller was received less than 3s (i.e., heartbeat deadline) ago. + // This check makes sense since if its last heartbeat was more than 3s ago, that means + // the controller is not available and should not be selected as the active controller. + const TimePoint now = Clock::now(); + auto it = all_controllers_.find(mc_id); + if (it == all_controllers_.end()) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ControllerSelector::timed_event(NewController): Controller \"%C\" not found!\n", + mc_id.c_str())); + return; + } + + if (now - it->second < heartbeat_deadline) { + selected_ = mc_id; + // TODO: Send ActiveMicrogridControllerState + } } -void ControllerSelector::timer_fired(Timer&) +void ControllerSelector::timer_fired(Timer& timer) { Guard g(lock_); + const auto& timer_id = timer.id; ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedController): " - "\"%C\"\n", selected_.c_str())); - schedule_once(LostController{}, lost_controller_delay); - schedule_once(NoControllers{}, no_controllers_delay); + "\"%C\". Timer id: %d\n", selected_.c_str(), timer_id)); + schedule_once(LostController{}, lost_active_controller_delay); + + // Start a No MC timer if the device has missed heartbeats from all MCs + const TimePoint now = Clock::now(); + bool no_avail_mc = true; + for (const auto& pair : all_controllers_) { + if (now - pair.second < heartbeat_deadline) { + no_avail_mc = false; + break; + } + } + + if (no_avail_mc) { + schedule_once(NoControllers{}, no_controllers_delay); + } } void ControllerSelector::timer_fired(Timer&) @@ -66,7 +104,7 @@ void ControllerSelector::timer_fired(Timer&) const TimePoint now = Clock::now(); for (auto it = all_controllers_.begin(); it != all_controllers_.end(); ++it) { const auto last_hb = now - it->second; - if (last_hb < missed_controller_delay) { + if (last_hb < heartbeat_deadline) { select(it->first, std::chrono::duration_cast(last_hb)); break; } @@ -84,6 +122,6 @@ void ControllerSelector::select(const tms::Identity& id, Sec last_hb) { ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::select: \"%C\"\n", id.c_str())); selected_ = id; - schedule_once(MissedController{}, missed_controller_delay - last_hb); + schedule_once(MissedController{}, heartbeat_deadline - last_hb); // TODO: Send ActiveMicrogridControllerState } diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index 362d25e..d39c0ff 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -63,9 +63,9 @@ class OpenDDS_TMS_Export ControllerSelector : } private: - static constexpr Sec new_controller_delay = Sec(3); - static constexpr Sec missed_controller_delay = Sec(3); - static constexpr Sec lost_controller_delay = Sec(6); + static constexpr Sec heartbeat_deadline = Sec(3); + static constexpr Sec new_active_controller_delay = Sec(3); + static constexpr Sec lost_active_controller_delay = Sec(6); static constexpr Sec no_controllers_delay = Sec(10); void timer_fired(Timer& timer); diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index 2b3bbec..0ffe99d 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -13,6 +13,7 @@ #include #include #include +#include using Sec = std::chrono::seconds; using Clock = std::chrono::system_clock; @@ -109,6 +110,8 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder assert_inactive(timer); const TimerId id = reactor_->schedule_timer( this, &timer->id, ACE_Time_Value(timer->delay), ACE_Time_Value(timer->period)); + std::cout << "TimerHandler::schedule(): timer id = " << id << ", name = " << timer->name + << ", delay = " << timer->delay.count() << ", period = " << timer->period.count() << std::endl; timer->id = id; active_timers_[id] = timer; } @@ -188,14 +191,18 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder auto timer = active_timers_[*reinterpret_cast(arg)]; any_timer_fired(timer); bool exit_after = false; + int ret = 0; std::visit([&](auto&& value) { if (!value->period.count()) { using EventType = typename std::remove_reference_t::element_type::Arg; timer_wont_run(value); + ret = -1; } exit_after = value->exit_after; }, timer); return end_event_loop(exit_after); + //end_event_loop(exit_after); + //return ret; } protected: From de19e872d60e1bf3a0fad2075079900a3c38a7e5 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 1 Jul 2025 11:48:05 -0500 Subject: [PATCH 02/13] Timer hash seems working --- .../common/ControllerSelector.cpp | 52 +++++++++++++------ .../common/ControllerSelector.h | 21 ++++++-- .../common/TimerHandler.h | 15 +++--- .../power_devices/PowerDevice.h | 15 +++++- .../power_devices/Source.cpp | 10 ++-- 5 files changed, 79 insertions(+), 34 deletions(-) diff --git a/tactical-microgrid-standard/common/ControllerSelector.cpp b/tactical-microgrid-standard/common/ControllerSelector.cpp index be9b21b..fe6d5e4 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.cpp +++ b/tactical-microgrid-standard/common/ControllerSelector.cpp @@ -1,9 +1,37 @@ #include "ControllerSelector.h" +#include +#include +#include + +// If caller passes a non-null reactor, use it unmodified. +// Otherwise, create another reactor for this class separated from the one for Handshaking. +ControllerSelector::ControllerSelector(ACE_Reactor* reactor) + : TimerHandler(reactor) +{ + if (!reactor) { + reactor_ = new ACE_Reactor; + + // We had an issue with using ACE_Reactor's default timer queue, which is + // ACE_Timer_Heap, when the rate of timer creation and cancellation is high + // for detecting missed heartbeat deadline from microgrid controllers. + // ACE_Timer_Hash seems working okay. + timer_queue_ = new ACE_Timer_Hash; + reactor_->timer_queue(timer_queue_); + own_reactor_ = true; + } +} + +ControllerSelector::~ControllerSelector() +{ + if (own_reactor_) { + delete timer_queue_; + delete reactor_; + } +} + void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) { - std::cout << "================== ControllerSelector::got_heartbeat..." << std::endl; - std::cout << "Received heartbeat from MC: " << hb.deviceId() << std::endl; Guard g(lock_); auto it = all_controllers_.find(hb.deviceId()); if (it != all_controllers_.end()) { @@ -11,25 +39,19 @@ void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) cancel(); if (selected_.empty()) { - std::cout << "No Active MC selected yet." << std::endl; if (!this->get_timer()->active()) { - std::cout << "Scheduling a New MC timer to decide a new active MC..." << std::endl; schedule_once(NewController{hb.deviceId()}, new_active_controller_delay); } } else if (is_selected(hb.deviceId())) { - std::cout << "Received heartbeat is from the current active MC" << std::endl; cancel(); - if (this->get_timer()->active()) { - std::cout << "A MissedController timer was scheduled and active. Rescheduling it..." << std::endl; - reschedule(); + if (this->get_timer()->active()) { + reschedule(); } else { - std::cout << "Schedule a new MissedController" << std::endl; - // MissedController was triggered, so we need to schedule it again. - schedule_once(MissedController{}, heartbeat_deadline); + // MissedHeartbeat was triggered, so we need to schedule it again. + schedule_once(MissedHeartbeat{}, heartbeat_deadline); } } } - std::cout << "..... ======================================\n" << std::endl; } void ControllerSelector::got_device_info(const tms::DeviceInfo& di) @@ -69,11 +91,11 @@ void ControllerSelector::timer_fired(Timer& timer) } } -void ControllerSelector::timer_fired(Timer& timer) +void ControllerSelector::timer_fired(Timer& timer) { Guard g(lock_); const auto& timer_id = timer.id; - ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedController): " + ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedHeartbeat): " "\"%C\". Timer id: %d\n", selected_.c_str(), timer_id)); schedule_once(LostController{}, lost_active_controller_delay); @@ -122,6 +144,6 @@ void ControllerSelector::select(const tms::Identity& id, Sec last_hb) { ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::select: \"%C\"\n", id.c_str())); selected_ = id; - schedule_once(MissedController{}, heartbeat_deadline - last_hb); + schedule_once(MissedHeartbeat{}, heartbeat_deadline - last_hb); // TODO: Send ActiveMicrogridControllerState } diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index d39c0ff..7aa44cf 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -11,7 +11,7 @@ struct NewController { tms::Identity id; }; -struct MissedController {}; +struct MissedHeartbeat {}; struct LostController {}; struct NoControllers {}; @@ -30,7 +30,7 @@ class PowerDevice; * | | [S] | * | [6s] | | * | | V V - * +-[A,R]->MissedController<-[3s]-select()->{ActiveMicrogridControllerState} + * +-[A,R]->MissedHeartbeat<-[3s]-select()->{ActiveMicrogridControllerState} * | | * | [10s] * | | @@ -45,8 +45,11 @@ class PowerDevice; * S: If there's a selectable controller with a recent heartbeat */ class OpenDDS_TMS_Export ControllerSelector : - public TimerHandler { + public TimerHandler { public: + explicit ControllerSelector(ACE_Reactor* reactor = nullptr); + ~ControllerSelector(); + void got_heartbeat(const tms::Heartbeat& hb); void got_device_info(const tms::DeviceInfo& di); @@ -62,14 +65,24 @@ class OpenDDS_TMS_Export ControllerSelector : return selected_ == id; } + ACE_Reactor* const get_reactor() const + { + Guard g(lock_); + return reactor_; + } + private: + // Allow using non-default timer queue + ACE_Timer_Queue* timer_queue_ = nullptr; + bool own_reactor_ = false; + static constexpr Sec heartbeat_deadline = Sec(3); static constexpr Sec new_active_controller_delay = Sec(3); static constexpr Sec lost_active_controller_delay = Sec(6); static constexpr Sec no_controllers_delay = Sec(10); void timer_fired(Timer& timer); - void timer_fired(Timer&); + void timer_fired(Timer&); void timer_fired(Timer&); void timer_fired(Timer&); void any_timer_fired(AnyTimer timer) diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index 0ffe99d..746bed0 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -110,8 +110,6 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder assert_inactive(timer); const TimerId id = reactor_->schedule_timer( this, &timer->id, ACE_Time_Value(timer->delay), ACE_Time_Value(timer->period)); - std::cout << "TimerHandler::schedule(): timer id = " << id << ", name = " << timer->name - << ", delay = " << timer->delay.count() << ", period = " << timer->period.count() << std::endl; timer->id = id; active_timers_[id] = timer; } @@ -188,26 +186,27 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder int handle_timeout(const ACE_Time_Value&, const void* arg) { Guard g(lock_); - auto timer = active_timers_[*reinterpret_cast(arg)]; + auto timer_id = *reinterpret_cast(arg); + if (active_timers_.count(timer_id) == 0) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: TimerHandler::handle_timeout: timer id %d does NOT exist\n", + timer_id)); + } + auto timer = active_timers_[timer_id]; any_timer_fired(timer); bool exit_after = false; - int ret = 0; std::visit([&](auto&& value) { if (!value->period.count()) { using EventType = typename std::remove_reference_t::element_type::Arg; timer_wont_run(value); - ret = -1; } exit_after = value->exit_after; }, timer); return end_event_loop(exit_after); - //end_event_loop(exit_after); - //return ret; } protected: mutable Mutex lock_; - ACE_Reactor* const reactor_; + ACE_Reactor* reactor_; int end_event_loop(bool yes = true) { diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.h b/tactical-microgrid-standard/power_devices/PowerDevice.h index 59bb258..7f5ecca 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.h +++ b/tactical-microgrid-standard/power_devices/PowerDevice.h @@ -26,7 +26,7 @@ class PowerSim_Idl_Export PowerDevice : public Handshaking { virtual int run() { - return reactor_->run_reactor_event_loop() == 0 ? 0 : 1; + return run_i(); } powersim::ConnectedDeviceSeq connected_devices_in() const @@ -53,6 +53,19 @@ class PowerSim_Idl_Export PowerDevice : public Handshaking { } protected: + virtual int run_i() + { + if (controller_selector_.reactor() == reactor_) { + // Same reactor instance for both handshaking and controller selection + return reactor_->run_reactor_event_loop() == 0 ? 0 : 1; + } + + std::thread handshaking_thr([&] { reactor_->run_reactor_event_loop(); }); + const int ret = controller_selector_.get_reactor()->run_reactor_event_loop() == 0 ? 0 : 1; + handshaking_thr.join(); + return ret; + } + // Concrete power device should override this function depending on their role. virtual tms::DeviceInfo populate_device_info() const; diff --git a/tactical-microgrid-standard/power_devices/Source.cpp b/tactical-microgrid-standard/power_devices/Source.cpp index 79eb55c..97ca710 100644 --- a/tactical-microgrid-standard/power_devices/Source.cpp +++ b/tactical-microgrid-standard/power_devices/Source.cpp @@ -241,12 +241,10 @@ class SourceDevice : public PowerDevice { int run() override { - std::thread thr(&SourceDevice::simulate_power_flow, this); - if (reactor_->run_reactor_event_loop() != 0) { - return 1; - } - thr.join(); - return 0; + std::thread sim_thr(&SourceDevice::simulate_power_flow, this); + const int ret = run_i(); + sim_thr.join(); + return ret; } tms::ReplyDataWriter_var reply_dw() const From c076b390acb002168db65db59ef942573d231a0a Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 1 Jul 2025 12:07:29 -0500 Subject: [PATCH 03/13] Missed thread header --- tactical-microgrid-standard/power_devices/PowerDevice.h | 1 + 1 file changed, 1 insertion(+) diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.h b/tactical-microgrid-standard/power_devices/PowerDevice.h index 7f5ecca..d396dbc 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.h +++ b/tactical-microgrid-standard/power_devices/PowerDevice.h @@ -7,6 +7,7 @@ #include "PowerSim_Idl_export.h" #include +#include class PowerSim_Idl_Export PowerDevice : public Handshaking { public: From eb4dc00aca3014a1b76cb2728a1200ffdf9d2a4c Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 1 Jul 2025 17:53:22 -0500 Subject: [PATCH 04/13] Add handling for ActiveMicrogridControllerState topic --- tactical-microgrid-standard/CMakeLists.txt | 1 + ...dControllerStateDataReaderListenerImpl.cpp | 23 ++++++++ ...ridControllerStateDataReaderListenerImpl.h | 21 ++++++++ tactical-microgrid-standard/cli/CLIClient.cpp | 54 +++++++++++++++++++ tactical-microgrid-standard/cli/CLIClient.h | 13 ++++- .../common/ControllerSelector.cpp | 21 ++++++-- .../common/ControllerSelector.h | 14 ++++- .../common/QosHelper.cpp | 6 ++- .../power_devices/PowerDevice.cpp | 51 ++++++++++++++++++ .../power_devices/PowerDevice.h | 1 + 10 files changed, 197 insertions(+), 8 deletions(-) create mode 100644 tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp create mode 100644 tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h diff --git a/tactical-microgrid-standard/CMakeLists.txt b/tactical-microgrid-standard/CMakeLists.txt index 022c070..f1033b4 100644 --- a/tactical-microgrid-standard/CMakeLists.txt +++ b/tactical-microgrid-standard/CMakeLists.txt @@ -63,6 +63,7 @@ target_link_libraries(Controller PRIVATE Commands_Idl PowerSim_Idl) add_executable(CLI cli/main.cpp cli/CLIClient.cpp + cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp ) target_include_directories(CLI PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(CLI PRIVATE Commands_Idl PowerSim_Idl) diff --git a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp new file mode 100644 index 0000000..c18d8da --- /dev/null +++ b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp @@ -0,0 +1,23 @@ +#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h" + +void ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) +{ + tms::ActiveMicrogridControllerStateSeq data; + DDS::SampleInfoSeq info_seq; + tms::ActiveMicrogridControllerStateDataReader_var typed_reader = tms::ActiveMicroogridControllerStateDataReader::_narrow(reader); + DDS::ReturnCode_t rc = typed_reader->take(data, info_seq, DDS::LENGTH_UNLIMITED, + DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); + if (rc != DDS::RETCODE_OK) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available: " + "take data failed: %C\n", OpenDDS::DCPS::retcode_to_string(rc))); + return; + } + + for (CORBA::ULong i = 0; i < data.length(); ++i) { + if (info_seq[i].valid_data) { + const tms::Identity& device_id = data[i].deviceId(); + auto master_id = data[i].masterId(); + cli_client_.set_active_controller(device_id, master_id); + } + } +} diff --git a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h new file mode 100644 index 0000000..fc65746 --- /dev/null +++ b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.h @@ -0,0 +1,21 @@ +#ifndef CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H +#define CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H + +#include "common/DataReaderListenerBase.h" +#include "CLIClient.h" + +class ActiveMicrogridControllerStateDataReaderListenerImpl : public DataReaderListenerBase { +public: + explicit ActiveMicrogridControllerStateDataReaderListenerImpl(CLIClient& cli_client) + : DataReaderListenerBase("tms::ActiveMicrogridControllerState - DataReaderListenerImpl") + , cli_client_(cli_client) {} + + virtual ~ActiveMicrogridControllerStateDataReaderListenerImpl() = default; + + void on_data_available(DDS::DataReader_ptr reader) final; + +private: + CLIClient& cli_client_; +}; + +#endif diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index 9a36cbc..e3f96b1 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -1,6 +1,7 @@ #include "CLIClient.h" #include "common/QosHelper.h" #include "common/Utils.h" +#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h" #include #include @@ -90,6 +91,46 @@ DDS::ReturnCode_t CLIClient::init_tms(DDS::DomainId_t domain_id, int argc, char* return DDS::RETCODE_ERROR; } + // Subscribe to the tms::ActiveMicrogridControllerState topic + tms::ActiveMicrogridControllerStateTypeSupport_var amcs_ts = new tms::ActiveMicrogridControllerStateTypeSupportImpl; + if (DDS::RETCODE_OK != amcs_ts->register_type(dp, "")) { + ACE_ERROR((LM_ERROR, "(%P|%t) CLIClient::init: register_type ActiveMicrogridControllerState failed\n")); + return DDS::RETCODE_ERROR; + } + + CORBA::String_var amcs_type_name = amcs_ts->get_type_name(); + DDS::Topic_var amcs_topic = dp->create_topic(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str(), + amcs_type_name, + TOPIC_QOS_DEFAULT, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_topic) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + const DDS::SubscriberQos tms_sub_qos = Qos::Subscriber::get_qos(); + DDS::Subscriber_var tms_sub = dp->create_subscriber(tms_sub_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!tms_sub) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_subscriber with TMS QoS failed\n")); + return DDS::RETCODE_ERROR; + } + + const DDS::DataReaderQos& amcs_dr_qos = Qos::DataReader::fn_map.at(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE)(device_id); + DDS::DataReaderListener_var amcs_listener(new ActiveMicrogridControllerStateDataReaderListenerImpl(*this)); + DDS::DataReader_var amcs_dr_base = tms_sub->create_datareader(amcs_topic, + amcs_dr_qos, + amcs_listener, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_dr_base) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_datareader for topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + return DDS::RETCODE_OK; } @@ -328,6 +369,19 @@ void CLIClient::run() thr.join(); } +void CLIClient::set_active_controller(const tms::Identity& device_id, + const OPENDDS_OPTIONAL_NS::optional& master_id) +{ + std::lock_guard guard(active_controller_m_); + if (master_id.has_value()) { + active_controllers_[device_id] = master_id.value(); + } + else { + // The device has lost its active controller or hasn't selected one yet. + active_controllers_[device_id] = ""; + } +} + void CLIClient::tolower(std::string& s) const { for (size_t i = 0; i < s.size(); ++i) { diff --git a/tactical-microgrid-standard/cli/CLIClient.h b/tactical-microgrid-standard/cli/CLIClient.h index 337ddba..849a9bf 100644 --- a/tactical-microgrid-standard/cli/CLIClient.h +++ b/tactical-microgrid-standard/cli/CLIClient.h @@ -1,5 +1,5 @@ -#ifndef CONTROLLER_CLI_CLIENT_H -#define CONTROLLER_CLI_CLIENT_H +#ifndef CLI_CLI_CLIENT_H +#define CLI_CLI_CLIENT_H #include "common/Handshaking.h" #include "controller/Common.h" @@ -36,8 +36,11 @@ class CLIClient : public TimerHandler { ~CLIClient() {} DDS::ReturnCode_t init(DDS::DomainId_t domain_id, int argc = 0, char* argv[] = nullptr); + void run(); + void set_active_controller(const tms::Identity& device_id, const OPENDDS_OPTIONAL_NS::optional& master_id); + private: // Initialize DDS entities in the TMS domain DDS::ReturnCode_t init_tms(DDS::DomainId_t tms_domain_id, int argc = 0, char* argv[] = nullptr); @@ -127,6 +130,12 @@ class CLIClient : public TimerHandler { // The current microgrid controller with which the CLI client is interacting tms::Identity curr_controller_; + + mutable std::mutex active_controller_m_; + + // Active controller selected by each power device (power device => its controller). + // Can be used to check that all power devices will eventually select the same active controller. + std::map active_controllers_; }; #endif diff --git a/tactical-microgrid-standard/common/ControllerSelector.cpp b/tactical-microgrid-standard/common/ControllerSelector.cpp index fe6d5e4..9e35a1c 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.cpp +++ b/tactical-microgrid-standard/common/ControllerSelector.cpp @@ -6,8 +6,9 @@ // If caller passes a non-null reactor, use it unmodified. // Otherwise, create another reactor for this class separated from the one for Handshaking. -ControllerSelector::ControllerSelector(ACE_Reactor* reactor) +ControllerSelector::ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor) : TimerHandler(reactor) + , device_id_(device_id) { if (!reactor) { reactor_ = new ACE_Reactor; @@ -87,7 +88,7 @@ void ControllerSelector::timer_fired(Timer& timer) if (now - it->second < heartbeat_deadline) { selected_ = mc_id; - // TODO: Send ActiveMicrogridControllerState + send_controller_state(); } } @@ -144,6 +145,20 @@ void ControllerSelector::select(const tms::Identity& id, Sec last_hb) { ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::select: \"%C\"\n", id.c_str())); selected_ = id; + send_controller_state(); schedule_once(MissedHeartbeat{}, heartbeat_deadline - last_hb); - // TODO: Send ActiveMicrogridControllerState +} + +void ControllerSelector::send_controller_state() +{ + tms::ActiveMicrogridControllerState amcs; + amcs.deviceId() = device_id_; + if (!selected_.empty()) { + amcs.masterId() = selected_; + } + + const DDS::ReturnCode_t rc = amcs_dw_->write(amcs, DDS::HANDLE_NIL); + if (rc != DDS::RETCODE_OK) { + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ControllerSelector::send_controller_state: write ActiveMicrogridControllerState failed\n")); + } } diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index 7aa44cf..64153de 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -47,7 +47,7 @@ class PowerDevice; class OpenDDS_TMS_Export ControllerSelector : public TimerHandler { public: - explicit ControllerSelector(ACE_Reactor* reactor = nullptr); + explicit ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor = nullptr); ~ControllerSelector(); void got_heartbeat(const tms::Heartbeat& hb); @@ -71,6 +71,11 @@ class OpenDDS_TMS_Export ControllerSelector : return reactor_; } + void set_ActiveMicrogridControllerState_writer(tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw) + { + amcs_dw_ = amcs_dw; + } + private: // Allow using non-default timer queue ACE_Timer_Queue* timer_queue_ = nullptr; @@ -92,8 +97,15 @@ class OpenDDS_TMS_Export ControllerSelector : void select(const tms::Identity& id, Sec last_hb = Sec(0)); + void send_controller_state(); + tms::Identity selected_; std::map all_controllers_; + + // Device ID to which this controller selector belong. + tms::Identity device_id_; + + tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw_; }; #endif diff --git a/tactical-microgrid-standard/common/QosHelper.cpp b/tactical-microgrid-standard/common/QosHelper.cpp index f3a2d81..32bf7cf 100644 --- a/tactical-microgrid-standard/common/QosHelper.cpp +++ b/tactical-microgrid-standard/common/QosHelper.cpp @@ -18,7 +18,8 @@ const FnMap fn_map = { {tms::topic::TOPIC_DEVICE_INFO, get_PublishLast}, {tms::topic::TOPIC_OPERATOR_INTENT_REQUEST, get_Command}, {tms::topic::TOPIC_ENERGY_START_STOP_REQUEST, get_Command}, - {tms::topic::TOPIC_REPLY, get_Reply}}; + {tms::topic::TOPIC_REPLY, get_Reply}, + {tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE, get_PublishLast} }; } namespace DataWriter { @@ -28,7 +29,8 @@ const FnMap fn_map = { {tms::topic::TOPIC_DEVICE_INFO, get_PublishLast}, {tms::topic::TOPIC_OPERATOR_INTENT_REQUEST, get_Command}, {tms::topic::TOPIC_ENERGY_START_STOP_REQUEST, get_Command}, - {tms::topic::TOPIC_REPLY, get_Reply}}; + {tms::topic::TOPIC_REPLY, get_Reply}, + {tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE, get_PublishLast} }; } } diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.cpp b/tactical-microgrid-standard/power_devices/PowerDevice.cpp index 2f16c8d..10d4bf2 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.cpp +++ b/tactical-microgrid-standard/power_devices/PowerDevice.cpp @@ -1,6 +1,7 @@ #include "PowerDevice.h" #include "PowerConnectionDataReaderListenerImpl.h" #include "common/Utils.h" +#include "common/QosHelper.h" #include @@ -23,6 +24,56 @@ DDS::ReturnCode_t PowerDevice::init(DDS::DomainId_t domain, int argc, char* argv return rc; } + DDS::DomainParticipant_var dp = get_domain_participant(); + + // Publish to the tms::ActiveMicrogridControllerState topic + tms::ActiveMicrogridControllerStateTypeSupport_var amcs_ts = new tms::ActiveMicrogridControllerStateTypeSupportImpl(); + rc = amcs_ts->register_type(participant_, ""); + if (DDS::RETCODE_OK != rc) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: register_type for ActiveMicrogridControllerState failed\n")); + return rc; + } + + CORBA::String_var amcs_type_name = amcs_ts->get_type_name(); + DDS::Topic_var amcs_topic = participant_->create_topic(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str(), + amcs_type_name, + TOPIC_QOS_DEFAULT, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (amcs_topic) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create topic '%C' failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + const DDS::PublisherQos tms_pub_qos = Qos::Publisher::get_qos(); + DDS::Publisher_var tms_pub = dp->create_publisher(tms_pub_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!tms_pub) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create_publisher with TMS QoS failed\n")); + return DDS::RETCODE_ERROR; + } + + const DDS::DataWriterQos& amcs_dw_qos = Qos::DataWriter::fn_map.at(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE)(device_id_); + DDS::DataWriter_var amcs_dw_base = tms_pub->create_datawriter(amcs_topic, + amcs_dw_qos, + nullptr, + ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!amcs_dw_base) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create_datawriter for topic \"%C\" failed\n", + tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); + return DDS::RETCODE_ERROR; + } + + tms::ActiveMicrogridControllerStateDataWriter_var amcs_dw = tms::ActiveMicrogridControllerStateDataWriter::_narrow(amcs_dw_base); + if (!amcs_dw) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: ActiveMicrogridControllerStateDataWriter narrow failed\n")); + return DDS::RETCODE_ERROR; + } + + controller_selector_.set_ActiveMicrogridControllerState_writer(amcs_dw); + // Subscribe to the PowerConnection topic const DDS::DomainId_t sim_domain_id = Utils::get_sim_domain_id(domain); sim_participant_ = get_participant_factory()->create_participant(sim_domain_id, diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.h b/tactical-microgrid-standard/power_devices/PowerDevice.h index d396dbc..aa8b79e 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.h +++ b/tactical-microgrid-standard/power_devices/PowerDevice.h @@ -14,6 +14,7 @@ class PowerSim_Idl_Export PowerDevice : public Handshaking { explicit PowerDevice(const tms::Identity& id, tms::DeviceRole role = tms::DeviceRole::ROLE_SOURCE, bool verbose = false) : Handshaking(id) , verbose_(verbose) + , controller_selector_(id) , role_(role) { } From 49558bcaf205d9d543717d147870909fb3c6b709 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 1 Jul 2025 18:15:25 -0500 Subject: [PATCH 05/13] Fix typo --- .../ActiveMicrogridControllerStateDataReaderListenerImpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp index c18d8da..91fe6dc 100644 --- a/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp +++ b/tactical-microgrid-standard/cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp @@ -4,7 +4,7 @@ void ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available(DDS { tms::ActiveMicrogridControllerStateSeq data; DDS::SampleInfoSeq info_seq; - tms::ActiveMicrogridControllerStateDataReader_var typed_reader = tms::ActiveMicroogridControllerStateDataReader::_narrow(reader); + tms::ActiveMicrogridControllerStateDataReader_var typed_reader = tms::ActiveMicrogridControllerStateDataReader::_narrow(reader); DDS::ReturnCode_t rc = typed_reader->take(data, info_seq, DDS::LENGTH_UNLIMITED, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE); if (rc != DDS::RETCODE_OK) { From 63ca9688b48e799949b99f537c48f438f102cd55 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Wed, 2 Jul 2025 11:10:21 -0500 Subject: [PATCH 06/13] Print active controller from CLI client --- tactical-microgrid-standard/cli/CLIClient.cpp | 22 +++++++++++++++---- tactical-microgrid-standard/cli/CLIClient.h | 2 +- .../power_devices/PowerDevice.cpp | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index e3f96b1..0a1d63f 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -10,6 +10,7 @@ #include #include +#include CLIClient::CLIClient(const tms::Identity& id) : handshaking_(id) @@ -372,7 +373,7 @@ void CLIClient::run() void CLIClient::set_active_controller(const tms::Identity& device_id, const OPENDDS_OPTIONAL_NS::optional& master_id) { - std::lock_guard guard(active_controller_m_); + std::lock_guard guard(active_controllers_m_); if (master_id.has_value()) { active_controllers_[device_id] = master_id.value(); } @@ -469,9 +470,22 @@ void CLIClient::display_power_devices() const std::cout << "Number of Connected Power Devices: " << power_devices_.size() << std::endl; size_t i = 1; for (auto it = power_devices_.begin(); it != power_devices_.end(); ++it) { - std::cout << i << ". Device Id: " << it->first << - ". Type: " << Utils::device_role_to_string(it->second.device_info().role()) << - ". Energy Level: " << energy_level_to_string(it->second.essl()) << std::endl; + std::string selected_controller; + { + std::lock_guard guard(active_controllers_m_); + auto ac_it = active_controllers_.find(it->first); + if (ac_it != active_controllers_.end()) { + selected_controller = "\"" + ac_it->second + "\""; + } else { + selected_controller = "\"Undetermined\""; + } + } + const std::string formated_id = "\"" + it->first + "\""; + std::cout << std::setfill(' ') << std::setw(3) << i++ + << ". Id: " << std::left << std::setw(15) << formated_id + << "| Type: " << std::left << std::setw(18) << Utils::device_role_to_string(it->second.device_info().role()) + << "| Energy Level: " << std::left << std::setw(15) << energy_level_to_string(it->second.essl()) + << "| Active Controller: " << std::left << selected_controller << std::endl; } } diff --git a/tactical-microgrid-standard/cli/CLIClient.h b/tactical-microgrid-standard/cli/CLIClient.h index 849a9bf..32f638a 100644 --- a/tactical-microgrid-standard/cli/CLIClient.h +++ b/tactical-microgrid-standard/cli/CLIClient.h @@ -131,7 +131,7 @@ class CLIClient : public TimerHandler { // The current microgrid controller with which the CLI client is interacting tms::Identity curr_controller_; - mutable std::mutex active_controller_m_; + mutable std::mutex active_controllers_m_; // Active controller selected by each power device (power device => its controller). // Can be used to check that all power devices will eventually select the same active controller. diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.cpp b/tactical-microgrid-standard/power_devices/PowerDevice.cpp index 10d4bf2..74f0e14 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.cpp +++ b/tactical-microgrid-standard/power_devices/PowerDevice.cpp @@ -40,7 +40,7 @@ DDS::ReturnCode_t PowerDevice::init(DDS::DomainId_t domain, int argc, char* argv TOPIC_QOS_DEFAULT, nullptr, ::OpenDDS::DCPS::DEFAULT_STATUS_MASK); - if (amcs_topic) { + if (!amcs_topic) { ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: PowerDevice::init: create topic '%C' failed\n", tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str())); return DDS::RETCODE_ERROR; From 5fedee53e1e00eec2a9ea7b427ce66522cb5cc0f Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Wed, 2 Jul 2025 12:51:49 -0500 Subject: [PATCH 07/13] Update tactical-microgrid-standard/cli/CLIClient.cpp Co-authored-by: Adam Mitz --- tactical-microgrid-standard/cli/CLIClient.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index 0a1d63f..628e1c6 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -376,8 +376,7 @@ void CLIClient::set_active_controller(const tms::Identity& device_id, std::lock_guard guard(active_controllers_m_); if (master_id.has_value()) { active_controllers_[device_id] = master_id.value(); - } - else { + } else { // The device has lost its active controller or hasn't selected one yet. active_controllers_[device_id] = ""; } From c21840b68199bc76a8b9c57dbbd31eb819aa1b28 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Wed, 2 Jul 2025 12:52:05 -0500 Subject: [PATCH 08/13] Update tactical-microgrid-standard/common/ControllerSelector.h Co-authored-by: Adam Mitz --- tactical-microgrid-standard/common/ControllerSelector.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index 64153de..d21b747 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -65,7 +65,7 @@ class OpenDDS_TMS_Export ControllerSelector : return selected_ == id; } - ACE_Reactor* const get_reactor() const + ACE_Reactor* get_reactor() const { Guard g(lock_); return reactor_; From 55e1e05cc5e631f8fe250aa8902649308a4bde82 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Mon, 7 Jul 2025 11:22:52 -0500 Subject: [PATCH 09/13] From review --- tactical-microgrid-standard/cli/CLIClient.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index 628e1c6..403a98a 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -371,15 +371,11 @@ void CLIClient::run() } void CLIClient::set_active_controller(const tms::Identity& device_id, - const OPENDDS_OPTIONAL_NS::optional& master_id) + const std::optional& master_id) { std::lock_guard guard(active_controllers_m_); - if (master_id.has_value()) { - active_controllers_[device_id] = master_id.value(); - } else { - // The device has lost its active controller or hasn't selected one yet. - active_controllers_[device_id] = ""; - } + // The value is absent if the device has lost its active controller or hasn't selected one yet. + active_controllers_[device_id] = master_id.value_or(""); } void CLIClient::tolower(std::string& s) const From ccc4c9cf3871acec89792ee6e2f8ac396d900636 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Mon, 7 Jul 2025 13:23:27 -0500 Subject: [PATCH 10/13] Manage reactor instance and timer queue in TimerHandler --- .../common/ControllerSelector.cpp | 26 ++----------------- .../common/ControllerSelector.h | 6 +---- .../common/Handshaking.h | 3 ++- .../common/TimerHandler.h | 26 ++++++++++++++++++- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/tactical-microgrid-standard/common/ControllerSelector.cpp b/tactical-microgrid-standard/common/ControllerSelector.cpp index 9e35a1c..8510364 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.cpp +++ b/tactical-microgrid-standard/common/ControllerSelector.cpp @@ -1,34 +1,12 @@ #include "ControllerSelector.h" -#include -#include -#include - -// If caller passes a non-null reactor, use it unmodified. -// Otherwise, create another reactor for this class separated from the one for Handshaking. -ControllerSelector::ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor) - : TimerHandler(reactor) - , device_id_(device_id) +ControllerSelector::ControllerSelector(const tms::Identity& device_id) + : device_id_(device_id) { - if (!reactor) { - reactor_ = new ACE_Reactor; - - // We had an issue with using ACE_Reactor's default timer queue, which is - // ACE_Timer_Heap, when the rate of timer creation and cancellation is high - // for detecting missed heartbeat deadline from microgrid controllers. - // ACE_Timer_Hash seems working okay. - timer_queue_ = new ACE_Timer_Hash; - reactor_->timer_queue(timer_queue_); - own_reactor_ = true; - } } ControllerSelector::~ControllerSelector() { - if (own_reactor_) { - delete timer_queue_; - delete reactor_; - } } void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb) diff --git a/tactical-microgrid-standard/common/ControllerSelector.h b/tactical-microgrid-standard/common/ControllerSelector.h index d21b747..ccc9563 100644 --- a/tactical-microgrid-standard/common/ControllerSelector.h +++ b/tactical-microgrid-standard/common/ControllerSelector.h @@ -47,7 +47,7 @@ class PowerDevice; class OpenDDS_TMS_Export ControllerSelector : public TimerHandler { public: - explicit ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor = nullptr); + explicit ControllerSelector(const tms::Identity& device_id); ~ControllerSelector(); void got_heartbeat(const tms::Heartbeat& hb); @@ -77,10 +77,6 @@ class OpenDDS_TMS_Export ControllerSelector : } private: - // Allow using non-default timer queue - ACE_Timer_Queue* timer_queue_ = nullptr; - bool own_reactor_ = false; - static constexpr Sec heartbeat_deadline = Sec(3); static constexpr Sec new_active_controller_delay = Sec(3); static constexpr Sec lost_active_controller_delay = Sec(6); diff --git a/tactical-microgrid-standard/common/Handshaking.h b/tactical-microgrid-standard/common/Handshaking.h index 1936063..1009086 100644 --- a/tactical-microgrid-standard/common/Handshaking.h +++ b/tactical-microgrid-standard/common/Handshaking.h @@ -13,7 +13,8 @@ class OpenDDS_TMS_Export Handshaking : public TimerHandler { public: explicit Handshaking(const tms::Identity& device_id) - : device_id_(device_id) + : TimerHandler(ACE_Reactor::instance()) + , device_id_(device_id) , seq_num_(0) {} diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index 746bed0..ecd0262 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -15,6 +15,8 @@ #include #include +#include + using Sec = std::chrono::seconds; using Clock = std::chrono::system_clock; using TimePoint = std::chrono::time_point; @@ -79,14 +81,32 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder public: using AnyTimer = std::variant::Ptr...>; + // Create a new ACE_Reactor with ACE_Timer_Hash if @a reactor is null. + // Otherwise, use the provided reactor. explicit TimerHandler(ACE_Reactor* reactor = nullptr) - : reactor_(reactor ? reactor : ACE_Reactor::instance()) + : reactor_(reactor) { + if (!reactor) { + reactor_ = new ACE_Reactor; + + // We had an issue with using ACE_Reactor's default timer queue, which is + // ACE_Timer_Heap, when the rate of timer creation and cancellation is high + // for detecting missed heartbeat deadline from microgrid controllers. + // ACE_Timer_Hash seems working okay. + timer_queue_ = new ACE_Timer_Hash; + reactor_->timer_queue(timer_queue_); + own_reactor_ = true; + } } virtual ~TimerHandler() { cancel_all(); + + if (own_reactor_) { + delete timer_queue_; + delete reactor_; + } } template @@ -208,6 +228,10 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder mutable Mutex lock_; ACE_Reactor* reactor_; + // Allow using non-default timer queue + ACE_Timer_Queue* timer_queue_ = nullptr; + bool own_reactor_ = false; + int end_event_loop(bool yes = true) { if (yes) { From 66756226bf94a1606100ddacfe11d72c464a8115 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Mon, 7 Jul 2025 14:40:03 -0500 Subject: [PATCH 11/13] Fix CI --- tactical-microgrid-standard/common/TimerHandler.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index ecd0262..6797127 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include @@ -15,8 +17,6 @@ #include #include -#include - using Sec = std::chrono::seconds; using Clock = std::chrono::system_clock; using TimePoint = std::chrono::time_point; From 8f78c635bfd174ec7642b545b5ab117c452a3073 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Mon, 7 Jul 2025 17:34:22 -0500 Subject: [PATCH 12/13] From review and fixed a crash when connecting devices --- tactical-microgrid-standard/cli/CLIClient.cpp | 2 +- tactical-microgrid-standard/common/TimerHandler.h | 2 +- tactical-microgrid-standard/power_devices/Load.cpp | 4 +++- tactical-microgrid-standard/power_devices/PowerDevice.cpp | 8 ++++++-- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tactical-microgrid-standard/cli/CLIClient.cpp b/tactical-microgrid-standard/cli/CLIClient.cpp index 403a98a..e54e080 100644 --- a/tactical-microgrid-standard/cli/CLIClient.cpp +++ b/tactical-microgrid-standard/cli/CLIClient.cpp @@ -476,7 +476,7 @@ void CLIClient::display_power_devices() const } } const std::string formated_id = "\"" + it->first + "\""; - std::cout << std::setfill(' ') << std::setw(3) << i++ + std::cout << std::right << std::setfill(' ') << std::setw(3) << i++ << ". Id: " << std::left << std::setw(15) << formated_id << "| Type: " << std::left << std::setw(18) << Utils::device_role_to_string(it->second.device_info().role()) << "| Energy Level: " << std::left << std::setw(15) << energy_level_to_string(it->second.essl()) diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index 6797127..ead909b 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -208,7 +208,7 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder Guard g(lock_); auto timer_id = *reinterpret_cast(arg); if (active_timers_.count(timer_id) == 0) { - ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: TimerHandler::handle_timeout: timer id %d does NOT exist\n", + ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: TimerHandler::handle_timeout: timer id %q does NOT exist\n", timer_id)); } auto timer = active_timers_[timer_id]; diff --git a/tactical-microgrid-standard/power_devices/Load.cpp b/tactical-microgrid-standard/power_devices/Load.cpp index 0ed44e6..fd3c29e 100644 --- a/tactical-microgrid-standard/power_devices/Load.cpp +++ b/tactical-microgrid-standard/power_devices/Load.cpp @@ -138,7 +138,9 @@ void ElectricCurrentDataReaderListenerImpl::on_data_available(DDS::DataReader_pt const tms::Identity& to = power_path[path_length - 1]; if (from == load_dev_.connected_dev_id() && to == load_dev_.get_device_id()) { - ACE_DEBUG((LM_INFO, "=== Receiving power from \"%C\" -- %f Amps ...\n", from.c_str(), ec.amperage())); + if (load_dev_.verbose()) { + ACE_DEBUG((LM_INFO, "=== Receiving power from \"%C\" -- %f Amps ...\n", from.c_str(), ec.amperage())); + } break; } } diff --git a/tactical-microgrid-standard/power_devices/PowerDevice.cpp b/tactical-microgrid-standard/power_devices/PowerDevice.cpp index 74f0e14..c2879f7 100644 --- a/tactical-microgrid-standard/power_devices/PowerDevice.cpp +++ b/tactical-microgrid-standard/power_devices/PowerDevice.cpp @@ -178,16 +178,20 @@ void PowerDevice::connected_devices(const powersim::ConnectedDeviceSeq& devices) if (!connected_devices_out_.empty()) { ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: PowerDevice::connected_devices: Source \"%C\" already connects to \"%C\". Replace with \"%C\"\n", get_device_id().c_str(), connected_devices_out_[0].id().c_str(), devices[i].id().c_str())); + connected_devices_out_[0] = devices[i]; + } else { + connected_devices_out_.push_back(devices[i]); } - connected_devices_out_[0] = devices[i]; break; case tms::DeviceRole::ROLE_LOAD: // Load device has a single in port. if (!connected_devices_in_.empty()) { ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: PowerDevice::connected_devices: Load \"%C\" already connects to \"%C\". Replace with \"%C\"\n", get_device_id().c_str(), connected_devices_in_[0].id().c_str(), devices[i].id().c_str())); + connected_devices_in_[0] = devices[i]; + } else { + connected_devices_in_.push_back(devices[i]); } - connected_devices_in_[0] = devices[i]; break; case tms::DeviceRole::ROLE_DISTRIBUTION: { From 0aec3af9320c4bc15866ee59c0cd891ddcc87e5d Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 8 Jul 2025 09:47:05 -0500 Subject: [PATCH 13/13] Update tactical-microgrid-standard/common/TimerHandler.h Co-authored-by: Adam Mitz --- tactical-microgrid-standard/common/TimerHandler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tactical-microgrid-standard/common/TimerHandler.h b/tactical-microgrid-standard/common/TimerHandler.h index ead909b..b3117ab 100644 --- a/tactical-microgrid-standard/common/TimerHandler.h +++ b/tactical-microgrid-standard/common/TimerHandler.h @@ -209,7 +209,7 @@ class TimerHandler : public ACE_Event_Handler, protected TimerHolder auto timer_id = *reinterpret_cast(arg); if (active_timers_.count(timer_id) == 0) { ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: TimerHandler::handle_timeout: timer id %q does NOT exist\n", - timer_id)); + static_cast(timer_id))); } auto timer = active_timers_[timer_id]; any_timer_fired(timer);