Skip to content

Commit 986da28

Browse files
committed
add topic asker
1 parent 4591e79 commit 986da28

File tree

3 files changed

+29
-3
lines changed

3 files changed

+29
-3
lines changed

infinite_sense_core/include/messenger.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ class Messenger {
1919
std::string GetPubEndpoint() const;
2020
void Pub(const std::string& topic, const std::string& metadata);
2121
void PubStruct(const std::string& topic, const void* data, size_t size);
22-
22+
[[noreturn]] void WaitAsk();
2323
private:
2424
Messenger();
2525
~Messenger();
2626
zmq::context_t context_{};
2727
zmq::socket_t publisher_{};
2828
std::string endpoint_{};
29+
private:
30+
zmq::socket_t asker_{};
31+
std::thread ask_thread_;
2932
};
3033
} // namespace infinite_sense

infinite_sense_core/src/messenger.cpp

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,22 @@ Messenger::Messenger() {
1111
try {
1212
publisher_.bind("tcp://*:0");
1313
endpoint_ = publisher_.get(zmq::sockopt::last_endpoint);
14-
LOG(INFO) << "ZMQ PUB: " << endpoint_;
14+
LOG(INFO) << "Publisher: " << endpoint_;
1515
} catch (const zmq::error_t &e) {
1616
LOG(ERROR) << "Failed to bind ZMQ publisher: " << e.what();
1717
context_.close();
1818
publisher_.close();
1919
}
20+
asker_ = zmq::socket_t(context_, zmq::socket_type::rep);
21+
asker_.bind("tcp://*:4565");
22+
ask_thread_ = std::thread([this] { WaitAsk(); });
23+
}
24+
Messenger::~Messenger() {
25+
publisher_.close();
26+
while (ask_thread_.joinable()) {
27+
ask_thread_.join();
28+
}
2029
}
21-
Messenger::~Messenger() { publisher_.close(); }
2230
void Messenger::Pub(const std::string &topic, const std::string &metadata) {
2331
try {
2432
zmq::message_t topic_msg(topic.size());
@@ -47,4 +55,18 @@ void Messenger::PubStruct(const std::string &topic, const void *data, const size
4755
}
4856
std::string Messenger::GetPubEndpoint() const { return endpoint_; }
4957

58+
[[noreturn]] void Messenger::WaitAsk() {
59+
while (true) {
60+
zmq::message_t request;
61+
asker_.recv(request, zmq::recv_flags::none);
62+
std::string received_msg(static_cast<char *>(request.data()), request.size());
63+
if (received_msg == "ask_endpoint") {
64+
std::string reply_msg = GetPubEndpoint();
65+
zmq::message_t reply(reply_msg.size());
66+
memcpy(reply.data(), reply_msg.data(), reply_msg.size());
67+
asker_.send(reply, zmq::send_flags::none);
68+
}
69+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
70+
}
71+
}
5072
} // namespace infinite_sense

tools/topic_monitor/monitor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ void TopicMonitor::Stop() {
2222
}
2323
}
2424
TopicMonitor::TopicMonitor() : context_(1), subscriber_(context_, ZMQ_SUB), should_run_(false) {
25+
2526
try {
2627
subscriber_.connect(infinite_sense::Messenger::GetInstance().GetPubEndpoint());
2728
subscriber_.set(zmq::sockopt::subscribe, "");

0 commit comments

Comments
 (0)