From 3730202ca48f1c256fedad03fcbcdd3e1837d9a5 Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Mon, 24 Nov 2025 10:18:15 +0000 Subject: [PATCH 1/5] add mutex locking to multicast sockets --- src/ServiceDiscovery/ServicesBackend.cpp | 16 ++++++++++++++-- src/ServiceDiscovery/ServicesBackend.h | 5 ++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index d07c2b3..3e86878 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -386,8 +386,18 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std // only immediately evident errors are reported. receipt is not confirmed. if(verbosity>10) std::cout<<"ServicesBackend::SendMulticast invoked with command '"<lock(); int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); + socket_mtx->unlock(); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; Log(errmsg,v_error,verbosity); diff --git a/src/ServiceDiscovery/ServicesBackend.h b/src/ServiceDiscovery/ServicesBackend.h index 814df73..00726f8 100644 --- a/src/ServiceDiscovery/ServicesBackend.h +++ b/src/ServiceDiscovery/ServicesBackend.h @@ -92,6 +92,9 @@ class ServicesBackend { // multicast socket file descriptors int log_socket=-1; int mon_socket=-1; + // mutexes to lock them + std::mutex log_socket_mtx; + std::mutex mon_socket_mtx; // multicast destination address structure struct sockaddr_in log_addr; struct sockaddr_in mon_addr; @@ -103,7 +106,7 @@ class ServicesBackend { std::map> waiting_recipients; void Log(std::string msg, int msg_verb, int verbosity); //?? generalise private - bool InitZMQ(); //private + bool InitZMQ(); //private bool InitMulticast(); // private bool RegisterServices(); //private // wrapper funtion; add command to outgoing queue, receive response. ~30s timeout. From 0b5414815d850075b1c4d2f087a7c8fc2dc7023e Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Sat, 20 Dec 2025 09:56:39 +0000 Subject: [PATCH 2/5] Services signature updates --- src/ServiceDiscovery/Services.cpp | 695 +++++++++++++---------- src/ServiceDiscovery/Services.h | 33 +- src/ServiceDiscovery/ServicesBackend.cpp | 24 +- src/ServiceDiscovery/ServicesBackend.h | 20 +- 4 files changed, 442 insertions(+), 330 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 0589034..2890dc7 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -63,7 +63,7 @@ bool Services::Ready(const unsigned int timeout){ // Write Functions // --------------- -bool Services::SendAlarm(const std::string& message, unsigned int level, const std::string& device, unsigned int timestamp, const unsigned int timeout){ +bool Services::SendAlarm(const std::string& message, unsigned int level, const std::string& device, const uint64_t timestamp, const unsigned int timeout){ const std::string& name = (device=="") ? m_name : device; @@ -82,9 +82,10 @@ bool Services::SendAlarm(const std::string& message, unsigned int level, const s if(!ok){ std::clog<<"SendAlarm error: "< { trace }, + layout, + version, + timestamp, + timeout + ); +} + +// ««-------------- ≪ °◇◆◇° ≫ --------------»» + +// multiple traces version +bool Services::SendPlotlyPlot( + const std::string& name, + const std::vector& traces, + const std::string& layout, + int* version, + const uint64_t timestamp, + const unsigned int timeout +) { + std::stringstream ss; + ss << "{\"name\":\"" << name << "\",\"layout\":" << layout; + if (timestamp) ss << ",\"time\":" << timestamp; + ss << ",\"data\":["; + bool first = true; + for (auto& trace : traces) { + if (first) + first = false; + else + ss << ','; + ss << trace; + }; + ss << "]}"; + + std::string response=""; + + std::string err; + if (!m_backend_client.SendCommand("W_PLOTLYPLOT", ss.str(), &response, &timeout, &err)){ + std::clog << "SendPlotlyPlot error: " << err << std::endl; + return false; + }; + + if(response.empty()){ + std::clog<<"SendPlotlyPlot error: empty response"<& responses, const unsigned int timeout){ + + responses.clear(); + + //const std::string& db = (database=="") ? m_dbname : database; + + std::string err=""; + + if(!m_backend_client.SendCommand("W_QUERY", query, &responses, &timeout, &err)){ + std::clog<<"SQLQuery error: "< responses; + + bool ok = SQLQuery(/*db,*/ query, responses, timeout); + + if(responses.size()!=0){ + response = responses.front(); + if(responses.size()>1){ + std::clog<<"Warning: SQLQuery returned multiple rows, only first returned"<"}' - strip out contents - if(json_data.length()==0){ + if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found err = "GetDeviceConfig error: config for device "+name+" version "+std::to_string(version)+" not found"; @@ -260,16 +486,15 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const json_data = err; return false; } + + // response format '{"data":""}' - strip out contents Store tmp; tmp.JsonParser(json_data); - int tmp_version; - bool ok = tmp.Get("version",tmp_version); - if(ok){ - //version = tmp_version; // cannot pass back... without a more complex signature - ok = tmp.Get("data", json_data); - } + bool ok = tmp.Get("data", json_data); if(!ok){ - std::clog<<"GetDeviceConfig error: invalid response: '"<}' - strip out contents Store tmp; tmp.JsonParser(json_data); bool ok = tmp.Get("data", json_data); if(!ok){ - std::clog<<"GetRunConfig error: invalid response: '"<"}' - strip out contents - if(json_data.length()==0){ + if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found - std::clog<<"GetRunConfig error: config "<"}' - strip out contents Store tmp; tmp.JsonParser(json_data); - int tmp_version; - bool ok = tmp.Get("version",tmp_version); - if(ok){ - //version = tmp_version; // cannot pass back - ok = tmp.Get("data", json_data); - } + bool ok = tmp.Get("data", json_data); if(!ok){ err="GetRunConfig error: invalid response: '"+json_data+"'"; std::clog<>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")"; + + std::string err=""; - if(!get_ok){ - // redundant as GetRunConfig prints the error - but is this still useful as calling context? - // TODO we should be using exceptions, of course - //std::clog<<"GetRunDeviceConfig error getting run config id "<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int tmp_version; + bool ok = tmp.Get("version",tmp_version); + if(ok && version){ + *version = tmp_version; + } + ok = ok && tmp.Get("data", json_data); + if(!ok){ + err="GetRunDeviceConfig error: invalid response: '"+json_data+"'"; + std::clog<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")"; - if(!get_ok){ - // redundant as GetRunConfig prints the error - but is this still useful as calling context? - // TODO we should be using exceptions, of course - //std::clog<<"GetRunDeviceConfig error getting run config '"<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int tmp_version; + bool ok = tmp.Get("version",tmp_version); + if(ok && version){ + *version = tmp_version; + } + ok = ok && tmp.Get("data", json_data); + if(!ok){ + err="GetRunDeviceConfig error: invalid response: '"+json_data+"'"; + std::clog<", "timestamp":, "version": , "data":""}' - size_t pos1=0, pos2=0; - std::string key; - std::map vals; - while(true){ - pos1=response.find('"',pos2); - if(pos1==std::string::npos) break; - pos2=response.find('"',++pos1); - if(pos2==std::string::npos) break; - std::string str = response.substr(pos1,(pos2-pos1)); - ++pos2; - if(key.empty()){ key = str; } - else if(key=="data"){ break; } - else { vals[key] = str; key=""; } - } - vals[key] = response.substr(pos1,response.find_last_of('"')-pos1); + Store plot; + plot.JsonParser(response); - try{ - draw_options = vals["draw_options"]; - if(timestamp) *timestamp = vals["timestamp"]; - version = std::stoi(vals["version"]); - json_data = vals["data"]; - } catch(...){ - std::clog<<"GetROOTplot error: failed to parse response '"<& responses, const unsigned int timeout){ - - responses.clear(); - - const std::string& db = (database=="") ? m_dbname : database; +// Get a device configuration from a *run* configuration ID + +bool Services::GetPlotlyPlot( + const std::string& name, + int& version, + std::string& trace, + std::string& layout, + std::string* timestamp, + unsigned int timeout +) { - const std::string command = "{ \"database\":\""+db+"\"" - + ", \"query\":\""+ query+"\" }"; + std::string cmd_string; + if(version<0){ + cmd_string = "SELECT row_to_json(version, time, data, layout), FROM plotlyplots WHERE name='"+name+"' AND version="+std::to_string(version); + } else { + // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster + cmd_string = "SELECT row_to_json(version, time, data, layout) FROM plotlyplots WHERE name='"+name+"' ORDER BY version DESC LIMIT 1"; + } - std::string err=""; + std::string err; + std::string response; + if (!m_backend_client.SendCommand("R_PLOTLYPLOT", cmd_string, &response, &timeout, &err)){ + std::clog << "GetPlotlyPlot error: " << err << std::endl; + json_data = err; + return false; + }; - if(!m_backend_client.SendCommand("R_QUERY", command, &responses, &timeout, &err)){ - std::clog<<"SQLQuery error: "< responses; + trace = plot.Get("data"); + layout = plot.Get("layout"); + version = plot.Get("version"); - bool ok = SQLQuery(db, query, responses, timeout); + bool ok = plot.Get("data", trace); + ok = ok && plot.Get("layout", layout); + ok = ok && plot.Get("version", version); + if(timestamp && ok) ok = ok && plot.Get("time", *timestamp); - if(responses.size()!=0){ - response = responses.front(); - if(responses.size()>1){ - std::clog<<"Warning: SQLQuery returned multiple rows, only first returned"<12){ - // FIXME change to Store parsing so we can check this is the right key - response.replace(0,response.find_first_of(':')+1,""); - response.replace(response.find_last_of('}'),std::string::npos,""); - try { - if(version) *version = std::stoi(response); - } catch (...){ - std::clog<<"SendROOTplot error: invalid response '"<= 0) cmd.Set("version", version); - - std::string cmd_string; - cmd >> cmd_string; - - std::string err; - std::string response; - if (!m_backend_client.SendCommand( - "R_PLOTLYPLOT", cmd_string, &response, &timeout, &err - )) - { - std::clog << "GetPlotlyPlot error: " << err << std::endl; - return false; - }; - - Store plot; - plot.JsonParser(response); - - trace = plot.Get("trace"); - layout = plot.Get("layout"); - version = plot.Get("version"); - if (timestamp) *timestamp = plot.Get("time"); - - return true; -} - -bool Services::SendPlotlyPlot( - const std::string& name, - const std::string& trace, - const std::string& layout, - int* version, - unsigned int timestamp, - unsigned int timeout -) { - return SendPlotlyPlot( - name, - std::vector { trace }, - layout, - version, - timestamp, - timeout - ); -} - -bool Services::SendPlotlyPlot( - const std::string& name, - const std::vector& traces, - const std::string& layout, - int* version, - unsigned int timestamp, - unsigned int timeout -) { - std::stringstream ss; - ss - << "{\"name\":\"" << name - << "\",\"layout\":" << layout; - if (version) ss << ",\"version\":" << *version; - if (timestamp) ss << ",\"timestamp\":" << timestamp; - ss << ",\"traces\":["; - bool first = true; - for (auto& trace : traces) { - if (first) - first = false; - else - ss << ','; - ss << trace; - }; - ss << "]}"; - - std::string err; - if (!m_backend_client.SendCommand( - "W_PLOTLYPLOT", ss.str(), static_cast(nullptr), &timeout, &err - )) - { - std::clog << "SendPlotlyPlot error: " << err << std::endl; - return false; - }; - - return true; -} - // =========================================================================== // Other functions // --------------- diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 5554f83..d1e3d67 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -30,29 +30,28 @@ namespace ToolFramework { bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false); bool Ready(const unsigned int timeout=10000); // default service discovery broadcast period is 5s, middleman also checks intermittently, compound total time should be <10s... - bool SQLQuery(const std::string& database, const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(const std::string& database, const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(const std::string& database, const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(/*const std::string& database,*/ const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const unsigned int timestamp=0); - bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", unsigned int timestamp=0); - bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const uint64_t timestamp=0); + bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", uint64_t timestamp=0); + bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetCalibrationData(std::string& json_data, int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunConfig(std::string& json_data, const std::string& name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, bool persistent=false, int* version=nullptr, const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendTemporaryROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const unsigned int timestamp=0); - bool SendPersistentROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", /*int* version=nullptr,*/ const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", /*int* version=nullptr, */ const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplotZmq(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int keep_until=0, int* version=nullptr, const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const uint64_t timestamp=SERVICES_DEFAULT_TIMEOUT); bool GetROOTplot(const std::string& plot_name, int& version, std::string& draw_option, std::string& json_data, std::string* timestamp=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, unsigned int timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, unsigned int timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetPlotlyPlot(const std::string& name, int& version, std::string& json_trace, std::string& json_layout, unsigned int* timestamp=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetPlotlyPlot(const std::string& name, int& version, std::string& json_trace, std::string& json_layout, std::string* timestamp=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); SlowControlCollection* GetSlowControlCollection(); SlowControlElement* GetSlowControlVariable(std::string key); diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 3e86878..c089eea 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -2,9 +2,9 @@ using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in, const unsigned int timeout_ms_in){ +Command::Command(std::string command_in, char type_in, std::string topic_in, const uint32_t timeout_ms_in){ command = command_in; - type = type_in; + type = type_in; // TODO type is unnecessary, could just use topic[0] topic=topic_in; success=0; response=std::vector{}; @@ -421,12 +421,12 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std return true; } -bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const unsigned int* timeout_ms, std::string* err){ +bool ServicesBackend::SendCommand(const std::string& topic, const std::string& command, std::vector* results, const uint32_t* timeout_ms, std::string* err){ // send a command and receive response. // This is a wrapper that ensures we always return within the requested timeout. if(verbosity>10) std::cout<<"ServicesBackend::SendCommand invoked with command '"< resultsvec; @@ -492,7 +492,7 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c return ret; } -bool ServicesBackend::DoCommand(Command& cmd, int timeout_ms){ +bool ServicesBackend::DoCommand(Command& cmd, uint32_t timeout_ms){ if(verbosity>10) std::cout<<"ServicesBackend::DoCommand received command"<(send_end - send_start).count(); - timeout_ms -= send_time_ms; - // juuuust in case, ensure our remaining time is not negative. :) - if(timeout_ms<0) timedout=true; + auto send_time_ms = std::chrono::duration_cast(send_end - send_start).count(); + // juuuust in case, make sure we actually still have time left to wait for the response + if(send_time_ms > timeout_ms) timedout=true; // shouldn't be possible really + else timeout_ms -= send_time_ms; } // did we get a response in time? @@ -898,7 +898,7 @@ bool ServicesBackend::Send(zmq::socket_t* sock, bool more, std::vector& outputs){ +int ServicesBackend::PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, std::vector& outputs){ // poll the input socket for messages try { diff --git a/src/ServiceDiscovery/ServicesBackend.h b/src/ServiceDiscovery/ServicesBackend.h index 00726f8..cceed33 100644 --- a/src/ServiceDiscovery/ServicesBackend.h +++ b/src/ServiceDiscovery/ServicesBackend.h @@ -27,7 +27,7 @@ namespace ToolFramework { struct Command { - Command(std::string command_in, char cmd_type_in, std::string topic_in, const unsigned int timeout_ms_in); + Command(std::string command_in, char cmd_type_in, std::string topic_in, const uint32_t timeout_ms_in); Command(const Command& cmd_in); // copy constructor Command(Command&& cmd_in); // move constructor @@ -60,8 +60,8 @@ class ServicesBackend { bool Finalise(); // interfaces called by clients. These return within timeout. - bool SendCommand(const std::string& topic, const std::string& command, std::vector* results=nullptr, const unsigned int* timeout_ms=nullptr, std::string* err=nullptr); - bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const unsigned int* timeout_ms=nullptr, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::vector* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); + bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const uint32_t* timeout_ms=nullptr, std::string* err=nullptr); // multicasts bool SendMulticast(MulticastType type, std::string command, std::string* err=nullptr); @@ -110,7 +110,7 @@ class ServicesBackend { bool InitMulticast(); // private bool RegisterServices(); //private // wrapper funtion; add command to outgoing queue, receive response. ~30s timeout. - bool DoCommand(Command& cmd, int timeout_ms); //private + bool DoCommand(Command& cmd, uint32_t timeout_ms); //private // actual send/receive functions bool SendNextCommand(); //private bool GetNextResponse(); //priavte @@ -121,9 +121,9 @@ class ServicesBackend { // TODO add retrying int max_retries; - int inpoll_timeout; - int outpoll_timeout; - int command_timeout; + uint32_t inpoll_timeout; + uint32_t outpoll_timeout; + uint32_t command_timeout; // TODO add stats reporting boost::posix_time::time_duration resend_period; // time between resends if not acknowledged @@ -158,7 +158,7 @@ class ServicesBackend { // zmq helper functions // TODO move to separate class as these are shared by middleman - int PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, std::vector& outputs); + int PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, std::vector& outputs); bool Receive(zmq::socket_t* sock, std::vector& outputs); // base cases; send single (final) message part @@ -201,7 +201,7 @@ class ServicesBackend { // wrapper to do polling if required // version if one part template - int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message){ + int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, T&& message){ if(verbosity>10) std::cout<<__PRETTY_FUNCTION__<<" called"< - int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message, Rest&&... rest){ + int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, uint32_t timeout, T&& message, Rest&&... rest){ if(verbosity>10) std::cout<<__PRETTY_FUNCTION__<<" called"< Date: Mon, 29 Dec 2025 15:17:07 +0000 Subject: [PATCH 3/5] fix DAQLogging to bind logging multicast listener to the specific group, rather than INADDR_ANY, as joining the group is not restricted to the socket (but binding is). Also add linger settings to zmq sockets --- src/DAQLogging/DAQLogging.cpp | 1 - src/ServiceDiscovery/ServiceDiscovery.cpp | 1017 +++++++++++---------- 2 files changed, 511 insertions(+), 507 deletions(-) diff --git a/src/DAQLogging/DAQLogging.cpp b/src/DAQLogging/DAQLogging.cpp index 4042443..35f6807 100644 --- a/src/DAQLogging/DAQLogging.cpp +++ b/src/DAQLogging/DAQLogging.cpp @@ -342,7 +342,6 @@ src/DAQLogging/DAQLogging.{h,cpp} -nw bzero((char *)&addr, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(log_port); addrlen = sizeof(addr); diff --git a/src/ServiceDiscovery/ServiceDiscovery.cpp b/src/ServiceDiscovery/ServiceDiscovery.cpp index 607a537..c51e288 100644 --- a/src/ServiceDiscovery/ServiceDiscovery.cpp +++ b/src/ServiceDiscovery/ServiceDiscovery.cpp @@ -4,25 +4,25 @@ using namespace ToolFramework; ServiceDiscovery::ServiceDiscovery(bool Send, bool Receive, int remoteport, std::string address, int multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ - + std::vector in_address; std::vector in_multicastport; - + in_address.push_back(address); in_multicastport.push_back(multicastport); - + Init(Send, Receive, remoteport, in_address, in_multicastport, incontext, UUID, service, pubsec, kicksec); - + } ServiceDiscovery::ServiceDiscovery(bool Send, bool Receive, int remoteport, std::vector address, std::vector multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ Init(Send, Receive, remoteport, address, multicastport, incontext, UUID, service, pubsec, kicksec); - + } void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector address, std::vector multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ - - + + m_UUID=UUID; context=incontext; m_multicastport=multicastport; @@ -31,12 +31,12 @@ void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector m_remoteport=remoteport; m_send=Send; m_receive=Receive; - + args= new thread_args(m_UUID, context, m_multicastaddress, m_multicastport, m_service, m_remoteport, pubsec, kicksec); - - if (Receive) pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); - if (Send) pthread_create (&thread[1], NULL, ServiceDiscovery::MulticastPublishThread, args); + if (Receive) pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); + + if (Send) pthread_create (&thread[1], NULL, ServiceDiscovery::MulticastPublishThread, args); //sleep(2); @@ -51,15 +51,15 @@ void ServiceDiscovery::Init(bool Send, bool Receive, int remoteport, std::vector } ServiceDiscovery::ServiceDiscovery( std::string address, int multicastport, zmq::context_t * incontext, int kicksec){ - + std::vector in_address; std::vector in_multicastport; - + in_address.push_back(address); in_multicastport.push_back(multicastport); - + Init(in_address, in_multicastport, incontext, kicksec); - + } ServiceDiscovery::ServiceDiscovery( std::vector address, std::vector multicastport, zmq::context_t * incontext, int kicksec){ @@ -76,20 +76,20 @@ void ServiceDiscovery::Init( std::vector address, std::vector m_UUID=boost::uuids::random_generator()(); m_receive=true; m_send=false; - + args= new thread_args(m_UUID, context, m_multicastaddress, m_multicastport, m_service, m_remoteport, 0 , kicksec); - + pthread_create (&thread[0], NULL, ServiceDiscovery::MulticastListenThread, args); - + // sleep(2); - + } void* ServiceDiscovery::MulticastPublishThread(void* arg){ - - + + thread_args* args= static_cast(arg); zmq::context_t * context = args->context; boost::uuids::uuid m_UUID=args->UUID; @@ -97,20 +97,20 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ int m_multicastport=args->multicastport.at(0); std::string m_service=args->service; int m_remoteport=args->remoteport; - + long msg_id=0; - - zmq::socket_t Ireceive (*context, ZMQ_PULL); + + zmq::socket_t Ireceive (*context, ZMQ_PULL); int linger = 0; //Ireceive.setsockopt(ZMQ_IMMEDIATE, 1); - Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); - Ireceive.bind("inproc://ServicePublish"); + Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + Ireceive.bind("inproc://ServicePublish"); /// multi cast ///// struct sockaddr_in addr; int addrlen, sock, cnt; - // struct ip_mreq mreq; + // struct ip_mreq mreq; // set up socket // @@ -119,8 +119,8 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ l.l_onoff = 0; l.l_linger = 0; setsockopt(sock, SOL_SOCKET, SO_LINGER,(char *) &l, sizeof(l)); - - //fcntl(sock, F_SETFL, O_NONBLOCK); + + //fcntl(sock, F_SETFL, O_NONBLOCK); if (sock < 0) { perror("socket"); printf("Failed to connect to multicast publish socket"); @@ -128,7 +128,6 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ } bzero((char *)&addr, sizeof(addr)); addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(m_multicastport); addrlen = sizeof(addr); @@ -137,7 +136,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ std::vector PubServices; - Store bb; + Store bb; bb.Set("msg_type", "Service Discovery"); bb.Set("msg_value",m_service); bb.Set("remote_port",m_remoteport); @@ -145,7 +144,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ else bb.Set("status_query",false); bb.Set("uuid",boost::uuids::to_string(m_UUID)); PubServices.push_back(bb); - + // Initialize poll set zmq::pollitem_t items [] = { { Ireceive, 0, ZMQ_POLLIN, 0 }, @@ -156,7 +155,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ bool running=true; while(running){ - + try{ zmq::poll(&items [0], 2, 1000); } catch(zmq::error_t& err){ @@ -164,7 +163,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ if(zmq_errno()==EINTR) continue; throw; } - + if ((items [0].revents & ZMQ_POLLIN) && running) { zmq::message_t commands; @@ -181,253 +180,253 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ if(command=="Quit"){ - //printf("publish quitting \n"); - running=false; + //printf("publish quitting \n"); + running=false; } else if(command=="Add"){ - - Store bb; - bb.Set("msg_type","Service Discovery"); - bb.Set("msg_value",service); - bb.Set("remote_port",port); - bb.Set("status_query",statusquery); - bb.Set("uuid",boost::uuids::to_string(uuid)); - PubServices.push_back(bb); + + Store bb; + bb.Set("msg_type","Service Discovery"); + bb.Set("msg_value",service); + bb.Set("remote_port",port); + bb.Set("status_query",statusquery); + bb.Set("uuid",boost::uuids::to_string(uuid)); + PubServices.push_back(bb); } else if(command=="Delete"){ - std::vector::iterator it; - for (it = PubServices.begin() ; it != PubServices.end(); ++it){ - //std::cout<<"d3.5 "<<*((*it)["msg_value"])<("msg_value")==service)break; - - - } - if (it!=PubServices.end())PubServices.erase(it); - - + std::vector::iterator it; + for (it = PubServices.begin() ; it != PubServices.end(); ++it){ + //std::cout<<"d3.5 "<<*((*it)["msg_value"])<("msg_value")==service)break; + + + } + if (it!=PubServices.end())PubServices.erase(it); + + } else if(command=="PortAdd"){ - if(PubServices.size()) PubServices.at(0).Set(service, port); + if(PubServices.size()) PubServices.at(0).Set(service, port); } else if(command =="PortDelete"){ - if(PubServices.size()) PubServices.at(0).Erase(service); + if(PubServices.size()) PubServices.at(0).Erase(service); } } if ((items [1].revents & ZMQ_POLLOUT) && running){ - + for(unsigned int i=0;i("remote_port"); - // StatusCheck.setsockopt(ZMQ_IMMEDIATE, 1); - StatusCheck.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); - - StatusCheck.connect(connection.str().c_str()); - - zmq::pollitem_t out[]={{StatusCheck,0,ZMQ_POLLOUT,0}}; - zmq::pollitem_t in[]={{StatusCheck,0,ZMQ_POLLIN,0}}; - - mm.Set("msg_type","Command"); - mm.Set("msg_value","Status"); - - std::string command; - mm>>command; - - // zmq::message_t Esend(256); - //std::string command="Status; - mm.Delete(); - - zmq::message_t Esend(command.length()+1); - snprintf ((char *) Esend.data(), command.length()+1 , "%s" ,command.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - if(out[0].revents & ZMQ_POLLOUT){ - StatusCheck.send(Esend); - - //std::cout<<"waiting for message "<(Ereceive.data())); - - mm.JsonParser(ss.str()); - } - } - } - /* - std::cout<<"received for publish "< subwriter(subbuffer); - params.Accept(subwriter); - - std::string tmpbufferout=subbuffer.GetString(); - - std::cout<<" substringtest "<name.GetString()<value<name.GetString()=="port") itr->value.SetInt(m_multicastport); - //if(itr->name.GetString()=="status") itr->value.SetString(ss.str().c_str(),strlen(ss.str().c_str())); - } - */ - - - - //rapidjson::Document params=d["params"]; - //params["port"].SetInt(m_multicastport,strlen(); - // rapidjson::Value& params = d["params"]; - // rapidjson::Document::AllocatorType& allocator = d.GetAllocator(); - // params.PushBack(i, allocator); - - // params.PushBack("key1","value1"); + /* + + + rapidjson::Document d; + d.Parse(json); + // d.SetObject(); + //["hello"] = "rapidjson"; + // test.SetString("My JSON Document", d.GetAllocator()); + // d.AddMember("Doc Name", test , d.GetAllocator()); + // d.AddMember("uuid", tmp.str().c_str(), d.GetAllocator()); + // (*newDoc)["Parameters"].SetObject(); + + msg_id++; + std::stringstream tmp; + tmp<("remote_port"); + // StatusCheck.setsockopt(ZMQ_IMMEDIATE, 1); + StatusCheck.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + + StatusCheck.connect(connection.str().c_str()); + + zmq::pollitem_t out[]={{StatusCheck,0,ZMQ_POLLOUT,0}}; + zmq::pollitem_t in[]={{StatusCheck,0,ZMQ_POLLIN,0}}; + + mm.Set("msg_type","Command"); + mm.Set("msg_value","Status"); + + std::string command; + mm>>command; + + // zmq::message_t Esend(256); + //std::string command="Status; + mm.Delete(); + + zmq::message_t Esend(command.length()+1); + snprintf ((char *) Esend.data(), command.length()+1 , "%s" ,command.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + if(out[0].revents & ZMQ_POLLOUT){ + StatusCheck.send(Esend); + + //std::cout<<"waiting for message "<(Ereceive.data())); + + mm.JsonParser(ss.str()); + } + } + } + /* + std::cout<<"received for publish "< subwriter(subbuffer); + params.Accept(subwriter); + + std::string tmpbufferout=subbuffer.GetString(); + + std::cout<<" substringtest "<name.GetString()<value<name.GetString()=="port") itr->value.SetInt(m_multicastport); + //if(itr->name.GetString()=="status") itr->value.SetString(ss.str().c_str(),strlen(ss.str().c_str())); + } + */ + + + + //rapidjson::Document params=d["params"]; + //params["port"].SetInt(m_multicastport,strlen(); + // rapidjson::Value& params = d["params"]; + // rapidjson::Document::AllocatorType& allocator = d.GetAllocator(); + // params.PushBack(i, allocator); + + // params.PushBack("key1","value1"); //params.PushBack("key2","value2"); - /* - std::cout<<" d[UUID] = "< > writer(buffer); - //rapidjson::Writer writer(buffer); - d.Accept(writer); - std::string hhh=buffer.GetString(); - std::cout<< "bufer = "<("msg_value")); - else PubServices.at(i).Set("status","N/A"); - std::string pubmessage; - PubServices.at(i)>>pubmessage; - - //std::stringstream pubmessage; - - //pubmessage<<"{\"uuid\":\""< > writer(buffer); + //rapidjson::Writer writer(buffer); + d.Accept(writer); + std::string hhh=buffer.GetString(); + std::cout<< "bufer = "<("msg_value")); + else PubServices.at(i).Set("status","N/A"); + std::string pubmessage; + PubServices.at(i)>>pubmessage; + + //std::stringstream pubmessage; + + //pubmessage<<"{\"uuid\":\""<pubsec); @@ -440,7 +439,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ close(sock); Ireceive.close(); // printf("publish out of runnin \n"); - + pthread_exit(NULL); //return (NULL); @@ -449,7 +448,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ void* ServiceDiscovery::MulticastListenThread(void* arg){ - + thread_args* args= static_cast(arg); zmq::context_t * context = args->context; // boost::uuids::uuid m_UUID=args->UUID; @@ -458,9 +457,11 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ std::string m_service=args->service; zmq::socket_t Ireceive (*context, ZMQ_ROUTER); - Ireceive.bind("inproc://ServiceDiscovery"); + int linger = 0; + Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + Ireceive.bind("inproc://ServiceDiscovery"); + - /* zmq::message_t config; Ireceive.recv (&config); @@ -471,26 +472,26 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ Ireceive.send(config); */ - + ///// multi cast ///// std::vector sock; char message[512]; std::vector addr; std::vector addrlen; - + std::vector items; - + items.resize(m_multicastaddress.size()+1); //zmq::pollitem_t items[m_multicastaddress.size()+1]; - + items.at(0).socket = Ireceive; items.at(0).fd = 0; items.at(0).events = ZMQ_POLLIN; items.at(0).revents = 0; - + zmq::pollitem_t out[] = { {Ireceive, 0, ZMQ_POLLOUT, 0} }; @@ -505,34 +506,34 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ sock.emplace_back(socket(AF_INET, SOCK_DGRAM, 0)); addrlen.emplace_back(); addr.emplace_back(); - + int a =1; setsockopt(sock.at(i), SOL_SOCKET, SO_REUSEADDR, &a, sizeof(int)); - //fcntl(sock, F_SETFL, O_NONBLOCK); + //fcntl(sock, F_SETFL, O_NONBLOCK); if (sock.at(i) < 0) { perror("socket"); exit(1); } bzero((char *)&addr.at(i), sizeof(addr.at(i))); addr.at(i).sin_family = AF_INET; - addr.at(i).sin_addr.s_addr = htonl(INADDR_ANY); + inet_aton(m_multicastaddress.at(i).c_str(), &addr.at(i).sin_addr); addr.at(i).sin_port = htons(m_multicastport.at(i)); addrlen.at(i) = sizeof(addr.at(i)); // receive // - if (bind(sock.at(i), (struct sockaddr *) &addr.at(i), sizeof(addr.at(i))) < 0) { + if (bind(sock.at(i), (struct sockaddr *) &addr.at(i), sizeof(addr.at(i))) < 0) { perror("bind"); printf("Failed to bind to multicast listen socket"); exit(1); - } - mreq.imr_multiaddr.s_addr = inet_addr(m_multicastaddress.at(i).c_str()); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); + } + mreq.imr_multiaddr.s_addr = inet_addr(m_multicastaddress.at(i).c_str()); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(sock.at(i), IPPROTO_IP, IP_ADD_MEMBERSHIP,&mreq, sizeof(mreq)) < 0) { perror("setsockopt mreq"); printf("Failed to join multicast group listen thread"); exit(1); - } - + } + items.at(i+1).socket = 0; items.at(i+1).fd = sock.at(i); @@ -561,90 +562,90 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ for(size_t i =0; i < sock.size(); i++){ if ((items.at(i+1).revents & ZMQ_POLLIN) && running) { - - - cnt = recvfrom(sock.at(i), message, sizeof(message), 0, (struct sockaddr *) &addr.at(i), (socklen_t*) &addrlen.at(i)); - if ((cnt > 0) && (message[0]=='{') ) { - //perror("recvfrom"); - // exit(1); - // break; - //} - //else if (cnt > 0){ - //printf("%s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); - - //if(message[0]!='[') break; - - Store* newservice= new Store(); - newservice->Set("ip",inet_ntoa(addr.at(i).sin_addr)); - newservice->JsonParser(message); - - std::string uuid; - newservice->Get("uuid",uuid); - if(RemoteServices.count(uuid)) delete RemoteServices[uuid]; - //std::cout< 0){ + //printf("%s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); + + //if(message[0]!='[') break; + + Store* newservice= new Store(); + newservice->Set("ip",inet_ntoa(addr.at(i).sin_addr)); + newservice->JsonParser(message); + + std::string uuid; + newservice->Get("uuid",uuid); + if(RemoteServices.count(uuid)) delete RemoteServices[uuid]; + //std::cout<second->Get("msg_time",msg_time_orig); - - //std::cout<<" time orig ="<second->Get("msg_time",msg_time_orig); + + //std::cout<<" time orig ="<>arg1; - - if(arg1=="All"){ - - //printf("d2\n"); - //zmq::message_t sizem(512); - int size= RemoteServices.size(); - zmq::message_t sizem(sizeof size); - - snprintf ((char *) sizem.data(), sizeof size , "%d" ,size) ; - - // zmq::poll(out,1,1000); - - // if (out[0].revents & ZMQ_POLLOUT){ - //std::cout<<"SD sent size="<>arg1>>arg2; - - for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - - std::string test; - it->second->Get("service",test); - - if(arg2==test){ - - std::string service; - *(it->second)>>service; - zmq::message_t send(service.length()+1); - snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - - if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); - } - } - - } - - - - if(arg1=="UUID"){ - - iss>>arg1>>arg2; - - for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - - - std::string test; - it->second->Get("uuid",test); - - if(arg2==test){ - - std::string service; - *(it->second)>>service; - zmq::message_t send(service.length()+1); - snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; - - try{ - zmq::poll(out,1,1000); - } catch(zmq::error_t& err){ - // ignore poll aborting due to signals - if(zmq_errno()==EINTR) continue; - throw; - } - - if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); - } - } - - } - - if(arg1=="Quit"){ - - running=false; - //printf("quitting listening \n"); - } - - - - - /* - std::string tmp="0"; - zmq::message_t send(tmp.length()+1); - snprintf ((char *) send.data(), tmp.length()+1 , "%s" ,tmp.c_str()) ; - Ireceive.send(send); - //printf("sent \n"); - */ + + std::istringstream iss(static_cast(comm.data())); + std::string arg1=""; + std::string arg2=""; + + iss>>arg1; + + if(arg1=="All"){ + + //printf("d2\n"); + //zmq::message_t sizem(512); + int size= RemoteServices.size(); + zmq::message_t sizem(sizeof size); + + snprintf ((char *) sizem.data(), sizeof size , "%d" ,size) ; + + // zmq::poll(out,1,1000); + + // if (out[0].revents & ZMQ_POLLOUT){ + //std::cout<<"SD sent size="<>arg1>>arg2; + + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + + std::string test; + it->second->Get("service",test); + + if(arg2==test){ + + std::string service; + *(it->second)>>service; + zmq::message_t send(service.length()+1); + snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + + if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); + } + } + + } + + + + if(arg1=="UUID"){ + + iss>>arg1>>arg2; + + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + + + std::string test; + it->second->Get("uuid",test); + + if(arg2==test){ + + std::string service; + *(it->second)>>service; + zmq::message_t send(service.length()+1); + snprintf ((char *) send.data(), service.length()+1 , "%s" ,service.c_str()) ; + + try{ + zmq::poll(out,1,1000); + } catch(zmq::error_t& err){ + // ignore poll aborting due to signals + if(zmq_errno()==EINTR) continue; + throw; + } + + if(out[0].revents & ZMQ_POLLOUT) Ireceive.send(send); + } + } + + } + + if(arg1=="Quit"){ + + running=false; + //printf("quitting listening \n"); + } + + + + + /* + std::string tmp="0"; + zmq::message_t send(tmp.length()+1); + snprintf ((char *) send.data(), tmp.length()+1 , "%s" ,tmp.c_str()) ; + Ireceive.send(send); + //printf("sent \n"); + */ } } @@ -805,16 +806,16 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ } -for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ - delete it->second; - it->second=0; + for (std::map::iterator it=RemoteServices.begin(); it!=RemoteServices.end(); ++it){ + delete it->second; + it->second=0; + + } + RemoteServices.clear(); + //printf("exiting sd listen thread \n"); + pthread_exit(NULL); + //return (NULL); - } - RemoteServices.clear(); - //printf("exiting sd listen thread \n"); - pthread_exit(NULL); - //return (NULL); - } @@ -822,7 +823,7 @@ for (std::map::iterator it=RemoteServices.begin(); it!=Remot ServiceDiscovery::~ServiceDiscovery(){ //printf("in sd destructor \n"); - sleep(1); + sleep(1); //printf("finnish sleep \n"); // kill publish thread @@ -831,6 +832,8 @@ ServiceDiscovery::~ServiceDiscovery(){ if (m_send){ //printf("in sd send kill \n"); zmq::socket_t ServicePublish (*context, ZMQ_PUSH); + int linger = 0; + ServicePublish.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); //int a=120000; //ServicePublish.setsockopt(ZMQ_RCVTIMEO, a); //ServicePublish.setsockopt(ZMQ_SNDTIMEO, a); @@ -860,9 +863,11 @@ ServiceDiscovery::~ServiceDiscovery(){ if(m_receive){ //printf("in sd receive kill \n"); zmq::socket_t ServiceDiscovery (*context, ZMQ_DEALER); + int linger=0; + ServiceDiscovery.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); // int a=60000; //ServiceDiscovery.setsockopt(ZMQ_RCVTIMEO, a); - //ServiceDiscovery.setsockopt(ZMQ_SNDTIMEO, a); + //ServiceDiscovery.setsockopt(ZMQ_SNDTIMEO, a); ServiceDiscovery.connect("inproc://ServiceDiscovery"); @@ -889,7 +894,7 @@ ServiceDiscovery::~ServiceDiscovery(){ //printf("finnish Set args=0 \n"); } //printf("deleted args \n"); - + //printf("finnish sd destructor \n"); } From a1348a78f4b27b2d86504f3bbb7eca4439a60f9c Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Mon, 29 Dec 2025 15:46:46 +0000 Subject: [PATCH 4/5] putting logging and monitoring on different multicast addresses (but default to the same port) --- configfiles/Dummy/ToolChainConfig | 8 ++++---- configfiles/template/ToolChainConfig | 8 ++++---- src/ServiceDiscovery/ServicesBackend.cpp | 16 +++++++++------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/configfiles/Dummy/ToolChainConfig b/configfiles/Dummy/ToolChainConfig index 7f0a1a0..533cb97 100644 --- a/configfiles/Dummy/ToolChainConfig +++ b/configfiles/Dummy/ToolChainConfig @@ -18,8 +18,8 @@ log_interactive 1 # Interactive=cout; 0=false, 1= true log_local 1 # Local = local file log; 0=false, 1= true log_local_path ./log # file to store logs to if local is active log_remote 1 # Remote= remote logging system "serservice_name Remote_Logging"; 0=false, 1= true -log_address 239.192.1.1 # Remote multicast address to send logs -log_port 55554 # port on remote machine to connect to +log_address 239.192.1.2 # Remote multicast address to send logs +log_port 5000 # port on remote machine to connect to log_append_time 0 # append seconds since epoch to filename; 0=false, 1= true log_split_files 1 # seperate output and error log files (named x.o and x.e) @@ -46,8 +46,8 @@ clt_dlr_socket_timeout 500 # inpoll_timeout 50 # keep these short! outpoll_timeout 50 # keep these short! command_timeout 2000 # -multicast_port 55554 # -multicast_address 239.192.1.1 # +mon_port 5000 # +mon_address 239.192.1.3 # ##### Tools To Add ##### Tools_File configfiles/Dummy/ToolsConfig # list of tools to run and their config files diff --git a/configfiles/template/ToolChainConfig b/configfiles/template/ToolChainConfig index d26b67a..badb1c3 100644 --- a/configfiles/template/ToolChainConfig +++ b/configfiles/template/ToolChainConfig @@ -18,8 +18,8 @@ log_interactive 1 # Interactive=cout; 0=false, 1= true log_local 0 # Local = local file log; 0=false, 1= true log_local_path ./log # file to store logs to if local is active log_remote 0 # Remote= remote logging system "serservice_name Remote_Logging"; 0=false, 1= true -log_address 239.192.1.1 # Remote multicast address to send logs -log_port 5001 # port on remote machine to connect to +log_address 239.192.1.2 # Remote multicast address to send logs +log_port 5000 # port on remote machine to connect to log_append_time 0 # append seconds since epoch to filename; 0=false, 1= true log_split_files 0 # seperate output and error log files (named x.o and x.e) @@ -45,8 +45,8 @@ clt_dlr_socket_timeout 500 # inpoll_timeout 50 # keep these short! outpoll_timeout 50 # keep these short! command_timeout 2000 # -multicast_port 55554 # -multicast_address 239.192.1.1 # +mon_port 5000 # +mon_address 239.192.1.3 # ##### Tools To Add ##### Tools_File configfiles/ToolsConfig # list of tools to run and their config files diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index c089eea..cea3104 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -280,13 +280,15 @@ bool ServicesBackend::InitMulticast(){ /* Multicast Setup */ /* ----------------------------------------- */ - int log_port = 55554; - int mon_port = 55553; - std::string multicast_address = "239.192.1.1"; + int log_port = 5000; + int mon_port = 5000; + std::string log_address = "239.192.1.2"; + std::string mon_address = "239.192.1.3"; m_variables.Get("log_port",log_port); m_variables.Get("mon_port",mon_port); - m_variables.Get("multicast_address",multicast_address); + m_variables.Get("log_address",log_address); + m_variables.Get("mon_address",mon_address); // set up multicast socket for sending logging & monitoring data log_socket = socket(AF_INET, SOCK_DGRAM, 0); @@ -323,10 +325,10 @@ bool ServicesBackend::InitMulticast(){ mon_addr = log_addr; mon_addr.sin_port = htons(mon_port); // convert destination address string to binary - get_ok = inet_aton(multicast_address.c_str(), &log_addr.sin_addr); - get_ok = get_ok && inet_aton(multicast_address.c_str(), &mon_addr.sin_addr); + get_ok = inet_aton(log_address.c_str(), &log_addr.sin_addr); + get_ok = get_ok && inet_aton(mon_address.c_str(), &mon_addr.sin_addr); if(get_ok==0){ // returns 0 on failure, not success - Log("Bad multicast address '"+multicast_address+"'",v_error,verbosity); + Log("Bad multicast address '"+log_address+"' or '"+mon_address+"'",v_error,verbosity); return false; } multicast_addrlen = sizeof(log_addr); From e77dbd214f8a90122f8e250bea011a149bc882b0 Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Sat, 3 Jan 2026 17:24:28 +0000 Subject: [PATCH 5/5] DAQUtilities: whitespace and commented-out printout ServiceDiscovery: add continue such that when there are messages available from the inproc socket, they are all processed before moving to broadcast part. Otherwise this means at most one message is processed per SD broadcast period. Services: add lifetime argument. conversion from unix ms to time string now done on client not MM. some DB field renames, remove quotes around embedded JSON. Responses with version numbers are just the number, no longer JSON, for reduced traffic. Get requests use SQL directly with json_build_object to have Postgres form JSON directly. Remove option to retrieve timestamp with GetRootPlot and GetPlotlyPlot for simplicity. ServicesBackend: make counters atomic. Update port advertisement methods. --- .gitignore | 3 + src/DAQDataModelBase/DAQUtilities.cpp | 19 +- src/ServiceDiscovery/ServiceDiscovery.cpp | 9 +- src/ServiceDiscovery/Services.cpp | 261 +++++++++++--------- src/ServiceDiscovery/Services.h | 21 +- src/ServiceDiscovery/ServicesBackend.cpp | 51 ++-- src/ServiceDiscovery/ServicesBackend.h | 9 +- src/ServiceDiscovery/SlowControlElement.cpp | 4 +- 8 files changed, 210 insertions(+), 167 deletions(-) diff --git a/.gitignore b/.gitignore index a2cf310..b1e8c16 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ *.pcm # python pre-compiled modules *.pyc +# core dumps +core # lib folder is populated during build lib/ @@ -23,3 +25,4 @@ tempinclude/ main NodeDaemon RemoteControl +MCDebug diff --git a/src/DAQDataModelBase/DAQUtilities.cpp b/src/DAQDataModelBase/DAQUtilities.cpp index f2cc7f5..cdc7b5e 100644 --- a/src/DAQDataModelBase/DAQUtilities.cpp +++ b/src/DAQDataModelBase/DAQUtilities.cpp @@ -119,10 +119,11 @@ int DAQUtilities::UpdateConnections(std::string ServiceName, zmq::socket_t* sock if(port=="" && port_name=="") service->Get("remote_port",remote_port); else if(port_name!="") service->Get(port_name, remote_port); if(remote_port==""){ - delete service; - service=0; - continue; + delete service; + service=0; + continue; } + //printf("updateconnections checking if service '%s' is interested in client '%s' on port '%s'\n",ServiceName.c_str(), type.c_str(),remote_port.c_str()); std::string tmp=ip + ":" + remote_port; @@ -162,7 +163,7 @@ DAQThread_args* DAQUtilities::CreateThread(std::string ThreadName, void (*func) } return args; -} +} void *DAQUtilities::String_Thread(void *arg){ @@ -189,11 +190,11 @@ void *DAQUtilities::String_Thread(void *arg){ zmq::poll(&initems[0], 1, 0); if ((initems[0].revents & ZMQ_POLLIN)){ - - zmq::message_t message; - IThread.recv(&message); - command=std::string(static_cast(message.data())); - + + zmq::message_t message; + IThread.recv(&message); + command=std::string(static_cast(message.data())); + } args->func_with_string(command); diff --git a/src/ServiceDiscovery/ServiceDiscovery.cpp b/src/ServiceDiscovery/ServiceDiscovery.cpp index c51e288..1150d16 100644 --- a/src/ServiceDiscovery/ServiceDiscovery.cpp +++ b/src/ServiceDiscovery/ServiceDiscovery.cpp @@ -177,8 +177,8 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ bool statusquery; tmp>>command>>service>>uuid>>port>>statusquery; - - + + //printf("SD publish thread got request command %s\n",command.c_str()); if(command=="Quit"){ //printf("publish quitting \n"); running=false; @@ -192,6 +192,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ bb.Set("status_query",statusquery); bb.Set("uuid",boost::uuids::to_string(uuid)); PubServices.push_back(bb); + //printf("SD publish thread Adding service %s\n", service.c_str()); } else if(command=="Delete"){ @@ -209,11 +210,13 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ } else if(command=="PortAdd"){ if(PubServices.size()) PubServices.at(0).Set(service, port); + //printf("SD publish thread %s adding port %d for service %s (# services: %d)\n",(PubServices.empty() ? "not" : "\b"), port, service.c_str(), PubServices.size()); } else if(command =="PortDelete"){ if(PubServices.size()) PubServices.at(0).Erase(service); } + continue; } if ((items [1].revents & ZMQ_POLLOUT) && running){ @@ -571,7 +574,7 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ // break; //} //else if (cnt > 0){ - //printf("%s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); + //printf("SD receive from %s: message = \"%s\"\n", inet_ntoa(addr.at(i).sin_addr), message); //if(message[0]!='[') break; diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 2890dc7..be3bca4 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -2,6 +2,10 @@ using namespace ToolFramework; +namespace { + const uint32_t MAX_UDP_PACKET_SIZE = 655355; +} + Services::Services(){ m_context=0; m_dbname=""; @@ -33,7 +37,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC m_variables.Get("alerts_receive", alerts_receive); m_variables.Get("alert_receive_port", alert_receive_port); m_variables.Get("sc_port", sc_port); - + sc_vars->InitThreadedReceiver(m_context, sc_port, 100, new_service, alert_receive_port, alerts_receive, alert_send_port, alerts_send); m_backend_client.SetUp(m_context); @@ -50,7 +54,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC // so we need to wait for the middleman to receive one & connect before we can communicate with it. int pub_period=5; m_variables.Get("service_publish_sec",pub_period); - Ready(pub_period*1000); + Ready(pub_period*3000); // wait for 3x SD publish period. Trial and error: Why so long needed? return true; } @@ -70,23 +74,23 @@ bool Services::SendAlarm(const std::string& message, unsigned int level, const s // we only handle 2 levels of alarm: critical (level 0) and normal (level!=0). if(level>0) level=1; - std::string cmd_string = "{\"time\":"+std::to_string(timestamp) + std::string cmd_string = "{\"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" + ",\"device\":\""+name+"\"" + ",\"level\":"+std::to_string(level) - + ",\"message\":\"" + message + "\"}"; + + ",\"alarm\":\"" + message + "\"}"; std::string err=""; // send the alarm on the pub socket - bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, &timeout, &err); + bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, &timeout, &err); if(!ok){ std::clog<<"SendAlarm error: "<>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")"; + std::string cmd_string = "SELECT jsonb_build_object('data', data, 'version', version) FROM device_config WHERE name='"+name+"' AND version=(SELECT data->>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")"; std::string err=""; @@ -606,7 +596,7 @@ bool Services::GetRunDeviceConfig(std::string& json_data, const int runconfig_id if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found - err = "GetRunDeviceConfig error: config "+name+" for runconfig "+std::to_string(runconfig_id)+" not found" + err = "GetRunDeviceConfig error: config "+name+" for runconfig "+std::to_string(runconfig_id)+" not found"; std::clog<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")"; + std::string cmd_string = "SELECT jsonb_build_object('data', data, 'version', version) FROM device_config WHERE name='"+name+"' AND version=(SELECT data->>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")"; std::string err=""; @@ -682,15 +672,15 @@ bool Services::GetRunDeviceConfig(std::string& json_data, const std::string& run // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool Services::GetROOTplot(const std::string& plot_name, int& version, std::string& draw_options, std::string& json_data, std::string* timestamp, const unsigned int timeout){ +bool Services::GetROOTplot(const std::string& plot_name, int& version, std::string& draw_options, std::string& json_data, const unsigned int timeout){ std::string cmd_string; if(version<0){ - cmd_string = "SELECT row_to_json(version, time, data, draw_options), FROM rootplots WHERE name='"+plot_name+"' AND version="+std::to_string(version); + cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'draw_options', draw_options), FROM rootplots WHERE name='"+plot_name+"' AND version="+std::to_string(version); } else { // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster - cmd_string = "SELECT row_to_json(version, time, data, draw_options) FROM rootplots WHERE name='"+plot_name+"' ORDER BY version DESC LIMIT 1"; + cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'draw_options', draw_options) FROM rootplots WHERE name='"+plot_name+"' ORDER BY version DESC LIMIT 1"; } std::string err=""; @@ -714,7 +704,6 @@ bool Services::GetROOTplot(const std::string& plot_name, int& version, std::stri bool ok = plot.Get("data", json_data); ok = ok && plot.Get("draw_options", draw_options); ok = ok && plot.Get("version", version); - if(timestamp && ok) ok = ok && plot.Get("time", *timestamp); if(!ok){ err="GetROOTplot error: invalid response: '"+response+"'"; @@ -724,8 +713,7 @@ bool Services::GetROOTplot(const std::string& plot_name, int& version, std::stri } /* - std::clog<<"timestamp: "<655355){ - std::clog<<"Logging message is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::clog<<"Logging message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<655355){ - std::clog<<"Monitoring message is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::clog<<"Monitoring message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<655355){ - std::clog<<"ROOT plot json is too long! Maximum length may be 655355 bytes"<MAX_UDP_PACKET_SIZE){ + std::clog<<"ROOT plot json is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetPlotlyPlot(const std::string& name, int& version, std::string& json_trace, std::string& json_layout, std::string* timestamp=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendROOTplotZmq(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=SERVICES_DEFAULT_TIMEOUT); + bool GetROOTplot(const std::string& plot_name, int& version, std::string& draw_option, std::string& json_data, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // FIXME is default lifetime 5 ok? + bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetPlotlyPlot(const std::string& name, int& version, std::string& json_trace, std::string& json_layout, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + std::string TimeStringFromUnixMs(const uint64_t time); SlowControlCollection* GetSlowControlCollection(); SlowControlElement* GetSlowControlVariable(std::string key); diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index cea3104..6d300bb 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -96,22 +96,22 @@ void ServicesBackend::SetUp(zmq::context_t* in_context, std::function>tmp; - m_variables.JsonParser(tmp); +bool ServicesBackend::Initialise(Store &variables_in){ + + std::string tmp=""; + variables_in>>tmp; + m_variables.JsonParser(tmp); /* General Variables */ /* ----------------------------------------- */ @@ -348,9 +348,13 @@ bool ServicesBackend::RegisterServices(){ // we can make our lives a little easier by using a Utilities class utilities = new DAQUtilities(context); - // we can now register the client sockets with the following: - utilities->AddService("slowcontrol_write", clt_pub_port); - utilities->AddService("slowcontrol_read", clt_dlr_port); + // register our ports for advertisement + get_ok = utilities->AddPort("db_write", clt_pub_port); + get_ok = get_ok && utilities->AddPort("db_read", clt_dlr_port); + if(!get_ok){ + Log("Error advertising ports!",v_error,verbosity); + return false; + } return true; } @@ -533,8 +537,8 @@ bool ServicesBackend::DoCommand(Command& cmd, uint32_t timeout_ms){ // submit a request to send our command. std::promise send_ticket; std::future send_receipt = send_ticket.get_future(); - send_queue_mutex.lock(); if(verbosity>10) std::cout<<"ServicesBackend::DoCommand putting command into waiting-to-send list"<RemoveService("slowcontrol_write"); - if(utilities) utilities->RemoveService("slowcontrol_read"); + //if(utilities) utilities->RemoveService("slowcontrol_write"); + if(utilities) utilities->RemovePort("db_read"); + if(utilities) utilities->RemovePort("db_write"); Log("ServicesBackend Closing multicast socket",v_debug,verbosity); close(log_socket); @@ -964,6 +969,9 @@ bool ServicesBackend::Ready(int timeout){ // only poll dealer socket, pub sockets always return true immediately so ignore the timeout // polling the input socket checks for a message, so don't do that. int ret; +// printf("ServicesBackend waiting for up to %d ms for connection on read/rep socket\n",timeout); +// auto timeout_ms = std::chrono::milliseconds(timeout); +// std::chrono::time_point start = std::chrono::steady_clock::now(); try { dlr_socket_mutex.lock(); ret = zmq::poll(&out_polls.at(1), 1, timeout); @@ -978,9 +986,12 @@ bool ServicesBackend::Ready(int timeout){ return false; } else if(ret==0){ // 'resource temoprarily unavailable' - no-one connected. +// printf("ServicesBackend::Ready - no one connected (%s)\n", zmq_strerror(errno)); } else if(out_polls.at(1).revents & ZMQ_POLLOUT){ +// printf("Connected!\n"); return true; } +// printf("returning after %ld/%d ms\n", std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(), timeout); return false; diff --git a/src/ServiceDiscovery/ServicesBackend.h b/src/ServiceDiscovery/ServicesBackend.h index cceed33..f0136da 100644 --- a/src/ServiceDiscovery/ServicesBackend.h +++ b/src/ServiceDiscovery/ServicesBackend.h @@ -14,6 +14,9 @@ #include #include #include +#include +#include // sleep_for / sleep_until +#include // this_thread #include // gethostname #include // toupper/tolower #include // std::function, std::negate @@ -132,8 +135,8 @@ class ServicesBackend { boost::posix_time::ptime last_read; // when we last sent a read command boost::posix_time::ptime last_printout; // when we last printed out stats about what we're doing - int read_commands_failed; - int write_commands_failed; + std::atomic read_commands_failed{0}; + std::atomic write_commands_failed{0}; // general int verbosity; @@ -151,7 +154,7 @@ class ServicesBackend { // since that's the one the middleman needs to know to send replies back std::string clt_ID; - uint32_t msg_id = 0; + std::atomic msg_id{0}; // ======================================================= diff --git a/src/ServiceDiscovery/SlowControlElement.cpp b/src/ServiceDiscovery/SlowControlElement.cpp index e8f9f7f..f8f6ef0 100644 --- a/src/ServiceDiscovery/SlowControlElement.cpp +++ b/src/ServiceDiscovery/SlowControlElement.cpp @@ -202,10 +202,8 @@ bool SlowControlElement::SetDefault(std::string value){ bool SlowControlElement::SetValue(const char value[]){ - bool ret=false; std::string tmp_value=value; - ret=SetValue(tmp_value); - return ret; + return SetValue(tmp_value); }