diff --git a/src/final/hotflow.ini b/src/final/hotflow.ini index 095c3c5..b604470 100644 --- a/src/final/hotflow.ini +++ b/src/final/hotflow.ini @@ -1,6 +1,6 @@ [Server] Port=1234 -Protocol=TCP +Protocol=UDP [Control] # remember to change gitvc and ignition to false for coldflow diff --git a/src/final/main_network_worker.cpp b/src/final/main_network_worker.cpp index 9987f6d..2b0a191 100644 --- a/src/final/main_network_worker.cpp +++ b/src/final/main_network_worker.cpp @@ -8,57 +8,36 @@ #include #include "main_network_worker.hpp" #include "../util/logger.hpp" +#include +#include +#include +#include + bool main_network_worker::process_nqi(network_queue_item &nqi) { - char c; - work_queue_item wqi; - ssize_t read_result; // char* combined_buff = new char[1 << 12]; Logger logger("logs/nw.log", "network thread", LOG_INFO); switch (nqi.action) { case (nq_recv): { - //Poll before we read: - read_result = do_recv(connfd_tcp, &c, 1); - if (read_result <= 0) { - //FIXME, do something better? - return true; - } - //TODO just make this a process and let the logic in the worker handle it. - /* - * If we get a '0' then start processing stuff. - * If we get a '1' then stop processing stuff. - * Otherwise ignore the message. - */ - logger.info("Received command " + std::to_string((uint8_t) c)); - wqi.action = wq_process; - wqi.data[0] = c; - qw.enqueue(wqi); + std::cerr << "Processing a recv\n"; return true; } case (nq_send_ack): { // Don't actually send an ack. Just don't timeout for now. - //network_worker::send_header(ack, 0); break; } case (nq_send): { - //Poll before we read: - if (poll(&pf, 1, 0) == 0) { - if (!pf.revents & POLLOUT) { - //Cannot write. Will block. - logger.error("Socket blocked on write."); - return true; - } - } circular_buffer *buff = nqi.buff; + send_header_t header; - send_code h = (send_code) nqi.data[0]; + header.code = (send_code) nqi.data[0]; + header.nbytes = nqi.nbytes; // TODO this header should correspond to something from the nqi data. - network_worker::send_header(h, nqi.nbytes); - + // TODO error check // std::cerr << "Preparing header" << std::endl; // // send_header_t sh = (send_header_t) {h, nqi.nbytes}; @@ -75,15 +54,15 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { // std::cerr << "Copy bytes from buffer failed!" << std::endl; // } - logger.debug("Writing data: Nbytes: " + std::to_string(nqi.nbytes) + " Type: " + std::to_string(h)); - int connfd = (connfd_udp != -1) ? connfd_udp : connfd_tcp; //Use UDP if the socket is configured - // ssize_t bytes_written = 0; // if ((bytes_written = write(connfd, combined_buff, nqi.nbytes)) != nqi.nbytes) { // std:: cerr << "Incorrect number of bytes written: " << "Expected " << nqi.nbytes << ", Actual " << bytes_written << std::endl; // } - if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { +// sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in)); +// std::cout << "Sent on UDP" << std::endl; + + if (buff->write_data_datagram(connfd_udp, nqi.nbytes, nqi.total_bytes, &header, sizeof(send_header_t), &sa_udp) != 0) { std::cerr << "Connection Closed" << std::endl; //exit(0); } @@ -92,6 +71,7 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { } default: { + // TODO should this ever happen? return network_worker::process_nqi(nqi); } } diff --git a/src/server/network_worker.cpp b/src/server/network_worker.cpp index acfd314..f6962d3 100644 --- a/src/server/network_worker.cpp +++ b/src/server/network_worker.cpp @@ -11,37 +11,51 @@ #include void network_worker::worker_method() { - /* - std::cout << "Size of sht" << sizeof(send_header_t) - << " offset of nbytes" - << offsetof(send_header_t, nbytes) - << "sizeof send_code" << sizeof(send_code) - << std::endl; - */ - ssize_t read_result; char c; network_queue_item network_queue_item = {}; work_queue_item work_queue_item = {}; - pf.events = POLLIN | POLLOUT; + fd_set rfds, wfds; - while (1) { - //std::cout << "Networker entering loop:\n"; - network_queue_item = qn.poll(); - //std::cout << "Networker got item:\n"; + // Initialize read select + FD_ZERO(&rfds); + FD_SET(connfd_tcp, &rfds); - if (!process_nqi(network_queue_item)) { - std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl; - } - } -} + // Initialize write select + FD_ZERO(&wfds); + FD_SET(connfd_udp, &wfds); -bool network_worker::process_nqi(network_queue_item &network_queue_item) { if (!connected) { std::cout << "Attempting to connect" << std::endl; open_connection(); } + + while (true) { + FD_SET(connfd_tcp, &rfds); + FD_SET(connfd_udp, &wfds); + + select(connfd_tcp + 1, &rfds, &wfds, nullptr, nullptr); + + if (FD_ISSET(connfd_tcp, &rfds)) { + do_recv(connfd_tcp, &c, 1); + + work_queue_item.action = wq_process; + work_queue_item.data[0] = c; + qw.enqueue(work_queue_item); + } + + if (FD_ISSET(connfd_udp, &wfds)) { + network_queue_item = qn.poll(); + if (!process_nqi(network_queue_item)) { + std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl; + } + } + } +} + +bool network_worker::process_nqi(network_queue_item &network_queue_item) { switch (network_queue_item.action) { + // TODO are we still using this? case (nq_none): { timestamp_t t = get_time(); // First check if we are close to timing out: @@ -49,7 +63,7 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { std::cerr << "Connection timed out." << std::endl; fail_connection(); return true; - } else if (timeout > 0 && !has_acked && t - last_recv > timeout / 2) { + } else if (timeout > 0 && t - last_recv > timeout / 2) { //TODO maybe add this as debug option. std::cerr << "Connection inactive. Sending ack." << std::endl; network_queue_item.action = nq_send_ack; @@ -57,10 +71,6 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { return true; } - //is this messing with an object we don't own? Doesn't seem to be. - network_queue_item.action = nq_recv; - qn.enqueue(network_queue_item); //Just always be reading because otherwise we're screwed. - // FIXME need to do some checking to make sure this happens frequently. return true; } default: { @@ -72,35 +82,16 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { ssize_t network_worker::do_recv(int fd, char *b, size_t nbytes) { ssize_t read_result; - //Poll before we read: - if (poll(&pf, 1, 0) >= 0) { - if (!(pf.revents & POLLIN)) { - // Nothing to read. - //std::cerr << "Socket blocked on read" << std::endl; - //Go back to looping. - return -2; - } - } else { - std::cerr << "Poll failed" << std::endl; - exit(1); - } read_result = read(fd, b, nbytes); if (read_result > 0) { - /* - * Update our timing on when we last received. - */ - timestamp_t trecv = get_time(); - this->last_recv = trecv; - this->has_acked = false; // No longer need to ack, so ignore these. + /* Update our timing on when we last received. */ + this->last_recv = get_time(); } else if (read_result == 0) { - /* - * Check if the file descriptor has closed: - */ + /* If we read 0 then the fd is closed */ std::cerr << "Read nothing from socket. Assuming it is dead." << std::endl; fail_connection(); } - //std::cout << "Read " << read_result << " bytes from socket." << std::endl; return read_result; } @@ -111,63 +102,42 @@ void network_worker::fail_connection() { } connfd_tcp = -1; connfd_udp = -1; - pf.fd = -1; connected = false; + + std::cout << "Attempting to reconnect" << std::endl; + open_connection(); } void network_worker::open_connection() { - struct sockaddr_in sa; - connfd_tcp = wait_for_connection(port, (struct sockaddr *) &sa); + connfd_tcp = wait_for_connection(port, &sa_tcp); if (connfd_tcp < 0) std::cerr << "Could not open TCP connection fd." << std::endl; - if (config_map["Server.Protocol"].as() == "UDP") { - connfd_udp = socket(AF_INET, SOCK_DGRAM, 0); - sa.sin_port = htons(port); //Destination UDP port is same as listening TCP port - if (connect(connfd_udp,(struct sockaddr *) &sa,sizeof(sa))) { - connfd_udp = -1; - std::cerr << "Could not open UDP connection fd." << std::endl; - } - std::cout << "Successfully opened UDP socket on " << connfd_udp << "." << std::endl; - } - - /* - * Stuff used to poll the socket: - */ - pf.fd = connfd_tcp; +// if (config_map["Server.Protocol"].as() == "UDP") { + connfd_udp = create_send_fd(port, (sockaddr_in *) &sa_udp); +// if (connect(connfd_udp,(struct sockaddr *) &sa_udp, sizeof(sa_udp)) == -1) { +// connfd_udp = -1; +// std::cerr << "Could not open UDP connection fd." << std::endl; +// } +// } - /* - * Update the last received and mark that we are connected: - */ + /* Update the last received and mark that we are connected */ connected = true; - has_acked = false; last_recv = get_time(); } -ssize_t network_worker::send_header(send_code h, size_t nbytes) { +bool network_worker::send_header(send_code h, size_t nbytes) { send_header_t sh; sh.code = h; sh.nbytes = nbytes; - // If the UDP Socket is setup, use it. - if (connfd_udp != -1) { - ssize_t result = write(connfd_udp, &sh, sizeof(sh)); - } else { - ssize_t result = write(connfd_tcp, &sh, sizeof(sh)); - } -} - -ssize_t rwrite(int fd, void *b, size_t n) { - if (n == 0) { - return 0; - } - ssize_t result = write(fd, b, n); + ssize_t result = write(connfd_udp, &sh, sizeof(sh)); - if (result <= 0) { - std::cerr << "Error while writing?" << std::endl; - perror("help me please"); + if (result != nbytes) { + std::cerr << "ERROR: Wrote " << result << " out of " << nbytes << " bytes\n"; + return false; } - return result + rwrite(fd, ((char *)b ) + result, n - result); -} + return true; +} \ No newline at end of file diff --git a/src/server/network_worker.hpp b/src/server/network_worker.hpp index ddd1d64..e3dfc5e 100644 --- a/src/server/network_worker.hpp +++ b/src/server/network_worker.hpp @@ -16,6 +16,10 @@ #include #include "../util/circular_buffer.hpp" #include "../util/timestamps.hpp" +#include +#include +#include +#include class network_worker : public worker { public: @@ -23,6 +27,7 @@ class network_worker : public worker { int connfd_tcp; int connfd_udp; bool connected; + struct sockaddr sa_udp, sa_tcp; /** * The number of microseconds where no receive has occured before the worker should treat the peer @@ -44,11 +49,14 @@ class network_worker : public worker { (safe_queue &my_qn, safe_queue &my_qw, int port, timestamp_t timeout) : worker(my_qn, my_qw) + , connected(0) , port(port) , last_recv(0) , timeout(timeout) , connfd_tcp(-1) , connfd_udp(-1) + , sa_udp({}) + , sa_tcp({}) { last_recv = get_time() - timeout; // Set the last time such that we are immediately timed out. }; @@ -76,11 +84,7 @@ class network_worker : public worker { */ void open_connection(); - send_header_t* prepare_header(send_code h, size_t nbytes); - - ssize_t send_header(send_code h, size_t nbytes); - - ssize_t rwrite(int fd, void *b, size_t n); + bool send_header(send_code h, size_t nbytes); protected: /** @@ -91,9 +95,6 @@ class network_worker : public worker { * @return Whatever read returned. -2 if poll succeeded but there was nothing to receive. -3 if poll failed. */ ssize_t do_recv(int fd, char *b, size_t nbytes); - pollfd pf; - - bool has_acked; }; diff --git a/src/util/circular_buffer.cpp b/src/util/circular_buffer.cpp index dd84a53..7e2fdfb 100644 --- a/src/util/circular_buffer.cpp +++ b/src/util/circular_buffer.cpp @@ -5,14 +5,21 @@ #include "unistd.h" #include #include +#include #include "circular_buffer.hpp" +#include +#include +#include +#include //#define DEBUG_CIRC_SEND -circular_buffer::circular_buffer(size_t size) : bytes_written(0) { +circular_buffer::circular_buffer(size_t size) + : bytes_written(0) + { this->nbytes = size; this->data = new char[size]; -} + } circular_buffer::~circular_buffer() { delete [] this->data; @@ -95,6 +102,8 @@ ssize_t circular_buffer::write_data(int fd, size_t n, size_t offset) { #endif ssize_t result; result = write(fd, data + offset % this->nbytes, to_send); + // result = sendto(fd, data + offset % this->nbytes, to_send, 0, destination, sizeof(struct sockaddr_in)); + if (result <= 0) { perror("help?"); std::cerr << "Error while writing." << std::endl; @@ -106,6 +115,67 @@ ssize_t circular_buffer::write_data(int fd, size_t n, size_t offset) { } } +ssize_t circular_buffer::write_data_datagram(int fd, size_t n, size_t offset, void *header, size_t header_size, sockaddr *destination) { + size_t bw = bytes_written.load(); + char *send_buff = new char[header_size + n]; + char *pos = send_buff; + + /* + offset is smallest number we're trying to write. + bw - nbytes is the smallest number that hasn't been overwritten. + We want to ensure that we aren't trying to write data that has been overwritten. + */ + if ((long) offset < (long) (bw - this->nbytes)) { + std::cerr << "Bytes already overwritten before sending: Num_to_write:" << n + << " Offset:" << offset + << " Total bytes written into buffer" << bw + << " Total buffer size: " << nbytes << std::endl; + //return -1; //TODO don't collide with write error? + } + + // Copy in the header at the beginning + memcpy(pos, header, header_size); + pos += header_size; + + size_t to_send = nbytes - offset % this->nbytes; // The distance between start and the end of the buffer. + to_send = to_send > n ? n : to_send; //Take the minimum value of n and to_send. + + // Copy in the data + memcpy(pos, data + offset % this->nbytes, to_send); + pos += offset % this->nbytes; + + if (n > to_send) { + memcpy(pos, data + (offset + to_send) % this->nbytes, n - to_send); + } + +#ifdef DEBUG_CIRC_SEND + if (to_send > 4 * 16) { + size_t temp_offset = offset % this->nbytes; + + fprintf(stdout, "Offset %#u \n", (uint16_t) temp_offset); + fprintf(stdout, "Sending Bytes: %X %lld %X %lld %X %lld %X %lld \n", + *((uint16_t *)(data + temp_offset)), *((uint64_t *)(data + temp_offset + 8)), + *((uint16_t *)(data + temp_offset + 16)), *((uint64_t *)(data + temp_offset + 24)), + *((uint16_t *)(data + temp_offset + 32)), *((uint64_t *)(data + temp_offset + 40)), + *((uint16_t *)(data + temp_offset + 48)), *((uint64_t *)(data + temp_offset + 56))); + } +#endif + ssize_t result; + // result = write(fd, data + offset % this->nbytes, to_send); + result = sendto(fd, send_buff, to_send, 0, destination, sizeof(struct sockaddr_in)); + + if (result <= 0) { + perror("help?"); + std::cerr << "Error while writing." << std::endl; + return result; + } else if (result != n) { + std::cerr << "Error while writing. Wrote " << result << " bytes, expecting " << n << " bytes total\n"; + return -1; + } + + return 0; +} + void circular_buffer::zero() { bzero(data, nbytes); bytes_written = 0; diff --git a/src/util/circular_buffer.hpp b/src/util/circular_buffer.hpp index 0af035e..88b8265 100644 --- a/src/util/circular_buffer.hpp +++ b/src/util/circular_buffer.hpp @@ -54,6 +54,9 @@ class circular_buffer { */ ssize_t write_data(int fd, size_t n, size_t offset); + // TODO void * should be send_header_t*. Not for now due to circular include. + ssize_t write_data_datagram(int fd, size_t n, size_t offset, void *header, size_t header_size, struct sockaddr *destination); + void zero(); }; diff --git a/src/util/listener.cpp b/src/util/listener.cpp index cc9d9a1..32db7ac 100644 --- a/src/util/listener.cpp +++ b/src/util/listener.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "listener.hpp" /* @@ -17,20 +18,15 @@ int open_listen(int port) { - int listenfd, optval=1; - struct sockaddr_in serveraddr; + int listenfd; + struct sockaddr_in serveraddr = {}; /* Set "listenfd" to a newly created stream socket */ - if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { + if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("TCP Socket Creation Error"); return (-1); } - //Not needed. - /* Eliminates "Address already in use" error from bind. */ - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, - (const void *)&optval , sizeof(int)) < 0) - return (-1); - /* * Set the IP address of serveraddr to be the special ANY IP address * and set port to be the input port. Be careful to ensure that the @@ -39,19 +35,19 @@ open_listen(int port) bzero((char *) &serveraddr, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; serveraddr.sin_port = htons((uint16_t) port); - //Raspberry Pi will only have one interface, just use ANY. serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); /* Use bind to set the address of "listenfd" to be serveraddr */ if (bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(struct sockaddr)) == -1) { + perror("TCP Socket Bind Error"); return (-1); } /* Use listen to make the socket ready to accept connection requests */ if (listen(listenfd, LISTEN_MAX) == -1) { + perror("TCP Socket Listen Error"); return (-1); - } #ifdef DEBUG_LISTENER @@ -61,30 +57,55 @@ open_listen(int port) } int wait_for_connection(int port, sockaddr *sa) { - socklen_t clientlen; + socklen_t clientlen = sizeof(struct sockaddr_in); int listenfd, connfd; - /* Acquire a port, and make sure we got it successfully. */ + + /* Create a TCP port, and check that it is valid. */ listenfd = open_listen(port); if (listenfd < 0) { - //TODO I think the best option is to try opening a new port if this fails. - //Alternately closing the listenfd is a useful fix. + // TODO have a procedure (on a new thread) to create a new port and listen again fprintf(stderr, "Failed to open listener on %d\n", port); exit(1); } - clientlen = sizeof(struct sockaddr_in); - /* - * Use accept to set the connection file descriptor of the - * request + * Accept a new connection and return the file descriptor of + * the client. */ - connfd = accept(listenfd, sa, - &clientlen); + if((connfd = accept(listenfd, sa, &clientlen)) == -1) { + perror("TCP Client Accept Error"); + return (-1); + }; #ifdef DEBUG_LISTENER fprintf(stderr, "Received request on connfd on %d\n", connfd); #endif /*DEBUG_LISTENER*/ + // Close this since we only care about what the client sends to us. close(listenfd); return connfd; } + +int +create_send_fd(int port, sockaddr_in *sa) { + int udp_server_fd; + + // Create a new UDP socket + if((udp_server_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) + { + perror("UDP Socket Creation Error"); + exit(-1); + } + + // These settings must match those on the mission control end + sa->sin_family = AF_INET; + sa->sin_port = htons((uint16_t) port); + sa->sin_addr.s_addr = htonl(INADDR_ANY); + bzero(sa->sin_zero, 8); + + #ifdef DEBUG_LISTENER + fprintf(stderr, "Sending on fd %d\n", udp_server_fd); + #endif /*DEBUG_LISTENER*/ + + return udp_server_fd; +} diff --git a/src/util/listener.hpp b/src/util/listener.hpp index 7002620..476552a 100644 --- a/src/util/listener.hpp +++ b/src/util/listener.hpp @@ -22,7 +22,7 @@ * * Effects: * Listens on the port specified and blocks until a connection is recieved - * and then returns the corresponding fildes for the connection. + * and then returns the corresponding file descriptors for the connection. * TODO not sure what is being done with "sa" */ int wait_for_connection(int port, sockaddr *sa); @@ -37,4 +37,7 @@ int wait_for_connection(int port, sockaddr *sa); */ int open_listen(int port); +// TODO +int create_send_fd(int port, sockaddr_in *sa); + #endif //SOFTWARE_LISTENER_HPP