diff --git a/Makefile b/Makefile index 7708262..ee4da40 100755 --- a/Makefile +++ b/Makefile @@ -1,16 +1,16 @@ Dependencies=/opt -PostgresLib= -L $(Dependencies)/libpqxx-6.4.5/install/lib -lpqxx -L `pg_config --libdir` -lpq -PostgresInclude= -I $(Dependencies)/libpqxx-6.4.5/install/include -I `pg_config --includedir` -#PostgresLib= -L $(Dependencies)/libpqxx-6.4.5/install/lib -lpqxx `pg_config --ldflags` `pg_config --libs` +PostgresLib= -L $(Dependencies)/libpqxx-7.8.1/install/lib -lpqxx -L `pg_config --libdir` -lpq +PostgresInclude= -I $(Dependencies)/libpqxx-7.8.1/install/include -I `pg_config --includedir` +#PostgresLib= -L $(Dependencies)/libpqxx-7.8.1/install/lib -lpqxx `pg_config --ldflags` `pg_config --libs` -ZMQLib= -L $(Dependencies)/zeromq-4.0.7/lib -lzmq -ZMQInclude= -I $(Dependencies)/zeromq-4.0.7/include +ZMQLib= -L $(Dependencies)/zeromq-4.3.5/lib -lzmq +ZMQInclude= -I $(Dependencies)/zeromq-4.3.5/include BoostLib= -L $(Dependencies)/boost_1_66_0/install/lib -lboost_date_time BoostInclude= -I $(Dependencies)/boost_1_66_0/install/include -CXXFLAGS= -g -std=c++11 -fdiagnostics-color=always -Wno-attributes -O3 +CXXFLAGS= -g -fdiagnostics-color=always -Wno-attributes -O3 all: middleman diff --git a/Postgres.cpp b/Postgres.cpp index c880ee9..6e2d930 100755 --- a/Postgres.cpp +++ b/Postgres.cpp @@ -74,12 +74,8 @@ bool Postgres::CloseConnection(std::string* err){ if(verbosity>v_debug) std::cout<<"Connection is not open"<disconnect(); - if(conn->is_open()){ - std::cerr<<"Attempted to close postgresql connection, yet it remains open?!" <(database_in.data()); @@ -28,8 +28,8 @@ Query::Query(zmq::message_t& client_id_in, zmq::message_t& msg_id_in, zmq::messa // copy constructor Query::Query(const Query& in){ - client_id.copy(&in.client_id); - message_id.copy(&in.message_id); + client_id.copy(const_cast(in.client_id)); + message_id.copy(const_cast(in.message_id)); database = in.database; query = in.query; query_ok = in.query_ok; diff --git a/ReceiveSQL.cpp b/ReceiveSQL.cpp index 06ee8eb..46833c4 100755 --- a/ReceiveSQL.cpp +++ b/ReceiveSQL.cpp @@ -56,7 +56,7 @@ bool ReceiveSQL::Execute(){ // poll the input sockets for messages Log("Polling input sockets",4); - get_ok = zmq::poll(in_polls.data(), in_polls.size(), inpoll_timeout); + get_ok = zmq::poll(in_polls.data(), in_polls.size(), std::chrono::milliseconds(inpoll_timeout)); if(get_ok<0){ Log("Warning! ReceiveSQL error polling input sockets; have they closed?",0); } @@ -87,7 +87,7 @@ bool ReceiveSQL::Execute(){ // poll the output sockets for listeners Log("Polling output sockets",4); - get_ok = zmq::poll(out_polls.data(), out_polls.size(), outpoll_timeout); + get_ok = zmq::poll(out_polls.data(), out_polls.size(), std::chrono::milliseconds(outpoll_timeout)); if(get_ok<0){ Log("Warning! ReceiveSQL error polling output sockets; have they closed?",0); } @@ -320,43 +320,43 @@ bool ReceiveSQL::InitZMQ(Store& m_variables){ // ------------------------------------------------------- clt_sub_socket = new zmq::socket_t(*context, ZMQ_SUB); // this socket never sends, so a send timeout is irrelevant. - clt_sub_socket->setsockopt(ZMQ_RCVTIMEO, clt_sub_socket_timeout); + clt_sub_socket->set(zmq::sockopt::rcvtimeo, clt_sub_socket_timeout); // don't linger too long, it looks like the program crashed. - clt_sub_socket->setsockopt(ZMQ_LINGER, 10); - clt_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + clt_sub_socket->set(zmq::sockopt::linger, 10); + clt_sub_socket->set(zmq::sockopt::subscribe, ""); // we will connect this socket to clients with the utilities class } // socket to receive dealt read queries and send responses to clients // ------------------------------------------------------------------ clt_rtr_socket = new zmq::socket_t(*context, ZMQ_ROUTER); - clt_rtr_socket->setsockopt(ZMQ_SNDTIMEO, clt_rtr_socket_timeout); - clt_rtr_socket->setsockopt(ZMQ_RCVTIMEO, clt_rtr_socket_timeout); + clt_rtr_socket->set(zmq::sockopt::sndtimeo, clt_rtr_socket_timeout); + clt_rtr_socket->set(zmq::sockopt::rcvtimeo, clt_rtr_socket_timeout); // don't linger too long, it looks like the program crashed. - clt_rtr_socket->setsockopt(ZMQ_LINGER, 10); + clt_rtr_socket->set(zmq::sockopt::linger, 10); // FIXME remove- for debug only: // make reply socket error out if the destination is unreachable // (normally it silently drops the message) - clt_rtr_socket->setsockopt(ZMQ_ROUTER_MANDATORY, 1); + clt_rtr_socket->set(zmq::sockopt::router_mandatory, 1); // we'll connect this socket to clients with the utilities class // socket to listen for presence of the other middleman // ---------------------------------------------------- mm_rcv_socket = new zmq::socket_t(*context, ZMQ_SUB); - mm_rcv_socket->setsockopt(ZMQ_RCVTIMEO, mm_rcv_socket_timeout); + mm_rcv_socket->set(zmq::sockopt::rcvtimeo, mm_rcv_socket_timeout); // this socket never sends, so a send timeout is irrelevant. // don't linger too long, it looks like the program crashed. - mm_rcv_socket->setsockopt(ZMQ_LINGER, 10); - mm_rcv_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + mm_rcv_socket->set(zmq::sockopt::linger, 10); + mm_rcv_socket->set(zmq::sockopt::subscribe, ""); // we'll connect this socket to clients with the utilities class // socket to broadcast our presence to the other middleman // ------------------------------------------------------- mm_snd_socket = new zmq::socket_t(*context, ZMQ_PUB); - mm_snd_socket->setsockopt(ZMQ_SNDTIMEO, mm_snd_socket_timeout); + mm_snd_socket->set(zmq::sockopt::sndtimeo, mm_snd_socket_timeout); // this socket never receives, so a recieve timeout is irrelevant. // don't linger too long, it looks like the program crashed. - mm_snd_socket->setsockopt(ZMQ_LINGER, 10); + mm_snd_socket->set(zmq::sockopt::linger, 10); // this one we're a publisher, so we do bind. mm_snd_socket->bind(std::string("tcp://*:")+std::to_string(mm_snd_port)); @@ -365,21 +365,21 @@ bool ReceiveSQL::InitZMQ(Store& m_variables){ // socket to receive logging queries for the monitoring db // ------------------------------------------------------- log_sub_socket = new zmq::socket_t(*context, ZMQ_SUB); - log_sub_socket->setsockopt(ZMQ_RCVTIMEO, log_sub_socket_timeout); + log_sub_socket->set(zmq::sockopt::rcvtimeo, log_sub_socket_timeout); // this socket never sends, so a send timeout is irrelevant. // don't linger too long, it looks like the program crashed. - log_sub_socket->setsockopt(ZMQ_LINGER, 10); - log_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + log_sub_socket->set(zmq::sockopt::linger, 10); + log_sub_socket->set(zmq::sockopt::subscribe, ""); // we'll connect this socket to clients with the utilities class } // socket to send log queries to the master, if not us // ---------------------------------------------------- log_pub_socket = new zmq::socket_t(*context, ZMQ_PUB); - log_pub_socket->setsockopt(ZMQ_SNDTIMEO, log_pub_socket_timeout); + log_pub_socket->set(zmq::sockopt::sndtimeo, log_pub_socket_timeout); // this socket never receives, so a recieve timeout is irrelevant. // don't linger too long, it looks like the program crashed. - log_pub_socket->setsockopt(ZMQ_LINGER, 10); + log_pub_socket->set(zmq::sockopt::linger, 10); // again we'll bind to this as it's a publisher log_pub_socket->bind(std::string("tcp://*:")+std::to_string(log_pub_port)); @@ -1909,22 +1909,22 @@ bool ReceiveSQL::UpdateRole(){ // socket to receive logging queries for the monitoring db // ------------------------------------------------------- log_sub_socket = new zmq::socket_t(*context, ZMQ_SUB); - log_sub_socket->setsockopt(ZMQ_RCVTIMEO, log_sub_socket_timeout); + log_sub_socket->set(zmq::sockopt::rcvtimeo, log_sub_socket_timeout); // this socket never receives, so a recieve timeout is irrelevant. // don't linger too long, it looks like the program crashed. - log_sub_socket->setsockopt(ZMQ_LINGER, 10); + log_sub_socket->set(zmq::sockopt::linger, 10); // connection to clients will be made via the utitlies class - log_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + log_sub_socket->set(zmq::sockopt::subscribe, ""); // socket to receive published write queries from clients // ------------------------------------------------------- clt_sub_socket = new zmq::socket_t(*context, ZMQ_SUB); // this socket never sends, so a send timeout is irrelevant. - clt_sub_socket->setsockopt(ZMQ_RCVTIMEO, clt_sub_socket_timeout); + clt_sub_socket->set(zmq::sockopt::rcvtimeo, clt_sub_socket_timeout); // don't linger too long, it looks like the program crashed. - clt_sub_socket->setsockopt(ZMQ_LINGER, 10); + clt_sub_socket->set(zmq::sockopt::linger, 10); // connections to clients will be made via the utilities class - clt_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0); + clt_sub_socket->set(zmq::sockopt::subscribe, ""); // add the additional polls for these sockets zmq::pollitem_t clt_sub_socket_pollin = zmq::pollitem_t{*clt_sub_socket,0,ZMQ_POLLIN,0}; @@ -2101,10 +2101,8 @@ bool ReceiveSQL::GetLastUpdateTime(std::string& our_timestamp){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, zmq::message_t& message){ - bool send_ok; - if(more) send_ok = sock->send(message, ZMQ_SNDMORE); - else send_ok = sock->send(message); - return send_ok; + auto flags = more ? zmq::send_flags::sndmore : zmq::send_flags::none; + return sock->send(message, flags).has_value(); } // ««-------------- ≪ °◇◆◇° ≫ --------------»» @@ -2116,44 +2114,20 @@ bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, std::string messagedata){ //snprintf((char*)message.data(), messagedata.size()+1, "%s", messagedata.c_str()); // send it with given SNDMORE flag - bool send_ok; - if(more) send_ok = sock->send(message, ZMQ_SNDMORE); - else send_ok = sock->send(message); - - return send_ok; + return Send(sock, more, message); } // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, std::vector messages){ - - // loop over all but the last part in the input vector, - // and send with the SNDMORE flag - for(int i=0; i<(messages.size()-1); ++i){ - - // form zmq::message_t - zmq::message_t message(messages.at(i).size()); - memcpy(message.data(), messages.at(i).data(), messages.at(i).size()); - //snprintf((char*)message.data(), messages.at(i).size()+1, "%s", messages.at(i).c_str()); - - // send this part - bool send_ok = sock->send(message, ZMQ_SNDMORE); - - // break on error - if(not send_ok) return false; - } - - // form the zmq::message_t for the last part - zmq::message_t message(messages.back().size()); - memcpy(message.data(), messages.back().data(), messages.back().size()); - //snprintf((char*)message.data(), messages.back().size()+1, "%s", messages.back().c_str()); - - // send it with, or without SNDMORE flag as requested - bool send_ok; - if(more) send_ok = sock->send(message, ZMQ_SNDMORE); - else send_ok = sock->send(message); - - return send_ok; + // loop over all parts in the input vector + // send all but the last part with SNDMORE flag + // send the last part with SNDMORE flag if requested + auto rest = messages.size(); + for (auto& message : messages) + if (not Send(sock, more or --rest, message)) + return false; + return true; } // ««-------------- ≪ °◇◆◇° ≫ --------------»» @@ -2161,7 +2135,7 @@ bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, std::vector m int ReceiveSQL::PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, std::vector& outputs){ // poll the input socket for messages - get_ok = zmq::poll(&poll, 1, timeout); + get_ok = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout)); if(get_ok<0){ // error polling - is the socket closed? return -3; @@ -2195,14 +2169,14 @@ bool ReceiveSQL::Receive(zmq::socket_t* sock, std::vector& outpu bool err=false; while(true){ //std::cout<recv(&tmp); - if(ok<0){ + auto ok = sock->recv(tmp); + if(not ok){ err=true; break; } // transfer the received message to the output vector outputs.resize(outputs.size()+1); - outputs.back().move(&tmp); + outputs.back().move(tmp); // receive next part if there is more to come if(!outputs.back().more()) break; diff --git a/ReceiveSQL.h b/ReceiveSQL.h index c50fdfb..f9c88c0 100755 --- a/ReceiveSQL.h +++ b/ReceiveSQL.h @@ -281,10 +281,7 @@ class ReceiveSQL{ bool Send(zmq::socket_t* sock, bool more, T&& messagedata){ zmq::message_t message(sizeof(T)); memcpy(message.data(), &messagedata, sizeof(T)); - bool send_ok; - if(more) send_ok = sock->send(message, ZMQ_SNDMORE); - else send_ok = sock->send(message); - return send_ok; + return Send(sock, more, message); } // recursive case; send the next message part and forward all remaining parts @@ -300,7 +297,7 @@ class ReceiveSQL{ template int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message){ // check for listener - int ret = zmq::poll(&poll, 1, timeout); + int ret = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout)); if(ret<0){ // error polling - is the socket closed? return -3; @@ -320,7 +317,7 @@ class ReceiveSQL{ template int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message, Rest&&... rest){ // check for listener - int ret = zmq::poll(&poll, 1, timeout); + int ret = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout)); if(ret<0){ // error polling - is the socket closed? return -3; diff --git a/ServiceDiscovery.cpp b/ServiceDiscovery.cpp index 71fb887..eb63bad 100644 --- a/ServiceDiscovery.cpp +++ b/ServiceDiscovery.cpp @@ -1,5 +1,7 @@ #include "ServiceDiscovery.h" +using namespace std::literals::chrono_literals; + ServiceDiscovery::ServiceDiscovery(bool Send, bool Receive, int remoteport, std::string address, int multicastport, zmq::context_t * incontext, boost::uuids::uuid UUID, std::string service, int pubsec, int kicksec){ @@ -65,8 +67,7 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ long msg_id=0; zmq::socket_t Ireceive (*context, ZMQ_PULL); - int linger = 0; - Ireceive.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + Ireceive.set(zmq::sockopt::linger, 0); Ireceive.bind("inproc://ServicePublish"); /// multi cast ///// @@ -119,12 +120,12 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ while(running){ - zmq::poll(&items [0], 2, -1); + zmq::poll(&items [0], 2); if ((items [0].revents & ZMQ_POLLIN) && running) { zmq::message_t commands; - Ireceive.recv(&commands); + Ireceive.recv(commands); std::istringstream tmp(static_cast(commands.data())); std::string command; @@ -193,13 +194,12 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ zmq::socket_t StatusCheck (*context, ZMQ_REQ); int a=2000; - StatusCheck.setsockopt(ZMQ_RCVTIMEO, a); - StatusCheck.setsockopt(ZMQ_SNDTIMEO, a); - int linger = 0; - StatusCheck.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); - // StatusCheck.setsockopt(ZMQ_IMMEDIATE, 1); - //StatusCheck.setsockopt(ZMQ_REQ_RELAXED, 1); - //StatusCheck.setsockopt(ZMQ_REQ_CORRELATE, 1); + StatusCheck.set(zmq::sockopt::rcvtimeo, a); + StatusCheck.set(zmq::sockopt::sndtimeo, a); + StatusCheck.set(zmq::sockopt::linger, 0); + // StatusCheck.set(zmq::sockopt::immediate, 1); + //StatusCheck.set(zmq::sockopt:req_relaxed, 1); + //StatusCheck.set(zmq::sockopt::req_correlate, 1); std::stringstream connection; connection<<"tcp://localhost:"<<*(PubServices.at(i)["remote_port"]); StatusCheck.connect(connection.str().c_str()); @@ -220,18 +220,18 @@ void* ServiceDiscovery::MulticastPublishThread(void* arg){ zmq::message_t Esend(command.length()+1); snprintf ((char *) Esend.data(), command.length()+1 , "%s" ,command.c_str()) ; - zmq::poll(out,1,1000); + zmq::poll(out,1,1s); if(out[0].revents & ZMQ_POLLOUT){ - if(StatusCheck.send(Esend)){ + if(StatusCheck.send(Esend, zmq::send_flags::none)){ //StatusCheck.disconnect(connection.str().c_str()); //StatusCheck.close(); //std::cout<<"waiting for message "<(Ereceive.data())); @@ -380,7 +380,7 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ while(running){ - zmq::poll (&items [0], 2, -1); + zmq::poll (&items [0], 2); if ((items [0].revents & ZMQ_POLLIN) && running) { @@ -471,13 +471,13 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ if ((items [1].revents & ZMQ_POLLIN) && running) { zmq::message_t Identity; - Ireceive.recv(&Identity); + Ireceive.recv(Identity); - Ireceive.send(Identity,ZMQ_SNDMORE); + Ireceive.send(Identity, zmq::send_flags::sndmore); zmq::message_t comm; - if(Ireceive.recv(&comm)){ + if(Ireceive.recv(comm)){ std::istringstream iss(static_cast(comm.data())); std::string arg1=""; @@ -500,9 +500,7 @@ void* ServiceDiscovery::MulticastListenThread(void* arg){ //std::cout<<"SD sent size="<(arg); - zmq::poll(&(args->items[0]), 1, args->poll_length); + zmq::poll(&(args->items[0]), 1, std::chrono::milliseconds(args->poll_length)); if (args->items[0].revents & ZMQ_POLLIN){ zmq::message_t identity; - args->sock->recv(&identity); + args->sock->recv(identity); zmq::message_t blank; - args->sock->recv(&blank); + args->sock->recv(blank); zmq::message_t message; - args->sock->recv(&message); + args->sock->recv(message); std::istringstream iss(static_cast(message.data())); Store tmp; tmp.JsonParser(iss.str()); @@ -160,9 +160,9 @@ void SlowControlCollection::Thread(Thread_args* arg){ snprintf ((char *) send.data(), tmp2.length()+1 , "%s" ,tmp2.c_str()) ; - args->sock->send(identity, ZMQ_SNDMORE); - args->sock->send(blank, ZMQ_SNDMORE); - args->sock->send(send); + args->sock->send(identity, zmq::send_flags::sndmore); + args->sock->send(blank, zmq::send_flags::sndmore); + args->sock->send(send, zmq::send_flags::none); } diff --git a/Utilities.cpp b/Utilities.cpp index 0c3dc86..b2e422e 100644 --- a/Utilities.cpp +++ b/Utilities.cpp @@ -1,5 +1,7 @@ #include +using namespace std::literals::chrono_literals; + Utilities::Utilities(zmq::context_t* zmqcontext){ context=zmqcontext; Threads.clear(); @@ -19,7 +21,7 @@ bool Utilities::AddService(std::string ServiceName, unsigned int port, bool Stat zmq::message_t send(test.str().length()+1); snprintf ((char *) send.data(), test.str().length()+1 , "%s" ,test.str().c_str()) ; - return Ireceive.send(send); + return Ireceive.send(send, zmq::send_flags::none).has_value(); } @@ -35,7 +37,7 @@ bool Utilities::RemoveService(std::string ServiceName){ zmq::message_t send(test.str().length()+1); snprintf ((char *) send.data(), test.str().length()+1 , "%s" ,test.str().c_str()) ; - return Ireceive.send(send); + return Ireceive.send(send, zmq::send_flags::none).has_value(); } @@ -52,12 +54,12 @@ int Utilities::UpdateConnections(std::string ServiceName, zmq::socket_t* sock, s snprintf ((char *) send.data(), 4 , "%s" ,"All") ; - if(!Ireceive.send(send)){ + if(!Ireceive.send(send, zmq::send_flags::none)){ std::cerr<<"Failed to send 'ALL' query to ServiceDiscovery!"<(receive.data())); @@ -70,7 +72,7 @@ int Utilities::UpdateConnections(std::string ServiceName, zmq::socket_t* sock, s Store *service = new Store; zmq::message_t servicem; - Ireceive.recv(&servicem); + Ireceive.recv(servicem); std::istringstream ss(static_cast(servicem.data())); service->JsonParser(ss.str()); @@ -128,12 +130,12 @@ int Utilities::ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map(receive.data())); @@ -148,7 +150,7 @@ int Utilities::ConnectToEndpoints(zmq::socket_t* readrep_sock, std::map(servicem.data())); service->JsonParser(ss.str()); @@ -304,12 +306,12 @@ void *Utilities::String_Thread(void *arg){ std::string command=""; - zmq::poll(&initems[0], 1, 0); + zmq::poll(&initems[0], 1, 0ms); if ((initems[0].revents & ZMQ_POLLIN)){ zmq::message_t message; - IThread.recv(&message); + IThread.recv(message); command=std::string(static_cast(message.data())); } @@ -357,9 +359,8 @@ bool Utilities::MessageThread(Thread_args* args, std::string Message, bool block zmq::message_t msg(Message.length()+1); snprintf((char *)msg.data(), Message.length()+1, "%s", Message.c_str()); - if(block) ret=args->sock->send(msg); - else ret=args->sock->send(msg, ZMQ_NOBLOCK); - + auto flags = block ? zmq::send_flags::none : zmq::send_flags::dontwait; + ret = args->sock->send(msg, flags).has_value(); } return ret; diff --git a/run_middleman.sh b/run_middleman.sh index cdbdcef..d2fefcb 100644 --- a/run_middleman.sh +++ b/run_middleman.sh @@ -3,7 +3,7 @@ #systemctl start postgresql-12 # export path to zmq, pqxx and boost -export LD_LIBRARY_PATH=/opt/libpqxx-6.4.5/install/lib:/opt/boost_1_66_0/install/lib:/opt/zeromq-4.0.7/lib:$LD_LIBRARY_PATH +export LD_LIBRARY_PATH=/opt/libpqxx-7.8.1/install/lib:/opt/boost_1_66_0/install/lib:/opt/zeromq-4.3.5/lib:$LD_LIBRARY_PATH # setup database environmental variables # (probably not required as they're overridden in the config file anyway)