Skip to content

Commit 4591e79

Browse files
committed
add topic monitor
1 parent da8a079 commit 4591e79

File tree

11 files changed

+154
-123
lines changed

11 files changed

+154
-123
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ if (NOT CMAKE_BUILD_TYPE)
1111
endif ()
1212

1313
add_subdirectory(infinite_sense_core)
14+
add_subdirectory(tools)
1415
add_subdirectory(example)

example/GigeCam/main.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ int main() {
1515
synchronizer.Start();
1616

1717
// 4.接收数据
18-
Synchronizer::PrintSummary();
1918
zmq::context_t context(1);
2019
zmq::socket_t subscriber(context, zmq::socket_type::sub);
2120
subscriber.connect("tcp://localhost:5555");

infinite_sense_core/include/infinite_sense.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,6 @@ class Synchronizer {
7575
*/
7676
static bool GetLastTriggerTime(TriggerDevice dev, uint64_t time);
7777

78-
/**
79-
* @brief 打印当前系统的配置与状态摘要信息。
80-
*/
81-
static void PrintSummary();
82-
8378
private:
8479
/// 网络地址
8580
std::string net_ip_;

infinite_sense_core/include/messenger.h

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -27,52 +27,4 @@ class Messenger {
2727
zmq::socket_t publisher_{};
2828
std::string endpoint_{};
2929
};
30-
31-
class TopicMonitor {
32-
public:
33-
static TopicMonitor& GetInstance() {
34-
static TopicMonitor instance;
35-
return instance;
36-
}
37-
TopicMonitor(const TopicMonitor&) = delete;
38-
TopicMonitor(TopicMonitor&&) = delete;
39-
TopicMonitor& operator=(const TopicMonitor&) = delete;
40-
TopicMonitor& operator=(TopicMonitor&&) = delete;
41-
std::unordered_set<std::string> GetTopics() const;
42-
friend std::ostream& operator<<(std::ostream& os, const TopicMonitor& monitor) {
43-
std::lock_guard lock(monitor.topics_mutex_);
44-
os << "\n--- Topic Monitor ---\n";
45-
if (monitor.topic_frequencies_.empty()) {
46-
os << " No active topics\n";
47-
} else {
48-
os << " Active Topics (" << monitor.topic_frequencies_.size() << "):\n";
49-
std::vector<std::pair<std::string, size_t>> sorted_topics(monitor.topic_frequencies_.begin(),
50-
monitor.topic_frequencies_.end());
51-
std::sort(sorted_topics.begin(), sorted_topics.end(),
52-
[](const auto& a, const auto& b) { return b.second < a.second; });
53-
54-
for (const auto& [topic, count] : sorted_topics) {
55-
os << "" << topic << " (num: " << count << ")\n";
56-
}
57-
}
58-
return os << "---------------------\n";
59-
}
60-
// 启动监控线程
61-
void Start();
62-
// 停止监控
63-
void Stop();
64-
65-
private:
66-
TopicMonitor();
67-
~TopicMonitor();
68-
// 监控线程主循环
69-
void MonitorLoop();
70-
std::unordered_map<std::string, size_t> topic_frequencies_; // 新增:topic频率统计
71-
zmq::context_t context_;
72-
zmq::socket_t subscriber_;
73-
std::unordered_set<std::string> topics_;
74-
mutable std::mutex topics_mutex_;
75-
std::thread monitor_thread_;
76-
std::atomic<bool> should_run_;
77-
};
7830
} // namespace infinite_sense

infinite_sense_core/src/infinite_sense.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,4 @@ void Synchronizer::Stop() const {
5757
LOG(INFO) << "Synchronizer stopped";
5858
}
5959

60-
void Synchronizer::PrintSummary() {
61-
TopicMonitor::GetInstance().Start();
62-
std::this_thread::sleep_for(std::chrono::milliseconds{1001});
63-
TopicMonitor::GetInstance().Stop();
64-
LOG(INFO) << TopicMonitor::GetInstance();
65-
}
6660
} // namespace infinite_sense

infinite_sense_core/src/messenger.cpp

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -47,67 +47,4 @@ void Messenger::PubStruct(const std::string &topic, const void *data, const size
4747
}
4848
std::string Messenger::GetPubEndpoint() const { return endpoint_; }
4949

50-
51-
std::unordered_set<std::string> TopicMonitor::GetTopics() const {
52-
std::lock_guard lock(topics_mutex_);
53-
return topics_;
54-
}
55-
void TopicMonitor::Start() {
56-
if (monitor_thread_.joinable()) {
57-
return;
58-
}
59-
should_run_.store(true);
60-
monitor_thread_ = std::thread(&TopicMonitor::MonitorLoop, this);
61-
}
62-
63-
void TopicMonitor::Stop() {
64-
if (!should_run_.load()) {
65-
return;
66-
}
67-
should_run_.store(false);
68-
if (monitor_thread_.joinable()) {
69-
monitor_thread_.join();
70-
}
71-
}
72-
TopicMonitor::TopicMonitor() : context_(1), subscriber_(context_, ZMQ_SUB), should_run_(false) {
73-
try {
74-
subscriber_.connect(Messenger::GetInstance().GetPubEndpoint());
75-
subscriber_.set(zmq::sockopt::subscribe, "");
76-
} catch (const zmq::error_t &e) {
77-
LOG(ERROR) << "[TopicMonitor] Initialization failed: " << e.what();
78-
throw;
79-
}
80-
}
81-
TopicMonitor::~TopicMonitor() {
82-
Stop();
83-
try {
84-
subscriber_.close();
85-
context_.close();
86-
} catch (const zmq::error_t &e) {
87-
LOG(ERROR) << "[TopicMonitor] Cleanup error: " << e.what();
88-
}
89-
}
90-
void TopicMonitor::MonitorLoop() {
91-
zmq::message_t msg;
92-
93-
while (should_run_.load()) {
94-
try {
95-
if (subscriber_.recv(msg, zmq::recv_flags::dontwait)) {
96-
{
97-
std::string topic(static_cast<char *>(msg.data()), msg.size());
98-
std::lock_guard lock(topics_mutex_);
99-
topic_frequencies_[topic]++;
100-
}
101-
if (subscriber_.get(zmq::sockopt::rcvmore)) {
102-
zmq::message_t dummy;
103-
subscriber_.recv(dummy);
104-
}
105-
}
106-
} catch (const zmq::error_t &e) {
107-
if (e.num() != ETERM) {
108-
}
109-
}
110-
}
111-
}
112-
11350
} // namespace infinite_sense

tools/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
cmake_minimum_required(VERSION 3.10)
2+
project(example)
3+
4+
add_subdirectory(topic_monitor)

tools/topic_monitor/CMakeLists.txt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
cmake_minimum_required(VERSION 3.16)
2+
3+
project(monitor VERSION 1.0)
4+
5+
set(CMAKE_CXX_STANDARD 17)
6+
if (NOT CMAKE_BUILD_TYPE)
7+
set(CMAKE_BUILD_TYPE Release)
8+
set(CMAKE_CXX_FLAGS "${CMAKE_C_FLAGS} -O3 -Wall")
9+
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3 -Wall")
10+
endif ()
11+
12+
add_executable(${PROJECT_NAME}
13+
main.cpp
14+
monitor.cpp
15+
monitor.h
16+
)
17+
target_link_libraries(${PROJECT_NAME} PRIVATE
18+
infinite_sense_core
19+
)

tools/topic_monitor/main.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#include "infinite_sense.h"
2+
#include "monitor.h"
3+
using namespace infinite_sense;
4+
int main() {
5+
TopicMonitor::GetInstance().Start();
6+
std::this_thread::sleep_for(std::chrono::milliseconds{1001});
7+
TopicMonitor::GetInstance().Stop();
8+
LOG(INFO) << TopicMonitor::GetInstance();
9+
return 0;
10+
}

tools/topic_monitor/monitor.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include "monitor.h"
2+
#include "infinite_sense.h"
3+
std::unordered_set<std::string> TopicMonitor::GetTopics() const {
4+
std::lock_guard lock(topics_mutex_);
5+
return topics_;
6+
}
7+
void TopicMonitor::Start() {
8+
if (monitor_thread_.joinable()) {
9+
return;
10+
}
11+
should_run_.store(true);
12+
monitor_thread_ = std::thread(&TopicMonitor::MonitorLoop, this);
13+
}
14+
15+
void TopicMonitor::Stop() {
16+
if (!should_run_.load()) {
17+
return;
18+
}
19+
should_run_.store(false);
20+
if (monitor_thread_.joinable()) {
21+
monitor_thread_.join();
22+
}
23+
}
24+
TopicMonitor::TopicMonitor() : context_(1), subscriber_(context_, ZMQ_SUB), should_run_(false) {
25+
try {
26+
subscriber_.connect(infinite_sense::Messenger::GetInstance().GetPubEndpoint());
27+
subscriber_.set(zmq::sockopt::subscribe, "");
28+
} catch (const zmq::error_t &e) {
29+
LOG(ERROR) << "[TopicMonitor] Initialization failed: " << e.what();
30+
throw;
31+
}
32+
}
33+
TopicMonitor::~TopicMonitor() {
34+
Stop();
35+
try {
36+
subscriber_.close();
37+
context_.close();
38+
} catch (const zmq::error_t &e) {
39+
LOG(ERROR) << "[TopicMonitor] Cleanup error: " << e.what();
40+
}
41+
}
42+
void TopicMonitor::MonitorLoop() {
43+
zmq::message_t msg;
44+
45+
while (should_run_.load()) {
46+
try {
47+
if (subscriber_.recv(msg, zmq::recv_flags::dontwait)) {
48+
{
49+
std::string topic(static_cast<char *>(msg.data()), msg.size());
50+
std::lock_guard lock(topics_mutex_);
51+
topic_frequencies_[topic]++;
52+
}
53+
if (subscriber_.get(zmq::sockopt::rcvmore)) {
54+
zmq::message_t dummy;
55+
subscriber_.recv(dummy);
56+
}
57+
}
58+
} catch (const zmq::error_t &e) {
59+
if (e.num() != ETERM) {
60+
}
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)