Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
Dependencies=/opt

PostgresLib= -L $(Dependencies)/libpqxx-6.4.5/install/lib -lpqxx -L `pg_config --libdir` -lpq
PostgresInclude= -I $(Dependencies)/libpqxx-6.4.5/install/include -I `pg_config --includedir`
#PostgresLib= -L $(Dependencies)/libpqxx-6.4.5/install/lib -lpqxx `pg_config --ldflags` `pg_config --libs`
PostgresLib= -L $(Dependencies)/libpqxx-7.8.1/install/lib -lpqxx -L `pg_config --libdir` -lpq
PostgresInclude= -I $(Dependencies)/libpqxx-7.8.1/install/include -I `pg_config --includedir`
#PostgresLib= -L $(Dependencies)/libpqxx-7.8.1/install/lib -lpqxx `pg_config --ldflags` `pg_config --libs`

ZMQLib= -L $(Dependencies)/zeromq-4.0.7/lib -lzmq
ZMQInclude= -I $(Dependencies)/zeromq-4.0.7/include
ZMQLib= -L $(Dependencies)/zeromq-4.3.5/lib -lzmq
ZMQInclude= -I $(Dependencies)/zeromq-4.3.5/include

BoostLib= -L $(Dependencies)/boost_1_66_0/install/lib -lboost_date_time
BoostInclude= -I $(Dependencies)/boost_1_66_0/install/include

CXXFLAGS= -g -std=c++11 -fdiagnostics-color=always -Wno-attributes -O3
CXXFLAGS= -g -fdiagnostics-color=always -Wno-attributes -O3

all: middleman

Expand Down
10 changes: 2 additions & 8 deletions Postgres.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,8 @@ bool Postgres::CloseConnection(std::string* err){
if(verbosity>v_debug) std::cout<<"Connection is not open"<<std::endl;
return true;
}
conn->disconnect();
if(conn->is_open()){
std::cerr<<"Attempted to close postgresql connection, yet it remains open?!" <<std::endl;
if(err) *err="pqxx::connection::is_open() returns true even after calling disconnect";
return false;
}
delete conn;
conn = nullptr;
return true;
}
catch (const pqxx::broken_connection &e){
Expand All @@ -95,7 +91,6 @@ bool Postgres::CloseConnection(std::string* err){

Postgres::~Postgres(){
CloseConnection();
if(conn) delete conn;
}

Postgres::Postgres(){}
Expand Down Expand Up @@ -174,7 +169,6 @@ bool Postgres::Query(std::string query, int nret, pqxx::result* res, pqxx::row*
// if our connection is broken after all, disconnect, reconnect and retry
if(tries==0){
CloseConnection();
delete conn; conn=nullptr;
continue;
} else {
std::cerr<<"Postgres::Query error - broken connection, failed to re-establish it"<<std::endl;
Expand Down
8 changes: 4 additions & 4 deletions Query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

// constructor from elements, given a direct SQL query string
Query::Query(zmq::message_t& client_id_in, zmq::message_t& msg_id_in, zmq::message_t& database_in, zmq::message_t& query_in, uint32_t query_ok_in, std::string response_in){
client_id.move(&client_id_in);
message_id.move(&msg_id_in);
client_id.move(client_id_in);
message_id.move(msg_id_in);

// NOPE reinterpret_cast and copy construction doesn't work
//database = reinterpret_cast<const char*>(database_in.data());
Expand All @@ -28,8 +28,8 @@ Query::Query(zmq::message_t& client_id_in, zmq::message_t& msg_id_in, zmq::messa

// copy constructor
Query::Query(const Query& in){
client_id.copy(&in.client_id);
message_id.copy(&in.message_id);
client_id.copy(const_cast<zmq::message_t&>(in.client_id));
message_id.copy(const_cast<zmq::message_t&>(in.message_id));
database = in.database;
query = in.query;
query_ok = in.query_ok;
Expand Down
106 changes: 40 additions & 66 deletions ReceiveSQL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool ReceiveSQL::Execute(){

// poll the input sockets for messages
Log("Polling input sockets",4);
get_ok = zmq::poll(in_polls.data(), in_polls.size(), inpoll_timeout);
get_ok = zmq::poll(in_polls.data(), in_polls.size(), std::chrono::milliseconds(inpoll_timeout));
if(get_ok<0){
Log("Warning! ReceiveSQL error polling input sockets; have they closed?",0);
}
Expand Down Expand Up @@ -87,7 +87,7 @@ bool ReceiveSQL::Execute(){

// poll the output sockets for listeners
Log("Polling output sockets",4);
get_ok = zmq::poll(out_polls.data(), out_polls.size(), outpoll_timeout);
get_ok = zmq::poll(out_polls.data(), out_polls.size(), std::chrono::milliseconds(outpoll_timeout));
if(get_ok<0){
Log("Warning! ReceiveSQL error polling output sockets; have they closed?",0);
}
Expand Down Expand Up @@ -320,43 +320,43 @@ bool ReceiveSQL::InitZMQ(Store& m_variables){
// -------------------------------------------------------
clt_sub_socket = new zmq::socket_t(*context, ZMQ_SUB);
// this socket never sends, so a send timeout is irrelevant.
clt_sub_socket->setsockopt(ZMQ_RCVTIMEO, clt_sub_socket_timeout);
clt_sub_socket->set(zmq::sockopt::rcvtimeo, clt_sub_socket_timeout);
// don't linger too long, it looks like the program crashed.
clt_sub_socket->setsockopt(ZMQ_LINGER, 10);
clt_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
clt_sub_socket->set(zmq::sockopt::linger, 10);
clt_sub_socket->set(zmq::sockopt::subscribe, "");
// we will connect this socket to clients with the utilities class
}

// socket to receive dealt read queries and send responses to clients
// ------------------------------------------------------------------
clt_rtr_socket = new zmq::socket_t(*context, ZMQ_ROUTER);
clt_rtr_socket->setsockopt(ZMQ_SNDTIMEO, clt_rtr_socket_timeout);
clt_rtr_socket->setsockopt(ZMQ_RCVTIMEO, clt_rtr_socket_timeout);
clt_rtr_socket->set(zmq::sockopt::sndtimeo, clt_rtr_socket_timeout);
clt_rtr_socket->set(zmq::sockopt::rcvtimeo, clt_rtr_socket_timeout);
// don't linger too long, it looks like the program crashed.
clt_rtr_socket->setsockopt(ZMQ_LINGER, 10);
clt_rtr_socket->set(zmq::sockopt::linger, 10);
// FIXME remove- for debug only:
// make reply socket error out if the destination is unreachable
// (normally it silently drops the message)
clt_rtr_socket->setsockopt(ZMQ_ROUTER_MANDATORY, 1);
clt_rtr_socket->set(zmq::sockopt::router_mandatory, 1);
// we'll connect this socket to clients with the utilities class

// socket to listen for presence of the other middleman
// ----------------------------------------------------
mm_rcv_socket = new zmq::socket_t(*context, ZMQ_SUB);
mm_rcv_socket->setsockopt(ZMQ_RCVTIMEO, mm_rcv_socket_timeout);
mm_rcv_socket->set(zmq::sockopt::rcvtimeo, mm_rcv_socket_timeout);
// this socket never sends, so a send timeout is irrelevant.
// don't linger too long, it looks like the program crashed.
mm_rcv_socket->setsockopt(ZMQ_LINGER, 10);
mm_rcv_socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
mm_rcv_socket->set(zmq::sockopt::linger, 10);
mm_rcv_socket->set(zmq::sockopt::subscribe, "");
// we'll connect this socket to clients with the utilities class

// socket to broadcast our presence to the other middleman
// -------------------------------------------------------
mm_snd_socket = new zmq::socket_t(*context, ZMQ_PUB);
mm_snd_socket->setsockopt(ZMQ_SNDTIMEO, mm_snd_socket_timeout);
mm_snd_socket->set(zmq::sockopt::sndtimeo, mm_snd_socket_timeout);
// this socket never receives, so a recieve timeout is irrelevant.
// don't linger too long, it looks like the program crashed.
mm_snd_socket->setsockopt(ZMQ_LINGER, 10);
mm_snd_socket->set(zmq::sockopt::linger, 10);
// this one we're a publisher, so we do bind.
mm_snd_socket->bind(std::string("tcp://*:")+std::to_string(mm_snd_port));

Expand All @@ -365,21 +365,21 @@ bool ReceiveSQL::InitZMQ(Store& m_variables){
// socket to receive logging queries for the monitoring db
// -------------------------------------------------------
log_sub_socket = new zmq::socket_t(*context, ZMQ_SUB);
log_sub_socket->setsockopt(ZMQ_RCVTIMEO, log_sub_socket_timeout);
log_sub_socket->set(zmq::sockopt::rcvtimeo, log_sub_socket_timeout);
// this socket never sends, so a send timeout is irrelevant.
// don't linger too long, it looks like the program crashed.
log_sub_socket->setsockopt(ZMQ_LINGER, 10);
log_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
log_sub_socket->set(zmq::sockopt::linger, 10);
log_sub_socket->set(zmq::sockopt::subscribe, "");
// we'll connect this socket to clients with the utilities class
}

// socket to send log queries to the master, if not us
// ----------------------------------------------------
log_pub_socket = new zmq::socket_t(*context, ZMQ_PUB);
log_pub_socket->setsockopt(ZMQ_SNDTIMEO, log_pub_socket_timeout);
log_pub_socket->set(zmq::sockopt::sndtimeo, log_pub_socket_timeout);
// this socket never receives, so a recieve timeout is irrelevant.
// don't linger too long, it looks like the program crashed.
log_pub_socket->setsockopt(ZMQ_LINGER, 10);
log_pub_socket->set(zmq::sockopt::linger, 10);
// again we'll bind to this as it's a publisher
log_pub_socket->bind(std::string("tcp://*:")+std::to_string(log_pub_port));

Expand Down Expand Up @@ -1909,22 +1909,22 @@ bool ReceiveSQL::UpdateRole(){
// socket to receive logging queries for the monitoring db
// -------------------------------------------------------
log_sub_socket = new zmq::socket_t(*context, ZMQ_SUB);
log_sub_socket->setsockopt(ZMQ_RCVTIMEO, log_sub_socket_timeout);
log_sub_socket->set(zmq::sockopt::rcvtimeo, log_sub_socket_timeout);
// this socket never receives, so a recieve timeout is irrelevant.
// don't linger too long, it looks like the program crashed.
log_sub_socket->setsockopt(ZMQ_LINGER, 10);
log_sub_socket->set(zmq::sockopt::linger, 10);
// connection to clients will be made via the utitlies class
log_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
log_sub_socket->set(zmq::sockopt::subscribe, "");

// socket to receive published write queries from clients
// -------------------------------------------------------
clt_sub_socket = new zmq::socket_t(*context, ZMQ_SUB);
// this socket never sends, so a send timeout is irrelevant.
clt_sub_socket->setsockopt(ZMQ_RCVTIMEO, clt_sub_socket_timeout);
clt_sub_socket->set(zmq::sockopt::rcvtimeo, clt_sub_socket_timeout);
// don't linger too long, it looks like the program crashed.
clt_sub_socket->setsockopt(ZMQ_LINGER, 10);
clt_sub_socket->set(zmq::sockopt::linger, 10);
// connections to clients will be made via the utilities class
clt_sub_socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
clt_sub_socket->set(zmq::sockopt::subscribe, "");

// add the additional polls for these sockets
zmq::pollitem_t clt_sub_socket_pollin = zmq::pollitem_t{*clt_sub_socket,0,ZMQ_POLLIN,0};
Expand Down Expand Up @@ -2101,10 +2101,8 @@ bool ReceiveSQL::GetLastUpdateTime(std::string& our_timestamp){

// ««-------------- ≪ °◇◆◇° ≫ --------------»»
bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, zmq::message_t& message){
bool send_ok;
if(more) send_ok = sock->send(message, ZMQ_SNDMORE);
else send_ok = sock->send(message);
return send_ok;
auto flags = more ? zmq::send_flags::sndmore : zmq::send_flags::none;
return sock->send(message, flags).has_value();
}

// ««-------------- ≪ °◇◆◇° ≫ --------------»»
Expand All @@ -2116,52 +2114,28 @@ bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, std::string messagedata){
//snprintf((char*)message.data(), messagedata.size()+1, "%s", messagedata.c_str());

// send it with given SNDMORE flag
bool send_ok;
if(more) send_ok = sock->send(message, ZMQ_SNDMORE);
else send_ok = sock->send(message);

return send_ok;
return Send(sock, more, message);
}

// ««-------------- ≪ °◇◆◇° ≫ --------------»»

bool ReceiveSQL::Send(zmq::socket_t* sock, bool more, std::vector<std::string> messages){

// loop over all but the last part in the input vector,
// and send with the SNDMORE flag
for(int i=0; i<(messages.size()-1); ++i){

// form zmq::message_t
zmq::message_t message(messages.at(i).size());
memcpy(message.data(), messages.at(i).data(), messages.at(i).size());
//snprintf((char*)message.data(), messages.at(i).size()+1, "%s", messages.at(i).c_str());

// send this part
bool send_ok = sock->send(message, ZMQ_SNDMORE);

// break on error
if(not send_ok) return false;
}

// form the zmq::message_t for the last part
zmq::message_t message(messages.back().size());
memcpy(message.data(), messages.back().data(), messages.back().size());
//snprintf((char*)message.data(), messages.back().size()+1, "%s", messages.back().c_str());

// send it with, or without SNDMORE flag as requested
bool send_ok;
if(more) send_ok = sock->send(message, ZMQ_SNDMORE);
else send_ok = sock->send(message);

return send_ok;
// loop over all parts in the input vector
// send all but the last part with SNDMORE flag
// send the last part with SNDMORE flag if requested
auto rest = messages.size();
for (auto& message : messages)
if (not Send(sock, more or --rest, message))
return false;
return true;
}

// ««-------------- ≪ °◇◆◇° ≫ --------------»»

int ReceiveSQL::PollAndReceive(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, std::vector<zmq::message_t>& outputs){

// poll the input socket for messages
get_ok = zmq::poll(&poll, 1, timeout);
get_ok = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout));
if(get_ok<0){
// error polling - is the socket closed?
return -3;
Expand Down Expand Up @@ -2195,14 +2169,14 @@ bool ReceiveSQL::Receive(zmq::socket_t* sock, std::vector<zmq::message_t>& outpu
bool err=false;
while(true){
//std::cout<<part<<"...";
int ok = sock->recv(&tmp);
if(ok<0){
auto ok = sock->recv(tmp);
if(not ok){
err=true;
break;
}
// transfer the received message to the output vector
outputs.resize(outputs.size()+1);
outputs.back().move(&tmp);
outputs.back().move(tmp);

// receive next part if there is more to come
if(!outputs.back().more()) break;
Expand Down
9 changes: 3 additions & 6 deletions ReceiveSQL.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,7 @@ class ReceiveSQL{
bool Send(zmq::socket_t* sock, bool more, T&& messagedata){
zmq::message_t message(sizeof(T));
memcpy(message.data(), &messagedata, sizeof(T));
bool send_ok;
if(more) send_ok = sock->send(message, ZMQ_SNDMORE);
else send_ok = sock->send(message);
return send_ok;
return Send(sock, more, message);
}

// recursive case; send the next message part and forward all remaining parts
Expand All @@ -300,7 +297,7 @@ class ReceiveSQL{
template <typename T>
int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message){
// check for listener
int ret = zmq::poll(&poll, 1, timeout);
int ret = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout));
if(ret<0){
// error polling - is the socket closed?
return -3;
Expand All @@ -320,7 +317,7 @@ class ReceiveSQL{
template <typename T, typename... Rest>
int PollAndSend(zmq::socket_t* sock, zmq::pollitem_t poll, int timeout, T&& message, Rest&&... rest){
// check for listener
int ret = zmq::poll(&poll, 1, timeout);
int ret = zmq::poll(&poll, 1, std::chrono::milliseconds(timeout));
if(ret<0){
// error polling - is the socket closed?
return -3;
Expand Down
Loading