Skip to content
This repository was archived by the owner on Feb 21, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/final/hotflow.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[Server]
Port=1234
Protocol=TCP
Protocol=UDP

[Control]
# remember to change gitvc and ignition to false for coldflow
Expand Down
50 changes: 15 additions & 35 deletions src/final/main_network_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,36 @@
#include <unistd.h>
#include "main_network_worker.hpp"
#include "../util/logger.hpp"
#include <arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/types.h>


bool main_network_worker::process_nqi(network_queue_item &nqi) {
char c;
work_queue_item wqi;
ssize_t read_result;
// char* combined_buff = new char[1 << 12];

Logger logger("logs/nw.log", "network thread", LOG_INFO);

switch (nqi.action) {
case (nq_recv): {
//Poll before we read:
read_result = do_recv(connfd_tcp, &c, 1);
if (read_result <= 0) {
//FIXME, do something better?
return true;
}
//TODO just make this a process and let the logic in the worker handle it.
/*
* If we get a '0' then start processing stuff.
* If we get a '1' then stop processing stuff.
* Otherwise ignore the message.
*/
logger.info("Received command " + std::to_string((uint8_t) c));
wqi.action = wq_process;
wqi.data[0] = c;
qw.enqueue(wqi);
std::cerr << "Processing a recv\n";
return true;
}
case (nq_send_ack): {
// Don't actually send an ack. Just don't timeout for now.
//network_worker::send_header(ack, 0);
break;
}
case (nq_send): {
//Poll before we read:
if (poll(&pf, 1, 0) == 0) {
if (!pf.revents & POLLOUT) {
//Cannot write. Will block.
logger.error("Socket blocked on write.");
return true;
}
}
circular_buffer *buff = nqi.buff;
send_header_t header;

send_code h = (send_code) nqi.data[0];
header.code = (send_code) nqi.data[0];
header.nbytes = nqi.nbytes;

// TODO this header should correspond to something from the nqi data.

network_worker::send_header(h, nqi.nbytes);

// TODO error check
// std::cerr << "Preparing header" << std::endl;
//
// send_header_t sh = (send_header_t) {h, nqi.nbytes};
Expand All @@ -75,15 +54,15 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) {
// std::cerr << "Copy bytes from buffer failed!" << std::endl;
// }

logger.debug("Writing data: Nbytes: " + std::to_string(nqi.nbytes) + " Type: " + std::to_string(h));
int connfd = (connfd_udp != -1) ? connfd_udp : connfd_tcp; //Use UDP if the socket is configured

// ssize_t bytes_written = 0;
// if ((bytes_written = write(connfd, combined_buff, nqi.nbytes)) != nqi.nbytes) {
// std:: cerr << "Incorrect number of bytes written: " << "Expected " << nqi.nbytes << ", Actual " << bytes_written << std::endl;
// }

if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) {
// sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in));
// std::cout << "Sent on UDP" << std::endl;

if (buff->write_data_datagram(connfd_udp, nqi.nbytes, nqi.total_bytes, &header, sizeof(send_header_t), &sa_udp) != 0) {
std::cerr << "Connection Closed" << std::endl;
//exit(0);
}
Expand All @@ -92,6 +71,7 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) {
}

default: {
// TODO should this ever happen?
return network_worker::process_nqi(nqi);
}
}
Expand Down
142 changes: 56 additions & 86 deletions src/server/network_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,66 @@
#include <arpa/inet.h>

void network_worker::worker_method() {
/*
std::cout << "Size of sht" << sizeof(send_header_t)
<< " offset of nbytes"
<< offsetof(send_header_t, nbytes)
<< "sizeof send_code" << sizeof(send_code)
<< std::endl;
*/
ssize_t read_result;
char c;
network_queue_item network_queue_item = {};
work_queue_item work_queue_item = {};

pf.events = POLLIN | POLLOUT;
fd_set rfds, wfds;

while (1) {
//std::cout << "Networker entering loop:\n";
network_queue_item = qn.poll();
//std::cout << "Networker got item:\n";
// Initialize read select
FD_ZERO(&rfds);
FD_SET(connfd_tcp, &rfds);

if (!process_nqi(network_queue_item)) {
std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl;
}
}
}
// Initialize write select
FD_ZERO(&wfds);
FD_SET(connfd_udp, &wfds);

bool network_worker::process_nqi(network_queue_item &network_queue_item) {
if (!connected) {
std::cout << "Attempting to connect" << std::endl;
open_connection();
}

while (true) {
FD_SET(connfd_tcp, &rfds);
FD_SET(connfd_udp, &wfds);

select(connfd_tcp + 1, &rfds, &wfds, nullptr, nullptr);

if (FD_ISSET(connfd_tcp, &rfds)) {
do_recv(connfd_tcp, &c, 1);

work_queue_item.action = wq_process;
work_queue_item.data[0] = c;
qw.enqueue(work_queue_item);
}

if (FD_ISSET(connfd_udp, &wfds)) {
network_queue_item = qn.poll();
if (!process_nqi(network_queue_item)) {
std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl;
}
}
}
}

bool network_worker::process_nqi(network_queue_item &network_queue_item) {
switch (network_queue_item.action) {
// TODO are we still using this?
case (nq_none): {
timestamp_t t = get_time();
// First check if we are close to timing out:
if (timeout > 0 && t - last_recv > timeout) {
std::cerr << "Connection timed out." << std::endl;
fail_connection();
return true;
} else if (timeout > 0 && !has_acked && t - last_recv > timeout / 2) {
} else if (timeout > 0 && t - last_recv > timeout / 2) {
//TODO maybe add this as debug option.
std::cerr << "Connection inactive. Sending ack." << std::endl;
network_queue_item.action = nq_send_ack;
qn.enqueue(network_queue_item);
return true;
}

//is this messing with an object we don't own? Doesn't seem to be.
network_queue_item.action = nq_recv;
qn.enqueue(network_queue_item); //Just always be reading because otherwise we're screwed.
// FIXME need to do some checking to make sure this happens frequently.
return true;
}
default: {
Expand All @@ -72,35 +82,16 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) {
ssize_t network_worker::do_recv(int fd, char *b, size_t nbytes) {
ssize_t read_result;

//Poll before we read:
if (poll(&pf, 1, 0) >= 0) {
if (!(pf.revents & POLLIN)) {
// Nothing to read.
//std::cerr << "Socket blocked on read" << std::endl;
//Go back to looping.
return -2;
}
} else {
std::cerr << "Poll failed" << std::endl;
exit(1);
}
read_result = read(fd, b, nbytes);
if (read_result > 0) {
/*
* Update our timing on when we last received.
*/
timestamp_t trecv = get_time();
this->last_recv = trecv;
this->has_acked = false; // No longer need to ack, so ignore these.
/* Update our timing on when we last received. */
this->last_recv = get_time();
} else if (read_result == 0) {
/*
* Check if the file descriptor has closed:
*/
/* If we read 0 then the fd is closed */
std::cerr << "Read nothing from socket. Assuming it is dead." << std::endl;
fail_connection();
}

//std::cout << "Read " << read_result << " bytes from socket." << std::endl;
return read_result;
}

Expand All @@ -111,63 +102,42 @@ void network_worker::fail_connection() {
}
connfd_tcp = -1;
connfd_udp = -1;
pf.fd = -1;

connected = false;

std::cout << "Attempting to reconnect" << std::endl;
open_connection();
}

void network_worker::open_connection() {
struct sockaddr_in sa;
connfd_tcp = wait_for_connection(port, (struct sockaddr *) &sa);
connfd_tcp = wait_for_connection(port, &sa_tcp);
if (connfd_tcp < 0)
std::cerr << "Could not open TCP connection fd." << std::endl;

if (config_map["Server.Protocol"].as<std::string>() == "UDP") {
connfd_udp = socket(AF_INET, SOCK_DGRAM, 0);
sa.sin_port = htons(port); //Destination UDP port is same as listening TCP port
if (connect(connfd_udp,(struct sockaddr *) &sa,sizeof(sa))) {
connfd_udp = -1;
std::cerr << "Could not open UDP connection fd." << std::endl;
}
std::cout << "Successfully opened UDP socket on " << connfd_udp << "." << std::endl;
}

/*
* Stuff used to poll the socket:
*/
pf.fd = connfd_tcp;
// if (config_map["Server.Protocol"].as<std::string>() == "UDP") {
connfd_udp = create_send_fd(port, (sockaddr_in *) &sa_udp);
// if (connect(connfd_udp,(struct sockaddr *) &sa_udp, sizeof(sa_udp)) == -1) {
// connfd_udp = -1;
// std::cerr << "Could not open UDP connection fd." << std::endl;
// }
// }

/*
* Update the last received and mark that we are connected:
*/
/* Update the last received and mark that we are connected */
connected = true;
has_acked = false;
last_recv = get_time();
}

ssize_t network_worker::send_header(send_code h, size_t nbytes) {
bool network_worker::send_header(send_code h, size_t nbytes) {
send_header_t sh;
sh.code = h;
sh.nbytes = nbytes;

// If the UDP Socket is setup, use it.
if (connfd_udp != -1) {
ssize_t result = write(connfd_udp, &sh, sizeof(sh));
} else {
ssize_t result = write(connfd_tcp, &sh, sizeof(sh));
}
}

ssize_t rwrite(int fd, void *b, size_t n) {
if (n == 0) {
return 0;
}
ssize_t result = write(fd, b, n);
ssize_t result = write(connfd_udp, &sh, sizeof(sh));

if (result <= 0) {
std::cerr << "Error while writing?" << std::endl;
perror("help me please");
if (result != nbytes) {
std::cerr << "ERROR: Wrote " << result << " out of " << nbytes << " bytes\n";
return false;
}

return result + rwrite(fd, ((char *)b ) + result, n - result);
}
return true;
}
17 changes: 9 additions & 8 deletions src/server/network_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
#include <poll.h>
#include "../util/circular_buffer.hpp"
#include "../util/timestamps.hpp"
#include <arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/types.h>

class network_worker : public worker {
public:
int port;
int connfd_tcp;
int connfd_udp;
bool connected;
struct sockaddr sa_udp, sa_tcp;

/**
* The number of microseconds where no receive has occured before the worker should treat the peer
Expand All @@ -44,11 +49,14 @@ class network_worker : public worker {
(safe_queue<network_queue_item> &my_qn, safe_queue<work_queue_item> &my_qw, int port,
timestamp_t timeout)
: worker(my_qn, my_qw)
, connected(0)
, port(port)
, last_recv(0)
, timeout(timeout)
, connfd_tcp(-1)
, connfd_udp(-1)
, sa_udp({})
, sa_tcp({})
{
last_recv = get_time() - timeout; // Set the last time such that we are immediately timed out.
};
Expand Down Expand Up @@ -76,11 +84,7 @@ class network_worker : public worker {
*/
void open_connection();

send_header_t* prepare_header(send_code h, size_t nbytes);

ssize_t send_header(send_code h, size_t nbytes);

ssize_t rwrite(int fd, void *b, size_t n);
bool send_header(send_code h, size_t nbytes);

protected:
/**
Expand All @@ -91,9 +95,6 @@ class network_worker : public worker {
* @return Whatever read returned. -2 if poll succeeded but there was nothing to receive. -3 if poll failed.
*/
ssize_t do_recv(int fd, char *b, size_t nbytes);
pollfd pf;

bool has_acked;
};


Expand Down
Loading