From 88990ea0d8bdcd98ade39fa506c7deab1c1e7a05 Mon Sep 17 00:00:00 2001 From: Tommy Yuan Date: Fri, 25 Jan 2019 22:10:57 -0600 Subject: [PATCH 1/4] Preliminary changes to add udp socket --- src/final/main_network_worker.cpp | 1 + src/server/network_worker.cpp | 49 ++++------------------ src/server/network_worker.hpp | 4 -- src/util/listener.cpp | 70 +++++++++++++++++++++---------- src/util/listener.hpp | 5 ++- 5 files changed, 62 insertions(+), 67 deletions(-) diff --git a/src/final/main_network_worker.cpp b/src/final/main_network_worker.cpp index 9987f6d..9b8d38a 100644 --- a/src/final/main_network_worker.cpp +++ b/src/final/main_network_worker.cpp @@ -92,6 +92,7 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { } default: { + // TODO should this ever happen? return network_worker::process_nqi(nqi); } } diff --git a/src/server/network_worker.cpp b/src/server/network_worker.cpp index acfd314..bba23d6 100644 --- a/src/server/network_worker.cpp +++ b/src/server/network_worker.cpp @@ -11,20 +11,11 @@ #include void network_worker::worker_method() { - /* - std::cout << "Size of sht" << sizeof(send_header_t) - << " offset of nbytes" - << offsetof(send_header_t, nbytes) - << "sizeof send_code" << sizeof(send_code) - << std::endl; - */ ssize_t read_result; char c; network_queue_item network_queue_item = {}; work_queue_item work_queue_item = {}; - pf.events = POLLIN | POLLOUT; - while (1) { //std::cout << "Networker entering loop:\n"; network_queue_item = qn.poll(); @@ -91,7 +82,6 @@ ssize_t network_worker::do_recv(int fd, char *b, size_t nbytes) { */ timestamp_t trecv = get_time(); this->last_recv = trecv; - this->has_acked = false; // No longer need to ack, so ignore these. } else if (read_result == 0) { /* * Check if the file descriptor has closed: @@ -111,37 +101,30 @@ void network_worker::fail_connection() { } connfd_tcp = -1; connfd_udp = -1; - pf.fd = -1; connected = false; } void network_worker::open_connection() { - struct sockaddr_in sa; - connfd_tcp = wait_for_connection(port, (struct sockaddr *) &sa); + struct sockaddr_in sa_udp = {}, sa_tcp = {}; + + connfd_tcp = wait_for_connection(port, (struct sockaddr *) &sa_tcp); if (connfd_tcp < 0) std::cerr << "Could not open TCP connection fd." << std::endl; if (config_map["Server.Protocol"].as() == "UDP") { - connfd_udp = socket(AF_INET, SOCK_DGRAM, 0); - sa.sin_port = htons(port); //Destination UDP port is same as listening TCP port - if (connect(connfd_udp,(struct sockaddr *) &sa,sizeof(sa))) { - connfd_udp = -1; - std::cerr << "Could not open UDP connection fd." << std::endl; - } + connfd_udp = create_send_fd(port, (struct sockaddr*) &sa_udp); +// if (connect(connfd_udp,(struct sockaddr *) &sa,sizeof(sa))) { +// connfd_udp = -1; +// std::cerr << "Could not open UDP connection fd." << std::endl; +// } std::cout << "Successfully opened UDP socket on " << connfd_udp << "." << std::endl; } - /* - * Stuff used to poll the socket: - */ - pf.fd = connfd_tcp; - /* * Update the last received and mark that we are connected: */ connected = true; - has_acked = false; last_recv = get_time(); } @@ -156,18 +139,4 @@ ssize_t network_worker::send_header(send_code h, size_t nbytes) { } else { ssize_t result = write(connfd_tcp, &sh, sizeof(sh)); } -} - -ssize_t rwrite(int fd, void *b, size_t n) { - if (n == 0) { - return 0; - } - ssize_t result = write(fd, b, n); - - if (result <= 0) { - std::cerr << "Error while writing?" << std::endl; - perror("help me please"); - } - - return result + rwrite(fd, ((char *)b ) + result, n - result); -} +} \ No newline at end of file diff --git a/src/server/network_worker.hpp b/src/server/network_worker.hpp index ddd1d64..be53835 100644 --- a/src/server/network_worker.hpp +++ b/src/server/network_worker.hpp @@ -76,12 +76,8 @@ class network_worker : public worker { */ void open_connection(); - send_header_t* prepare_header(send_code h, size_t nbytes); - ssize_t send_header(send_code h, size_t nbytes); - ssize_t rwrite(int fd, void *b, size_t n); - protected: /** * A simple wrapper over read that handles errors and updates the timeout information. diff --git a/src/util/listener.cpp b/src/util/listener.cpp index cc9d9a1..bff43f3 100644 --- a/src/util/listener.cpp +++ b/src/util/listener.cpp @@ -17,20 +17,15 @@ int open_listen(int port) { - int listenfd, optval=1; - struct sockaddr_in serveraddr; + int listenfd, optval = 1; + struct sockaddr_in serveraddr = {}; /* Set "listenfd" to a newly created stream socket */ - if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) { + if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("TCP Socket Creation Error"); return (-1); } - //Not needed. - /* Eliminates "Address already in use" error from bind. */ - if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, - (const void *)&optval , sizeof(int)) < 0) - return (-1); - /* * Set the IP address of serveraddr to be the special ANY IP address * and set port to be the input port. Be careful to ensure that the @@ -39,19 +34,19 @@ open_listen(int port) bzero((char *) &serveraddr, sizeof(serveraddr)); serveraddr.sin_family = AF_INET; serveraddr.sin_port = htons((uint16_t) port); - //Raspberry Pi will only have one interface, just use ANY. serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); /* Use bind to set the address of "listenfd" to be serveraddr */ if (bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(struct sockaddr)) == -1) { + perror("TCP Socket Bind Error"); return (-1); } /* Use listen to make the socket ready to accept connection requests */ if (listen(listenfd, LISTEN_MAX) == -1) { + perror("TCP Socket Listen Error"); return (-1); - } #ifdef DEBUG_LISTENER @@ -61,30 +56,61 @@ open_listen(int port) } int wait_for_connection(int port, sockaddr *sa) { - socklen_t clientlen; + socklen_t clientlen = sizeof(struct sockaddr_in); int listenfd, connfd; - /* Acquire a port, and make sure we got it successfully. */ + + /* Create a TCP port, and check that it is valid. */ listenfd = open_listen(port); if (listenfd < 0) { - //TODO I think the best option is to try opening a new port if this fails. - //Alternately closing the listenfd is a useful fix. + // TODO have a procedure (hopefully on a new thread) to create a new port and + // start listening again fprintf(stderr, "Failed to open listener on %d\n", port); exit(1); } - clientlen = sizeof(struct sockaddr_in); - /* - * Use accept to set the connection file descriptor of the - * request + * Accept a new connection and return the file descriptor of + * the client, with the appropriate address already set. */ - connfd = accept(listenfd, sa, - &clientlen); + if((connfd = accept(listenfd, sa, &clientlen)) == -1) { + perror("TCP Client Accept Error"); + return (-1); + }; #ifdef DEBUG_LISTENER fprintf(stderr, "Received request on connfd on %d\n", connfd); #endif /*DEBUG_LISTENER*/ - close(listenfd); + // close(listenfd); return connfd; } + +int +create_send_fd(int port, sockaddr *sa) { + int udp_server_fd; + socklen_t sockaddr_len = sizeof(sockaddr_in); + struct sockaddr_in udp_server = {}; + + if((udp_server_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) + { + perror("UDP Socket Creation Error"); + exit(-1); + } + + udp_server.sin_family = AF_INET; + udp_server.sin_port = htons((uint16_t) port); + udp_server.sin_addr.s_addr = INADDR_ANY; + bzero(udp_server.sin_zero, 8); + +// if(bind(udp_server_fd, (struct sockaddr *)&udp_server, sockaddr_len) == -1) +// { +// perror("bind"); +// exit(-1); +// } + + #ifdef DEBUG_LISTENER + fprintf(stderr, "Sending on fd %d\n", udp_server_fd); + #endif /*DEBUG_LISTENER*/ + + return udp_server_fd; +} diff --git a/src/util/listener.hpp b/src/util/listener.hpp index 7002620..76a9e32 100644 --- a/src/util/listener.hpp +++ b/src/util/listener.hpp @@ -22,7 +22,7 @@ * * Effects: * Listens on the port specified and blocks until a connection is recieved - * and then returns the corresponding fildes for the connection. + * and then returns the corresponding file descriptors for the connection. * TODO not sure what is being done with "sa" */ int wait_for_connection(int port, sockaddr *sa); @@ -37,4 +37,7 @@ int wait_for_connection(int port, sockaddr *sa); */ int open_listen(int port); +// TODO +int create_send_fd(int port, sockaddr *sa); + #endif //SOFTWARE_LISTENER_HPP From ce5b7558f587e30438ad4ed80ae62c22fd6d702f Mon Sep 17 00:00:00 2001 From: Tommy Yuan Date: Sat, 26 Jan 2019 13:29:02 -0600 Subject: [PATCH 2/4] Add select. Tested with python code. UDP address is currently INADDR_ANY = 0.0.0.0. Sends "abc" over UDP. Old code had network_worker::worker_method() continually poll the network queue to either send, recv, or queue another recv. New code uses select, which replaces all recv network queue items since we just receive when the fd has data. Sending checks for an open fd first then polls the network queue to see if we need to send. Now that recv is removed, send queue items *should* be the only thing we need in the network queue. Removed some references to poll and acks in networking. TODO modify circular_buffer to send data in the proper format. --- src/final/hotflow.ini | 2 +- src/final/main_network_worker.cpp | 63 ++++++++++++--------- src/server/network_worker.cpp | 92 +++++++++++++++++++++---------- src/server/network_worker.hpp | 11 +++- src/util/circular_buffer.cpp | 7 +++ src/util/listener.cpp | 21 ++++--- src/util/listener.hpp | 2 +- 7 files changed, 127 insertions(+), 71 deletions(-) diff --git a/src/final/hotflow.ini b/src/final/hotflow.ini index 095c3c5..b604470 100644 --- a/src/final/hotflow.ini +++ b/src/final/hotflow.ini @@ -1,6 +1,6 @@ [Server] Port=1234 -Protocol=TCP +Protocol=UDP [Control] # remember to change gitvc and ignition to false for coldflow diff --git a/src/final/main_network_worker.cpp b/src/final/main_network_worker.cpp index 9b8d38a..a1216cc 100644 --- a/src/final/main_network_worker.cpp +++ b/src/final/main_network_worker.cpp @@ -8,6 +8,11 @@ #include #include "main_network_worker.hpp" #include "../util/logger.hpp" +#include +#include +#include +#include + bool main_network_worker::process_nqi(network_queue_item &nqi) { char c; @@ -19,22 +24,23 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { switch (nqi.action) { case (nq_recv): { - //Poll before we read: - read_result = do_recv(connfd_tcp, &c, 1); - if (read_result <= 0) { - //FIXME, do something better? - return true; - } - //TODO just make this a process and let the logic in the worker handle it. - /* - * If we get a '0' then start processing stuff. - * If we get a '1' then stop processing stuff. - * Otherwise ignore the message. - */ - logger.info("Received command " + std::to_string((uint8_t) c)); - wqi.action = wq_process; - wqi.data[0] = c; - qw.enqueue(wqi); +// //Poll before we read: +// read_result = do_recv(connfd_tcp, &c, 1); +// if (read_result <= 0) { +// //FIXME, do something better? +// return true; +// } +// //TODO just make this a process and let the logic in the worker handle it. +// /* +// * If we get a '0' then start processing stuff. +// * If we get a '1' then stop processing stuff. +// * Otherwise ignore the message. +// */ +// logger.info("Received command " + std::to_string((uint8_t) c)); +// wqi.action = wq_process; +// wqi.data[0] = c; +// qw.enqueue(wqi); + std::cerr << "Processing a recv\n"; return true; } case (nq_send_ack): { @@ -44,13 +50,13 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { } case (nq_send): { //Poll before we read: - if (poll(&pf, 1, 0) == 0) { - if (!pf.revents & POLLOUT) { - //Cannot write. Will block. - logger.error("Socket blocked on write."); - return true; - } - } +// if (poll(&pf, 1, 0) == 0) { +// if (!pf.revents & POLLOUT) { +// //Cannot write. Will block. +// logger.error("Socket blocked on write."); +// return true; +// } +// } circular_buffer *buff = nqi.buff; send_code h = (send_code) nqi.data[0]; @@ -83,10 +89,13 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { // std:: cerr << "Incorrect number of bytes written: " << "Expected " << nqi.nbytes << ", Actual " << bytes_written << std::endl; // } - if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { - std::cerr << "Connection Closed" << std::endl; - //exit(0); - } + sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in)); + std::cout << "Sent on UDP" << std::endl; + +// if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { +// std::cerr << "Connection Closed" << std::endl; +// //exit(0); +// } return true; } diff --git a/src/server/network_worker.cpp b/src/server/network_worker.cpp index bba23d6..a4cfa7a 100644 --- a/src/server/network_worker.cpp +++ b/src/server/network_worker.cpp @@ -16,22 +16,56 @@ void network_worker::worker_method() { network_queue_item network_queue_item = {}; work_queue_item work_queue_item = {}; - while (1) { - //std::cout << "Networker entering loop:\n"; - network_queue_item = qn.poll(); - //std::cout << "Networker got item:\n"; + fd_set rfds, wfds; - if (!process_nqi(network_queue_item)) { - std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl; - } - } -} + // Initialize read select + FD_ZERO(&rfds); + FD_SET(connfd_tcp, &rfds); + + // Initialize write select + FD_ZERO(&wfds); + FD_SET(connfd_udp, &wfds); -bool network_worker::process_nqi(network_queue_item &network_queue_item) { if (!connected) { std::cout << "Attempting to connect" << std::endl; open_connection(); } + + while (true) { + FD_SET(connfd_tcp, &rfds); + FD_SET(connfd_udp, &wfds); + + select(connfd_tcp + 1, &rfds, &wfds, nullptr, nullptr); + + if (FD_ISSET(connfd_tcp, &rfds)) { + read_result = do_recv(connfd_tcp, &c, 1); + + if (read_result == 0) { + printf("Socket is dead. I no longer have a reason to live.\n"); + exit(1); + } + + work_queue_item.action = wq_process; + work_queue_item.data[0] = c; + qw.enqueue(work_queue_item); + } + + if (FD_ISSET(connfd_udp, &wfds)) { + // printf("UDP send\n"); + network_queue_item = qn.poll(); + if (!process_nqi(network_queue_item)) { + std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl; + } + } + +// //std::cout << "Networker entering loop:\n"; +// network_queue_item = qn.poll(); +// //std::cout << "Networker got item:\n"; + + } +} + +bool network_worker::process_nqi(network_queue_item &network_queue_item) { switch (network_queue_item.action) { case (nq_none): { timestamp_t t = get_time(); @@ -40,7 +74,7 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { std::cerr << "Connection timed out." << std::endl; fail_connection(); return true; - } else if (timeout > 0 && !has_acked && t - last_recv > timeout / 2) { + } else if (timeout > 0 && t - last_recv > timeout / 2) { //TODO maybe add this as debug option. std::cerr << "Connection inactive. Sending ack." << std::endl; network_queue_item.action = nq_send_ack; @@ -49,8 +83,8 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { } //is this messing with an object we don't own? Doesn't seem to be. - network_queue_item.action = nq_recv; - qn.enqueue(network_queue_item); //Just always be reading because otherwise we're screwed. +// network_queue_item.action = nq_recv; +// qn.enqueue(network_queue_item); //Just always be reading because otherwise we're screwed. // FIXME need to do some checking to make sure this happens frequently. return true; } @@ -64,17 +98,17 @@ ssize_t network_worker::do_recv(int fd, char *b, size_t nbytes) { ssize_t read_result; //Poll before we read: - if (poll(&pf, 1, 0) >= 0) { - if (!(pf.revents & POLLIN)) { - // Nothing to read. - //std::cerr << "Socket blocked on read" << std::endl; - //Go back to looping. - return -2; - } - } else { - std::cerr << "Poll failed" << std::endl; - exit(1); - } +// if (poll(&pf, 1, 0) >= 0) { +// if (!(pf.revents & POLLIN)) { +// // Nothing to read. +// //std::cerr << "Socket blocked on read" << std::endl; +// //Go back to looping. +// return -2; +// } +// } else { +// std::cerr << "Poll failed" << std::endl; +// exit(1); +// } read_result = read(fd, b, nbytes); if (read_result > 0) { /* @@ -106,19 +140,17 @@ void network_worker::fail_connection() { } void network_worker::open_connection() { - struct sockaddr_in sa_udp = {}, sa_tcp = {}; - - connfd_tcp = wait_for_connection(port, (struct sockaddr *) &sa_tcp); + connfd_tcp = wait_for_connection(port, &sa_tcp); if (connfd_tcp < 0) std::cerr << "Could not open TCP connection fd." << std::endl; if (config_map["Server.Protocol"].as() == "UDP") { - connfd_udp = create_send_fd(port, (struct sockaddr*) &sa_udp); -// if (connect(connfd_udp,(struct sockaddr *) &sa,sizeof(sa))) { + connfd_udp = create_send_fd(port, (sockaddr_in *) &sa_udp); +// if (connect(connfd_udp,(struct sockaddr *) &sa_udp, sizeof(sa_udp)) == -1) { // connfd_udp = -1; // std::cerr << "Could not open UDP connection fd." << std::endl; // } - std::cout << "Successfully opened UDP socket on " << connfd_udp << "." << std::endl; + std::cerr << "Successfully connected on udp." << std::endl; } /* diff --git a/src/server/network_worker.hpp b/src/server/network_worker.hpp index be53835..245b8e4 100644 --- a/src/server/network_worker.hpp +++ b/src/server/network_worker.hpp @@ -16,6 +16,10 @@ #include #include "../util/circular_buffer.hpp" #include "../util/timestamps.hpp" +#include +#include +#include +#include class network_worker : public worker { public: @@ -23,6 +27,7 @@ class network_worker : public worker { int connfd_tcp; int connfd_udp; bool connected; + struct sockaddr sa_udp, sa_tcp; /** * The number of microseconds where no receive has occured before the worker should treat the peer @@ -44,11 +49,14 @@ class network_worker : public worker { (safe_queue &my_qn, safe_queue &my_qw, int port, timestamp_t timeout) : worker(my_qn, my_qw) + , connected(0) , port(port) , last_recv(0) , timeout(timeout) , connfd_tcp(-1) , connfd_udp(-1) + , sa_udp({}) + , sa_tcp({}) { last_recv = get_time() - timeout; // Set the last time such that we are immediately timed out. }; @@ -87,9 +95,6 @@ class network_worker : public worker { * @return Whatever read returned. -2 if poll succeeded but there was nothing to receive. -3 if poll failed. */ ssize_t do_recv(int fd, char *b, size_t nbytes); - pollfd pf; - - bool has_acked; }; diff --git a/src/util/circular_buffer.cpp b/src/util/circular_buffer.cpp index dd84a53..28b703d 100644 --- a/src/util/circular_buffer.cpp +++ b/src/util/circular_buffer.cpp @@ -5,7 +5,12 @@ #include "unistd.h" #include #include +#include #include "circular_buffer.hpp" +#include +#include +#include +#include //#define DEBUG_CIRC_SEND @@ -95,6 +100,8 @@ ssize_t circular_buffer::write_data(int fd, size_t n, size_t offset) { #endif ssize_t result; result = write(fd, data + offset % this->nbytes, to_send); + // result = sendto(fd, data + offset % this->nbytes, to_send, 0, destination, sizeof(struct sockaddr_in)); + if (result <= 0) { perror("help?"); std::cerr << "Error while writing." << std::endl; diff --git a/src/util/listener.cpp b/src/util/listener.cpp index bff43f3..0ee3688 100644 --- a/src/util/listener.cpp +++ b/src/util/listener.cpp @@ -4,6 +4,7 @@ #include #include +#include #include "listener.hpp" /* @@ -81,15 +82,15 @@ int wait_for_connection(int port, sockaddr *sa) { fprintf(stderr, "Received request on connfd on %d\n", connfd); #endif /*DEBUG_LISTENER*/ - // close(listenfd); + close(listenfd); return connfd; } int -create_send_fd(int port, sockaddr *sa) { +create_send_fd(int port, sockaddr_in *sa) { int udp_server_fd; - socklen_t sockaddr_len = sizeof(sockaddr_in); - struct sockaddr_in udp_server = {}; + // socklen_t sockaddr_len = sizeof(sockaddr_in); + // struct sockaddr_in udp_server = {}; if((udp_server_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { @@ -97,12 +98,14 @@ create_send_fd(int port, sockaddr *sa) { exit(-1); } - udp_server.sin_family = AF_INET; - udp_server.sin_port = htons((uint16_t) port); - udp_server.sin_addr.s_addr = INADDR_ANY; - bzero(udp_server.sin_zero, 8); + sa->sin_family = AF_INET; + sa->sin_port = htons((uint16_t) port); + sa->sin_addr.s_addr = htonl(INADDR_ANY); + bzero(sa->sin_zero, 8); -// if(bind(udp_server_fd, (struct sockaddr *)&udp_server, sockaddr_len) == -1) + std::cerr << sa->sin_addr.s_addr << std::endl; + +// if(bind(udp_server_fd, (struct sockaddr *)&udp_server, sizeof(struct sockaddr_in)) == -1) // { // perror("bind"); // exit(-1); diff --git a/src/util/listener.hpp b/src/util/listener.hpp index 76a9e32..476552a 100644 --- a/src/util/listener.hpp +++ b/src/util/listener.hpp @@ -38,6 +38,6 @@ int wait_for_connection(int port, sockaddr *sa); int open_listen(int port); // TODO -int create_send_fd(int port, sockaddr *sa); +int create_send_fd(int port, sockaddr_in *sa); #endif //SOFTWARE_LISTENER_HPP From 2992bcfa2482653bd8378034f51045e2606dea02 Mon Sep 17 00:00:00 2001 From: Tommy Yuan Date: Sat, 26 Jan 2019 23:47:57 -0600 Subject: [PATCH 3/4] Delete unused code. Organizing stuff before I start working on the circular buffer sending. --- src/final/main_network_worker.cpp | 41 +++--------------- src/server/network_worker.cpp | 71 +++++++++---------------------- src/server/network_worker.hpp | 2 +- src/util/circular_buffer.cpp | 6 ++- src/util/listener.cpp | 20 +++------ 5 files changed, 38 insertions(+), 102 deletions(-) diff --git a/src/final/main_network_worker.cpp b/src/final/main_network_worker.cpp index a1216cc..b9c3874 100644 --- a/src/final/main_network_worker.cpp +++ b/src/final/main_network_worker.cpp @@ -15,54 +15,27 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { - char c; - work_queue_item wqi; - ssize_t read_result; // char* combined_buff = new char[1 << 12]; Logger logger("logs/nw.log", "network thread", LOG_INFO); switch (nqi.action) { case (nq_recv): { -// //Poll before we read: -// read_result = do_recv(connfd_tcp, &c, 1); -// if (read_result <= 0) { -// //FIXME, do something better? -// return true; -// } -// //TODO just make this a process and let the logic in the worker handle it. -// /* -// * If we get a '0' then start processing stuff. -// * If we get a '1' then stop processing stuff. -// * Otherwise ignore the message. -// */ -// logger.info("Received command " + std::to_string((uint8_t) c)); -// wqi.action = wq_process; -// wqi.data[0] = c; -// qw.enqueue(wqi); std::cerr << "Processing a recv\n"; return true; } case (nq_send_ack): { // Don't actually send an ack. Just don't timeout for now. - //network_worker::send_header(ack, 0); break; } case (nq_send): { - //Poll before we read: -// if (poll(&pf, 1, 0) == 0) { -// if (!pf.revents & POLLOUT) { -// //Cannot write. Will block. -// logger.error("Socket blocked on write."); -// return true; -// } -// } circular_buffer *buff = nqi.buff; send_code h = (send_code) nqi.data[0]; // TODO this header should correspond to something from the nqi data. + // TODO error check network_worker::send_header(h, nqi.nbytes); // std::cerr << "Preparing header" << std::endl; @@ -89,13 +62,13 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { // std:: cerr << "Incorrect number of bytes written: " << "Expected " << nqi.nbytes << ", Actual " << bytes_written << std::endl; // } - sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in)); - std::cout << "Sent on UDP" << std::endl; +// sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in)); +// std::cout << "Sent on UDP" << std::endl; -// if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { -// std::cerr << "Connection Closed" << std::endl; -// //exit(0); -// } + if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { + std::cerr << "Connection Closed" << std::endl; + //exit(0); + } return true; } diff --git a/src/server/network_worker.cpp b/src/server/network_worker.cpp index a4cfa7a..f6962d3 100644 --- a/src/server/network_worker.cpp +++ b/src/server/network_worker.cpp @@ -11,7 +11,6 @@ #include void network_worker::worker_method() { - ssize_t read_result; char c; network_queue_item network_queue_item = {}; work_queue_item work_queue_item = {}; @@ -38,12 +37,7 @@ void network_worker::worker_method() { select(connfd_tcp + 1, &rfds, &wfds, nullptr, nullptr); if (FD_ISSET(connfd_tcp, &rfds)) { - read_result = do_recv(connfd_tcp, &c, 1); - - if (read_result == 0) { - printf("Socket is dead. I no longer have a reason to live.\n"); - exit(1); - } + do_recv(connfd_tcp, &c, 1); work_queue_item.action = wq_process; work_queue_item.data[0] = c; @@ -51,22 +45,17 @@ void network_worker::worker_method() { } if (FD_ISSET(connfd_udp, &wfds)) { - // printf("UDP send\n"); network_queue_item = qn.poll(); if (!process_nqi(network_queue_item)) { std::cerr << "Could not process request on network thread: " << network_queue_item.action << std::endl; } } - -// //std::cout << "Networker entering loop:\n"; -// network_queue_item = qn.poll(); -// //std::cout << "Networker got item:\n"; - } } bool network_worker::process_nqi(network_queue_item &network_queue_item) { switch (network_queue_item.action) { + // TODO are we still using this? case (nq_none): { timestamp_t t = get_time(); // First check if we are close to timing out: @@ -82,10 +71,6 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { return true; } - //is this messing with an object we don't own? Doesn't seem to be. -// network_queue_item.action = nq_recv; -// qn.enqueue(network_queue_item); //Just always be reading because otherwise we're screwed. - // FIXME need to do some checking to make sure this happens frequently. return true; } default: { @@ -97,34 +82,16 @@ bool network_worker::process_nqi(network_queue_item &network_queue_item) { ssize_t network_worker::do_recv(int fd, char *b, size_t nbytes) { ssize_t read_result; - //Poll before we read: -// if (poll(&pf, 1, 0) >= 0) { -// if (!(pf.revents & POLLIN)) { -// // Nothing to read. -// //std::cerr << "Socket blocked on read" << std::endl; -// //Go back to looping. -// return -2; -// } -// } else { -// std::cerr << "Poll failed" << std::endl; -// exit(1); -// } read_result = read(fd, b, nbytes); if (read_result > 0) { - /* - * Update our timing on when we last received. - */ - timestamp_t trecv = get_time(); - this->last_recv = trecv; + /* Update our timing on when we last received. */ + this->last_recv = get_time(); } else if (read_result == 0) { - /* - * Check if the file descriptor has closed: - */ + /* If we read 0 then the fd is closed */ std::cerr << "Read nothing from socket. Assuming it is dead." << std::endl; fail_connection(); } - //std::cout << "Read " << read_result << " bytes from socket." << std::endl; return read_result; } @@ -137,6 +104,9 @@ void network_worker::fail_connection() { connfd_udp = -1; connected = false; + + std::cout << "Attempting to reconnect" << std::endl; + open_connection(); } void network_worker::open_connection() { @@ -144,31 +114,30 @@ void network_worker::open_connection() { if (connfd_tcp < 0) std::cerr << "Could not open TCP connection fd." << std::endl; - if (config_map["Server.Protocol"].as() == "UDP") { - connfd_udp = create_send_fd(port, (sockaddr_in *) &sa_udp); +// if (config_map["Server.Protocol"].as() == "UDP") { + connfd_udp = create_send_fd(port, (sockaddr_in *) &sa_udp); // if (connect(connfd_udp,(struct sockaddr *) &sa_udp, sizeof(sa_udp)) == -1) { // connfd_udp = -1; // std::cerr << "Could not open UDP connection fd." << std::endl; // } - std::cerr << "Successfully connected on udp." << std::endl; - } +// } - /* - * Update the last received and mark that we are connected: - */ + /* Update the last received and mark that we are connected */ connected = true; last_recv = get_time(); } -ssize_t network_worker::send_header(send_code h, size_t nbytes) { +bool network_worker::send_header(send_code h, size_t nbytes) { send_header_t sh; sh.code = h; sh.nbytes = nbytes; - // If the UDP Socket is setup, use it. - if (connfd_udp != -1) { - ssize_t result = write(connfd_udp, &sh, sizeof(sh)); - } else { - ssize_t result = write(connfd_tcp, &sh, sizeof(sh)); + ssize_t result = write(connfd_udp, &sh, sizeof(sh)); + + if (result != nbytes) { + std::cerr << "ERROR: Wrote " << result << " out of " << nbytes << " bytes\n"; + return false; } + + return true; } \ No newline at end of file diff --git a/src/server/network_worker.hpp b/src/server/network_worker.hpp index 245b8e4..e3dfc5e 100644 --- a/src/server/network_worker.hpp +++ b/src/server/network_worker.hpp @@ -84,7 +84,7 @@ class network_worker : public worker { */ void open_connection(); - ssize_t send_header(send_code h, size_t nbytes); + bool send_header(send_code h, size_t nbytes); protected: /** diff --git a/src/util/circular_buffer.cpp b/src/util/circular_buffer.cpp index 28b703d..1facd12 100644 --- a/src/util/circular_buffer.cpp +++ b/src/util/circular_buffer.cpp @@ -14,10 +14,12 @@ //#define DEBUG_CIRC_SEND -circular_buffer::circular_buffer(size_t size) : bytes_written(0) { +circular_buffer::circular_buffer(size_t size) + : bytes_written(0) + { this->nbytes = size; this->data = new char[size]; -} + } circular_buffer::~circular_buffer() { delete [] this->data; diff --git a/src/util/listener.cpp b/src/util/listener.cpp index 0ee3688..32db7ac 100644 --- a/src/util/listener.cpp +++ b/src/util/listener.cpp @@ -18,7 +18,7 @@ int open_listen(int port) { - int listenfd, optval = 1; + int listenfd; struct sockaddr_in serveraddr = {}; /* Set "listenfd" to a newly created stream socket */ @@ -63,15 +63,14 @@ int wait_for_connection(int port, sockaddr *sa) { /* Create a TCP port, and check that it is valid. */ listenfd = open_listen(port); if (listenfd < 0) { - // TODO have a procedure (hopefully on a new thread) to create a new port and - // start listening again + // TODO have a procedure (on a new thread) to create a new port and listen again fprintf(stderr, "Failed to open listener on %d\n", port); exit(1); } /* * Accept a new connection and return the file descriptor of - * the client, with the appropriate address already set. + * the client. */ if((connfd = accept(listenfd, sa, &clientlen)) == -1) { perror("TCP Client Accept Error"); @@ -82,6 +81,7 @@ int wait_for_connection(int port, sockaddr *sa) { fprintf(stderr, "Received request on connfd on %d\n", connfd); #endif /*DEBUG_LISTENER*/ + // Close this since we only care about what the client sends to us. close(listenfd); return connfd; } @@ -89,28 +89,20 @@ int wait_for_connection(int port, sockaddr *sa) { int create_send_fd(int port, sockaddr_in *sa) { int udp_server_fd; - // socklen_t sockaddr_len = sizeof(sockaddr_in); - // struct sockaddr_in udp_server = {}; + // Create a new UDP socket if((udp_server_fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { perror("UDP Socket Creation Error"); exit(-1); } + // These settings must match those on the mission control end sa->sin_family = AF_INET; sa->sin_port = htons((uint16_t) port); sa->sin_addr.s_addr = htonl(INADDR_ANY); bzero(sa->sin_zero, 8); - std::cerr << sa->sin_addr.s_addr << std::endl; - -// if(bind(udp_server_fd, (struct sockaddr *)&udp_server, sizeof(struct sockaddr_in)) == -1) -// { -// perror("bind"); -// exit(-1); -// } - #ifdef DEBUG_LISTENER fprintf(stderr, "Sending on fd %d\n", udp_server_fd); #endif /*DEBUG_LISTENER*/ From fe85e8562c6d1403d27f4e35486ec79477674b28 Mon Sep 17 00:00:00 2001 From: Tommy Yuan Date: Sun, 27 Jan 2019 02:08:45 -0600 Subject: [PATCH 4/4] Add write_data_datagram to circular buffer Copies header and data into a new char array and sends it in one go as a large datagram. Not sure if this will affect memory/speed enough to matter. Need to clean things up as it was a quick and dirty test. Can remove write_data now that we have this method. --- src/final/main_network_worker.cpp | 11 ++---- src/util/circular_buffer.cpp | 61 +++++++++++++++++++++++++++++++ src/util/circular_buffer.hpp | 3 ++ 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/final/main_network_worker.cpp b/src/final/main_network_worker.cpp index b9c3874..2b0a191 100644 --- a/src/final/main_network_worker.cpp +++ b/src/final/main_network_worker.cpp @@ -30,14 +30,14 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { } case (nq_send): { circular_buffer *buff = nqi.buff; + send_header_t header; - send_code h = (send_code) nqi.data[0]; + header.code = (send_code) nqi.data[0]; + header.nbytes = nqi.nbytes; // TODO this header should correspond to something from the nqi data. // TODO error check - network_worker::send_header(h, nqi.nbytes); - // std::cerr << "Preparing header" << std::endl; // // send_header_t sh = (send_header_t) {h, nqi.nbytes}; @@ -54,9 +54,6 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { // std::cerr << "Copy bytes from buffer failed!" << std::endl; // } - logger.debug("Writing data: Nbytes: " + std::to_string(nqi.nbytes) + " Type: " + std::to_string(h)); - int connfd = (connfd_udp != -1) ? connfd_udp : connfd_tcp; //Use UDP if the socket is configured - // ssize_t bytes_written = 0; // if ((bytes_written = write(connfd, combined_buff, nqi.nbytes)) != nqi.nbytes) { // std:: cerr << "Incorrect number of bytes written: " << "Expected " << nqi.nbytes << ", Actual " << bytes_written << std::endl; @@ -65,7 +62,7 @@ bool main_network_worker::process_nqi(network_queue_item &nqi) { // sendto(connfd, "abc", 4, 0, &sa_udp, sizeof(struct sockaddr_in)); // std::cout << "Sent on UDP" << std::endl; - if (buff->write_data(connfd, nqi.nbytes, nqi.total_bytes) != 0) { + if (buff->write_data_datagram(connfd_udp, nqi.nbytes, nqi.total_bytes, &header, sizeof(send_header_t), &sa_udp) != 0) { std::cerr << "Connection Closed" << std::endl; //exit(0); } diff --git a/src/util/circular_buffer.cpp b/src/util/circular_buffer.cpp index 1facd12..7e2fdfb 100644 --- a/src/util/circular_buffer.cpp +++ b/src/util/circular_buffer.cpp @@ -115,6 +115,67 @@ ssize_t circular_buffer::write_data(int fd, size_t n, size_t offset) { } } +ssize_t circular_buffer::write_data_datagram(int fd, size_t n, size_t offset, void *header, size_t header_size, sockaddr *destination) { + size_t bw = bytes_written.load(); + char *send_buff = new char[header_size + n]; + char *pos = send_buff; + + /* + offset is smallest number we're trying to write. + bw - nbytes is the smallest number that hasn't been overwritten. + We want to ensure that we aren't trying to write data that has been overwritten. + */ + if ((long) offset < (long) (bw - this->nbytes)) { + std::cerr << "Bytes already overwritten before sending: Num_to_write:" << n + << " Offset:" << offset + << " Total bytes written into buffer" << bw + << " Total buffer size: " << nbytes << std::endl; + //return -1; //TODO don't collide with write error? + } + + // Copy in the header at the beginning + memcpy(pos, header, header_size); + pos += header_size; + + size_t to_send = nbytes - offset % this->nbytes; // The distance between start and the end of the buffer. + to_send = to_send > n ? n : to_send; //Take the minimum value of n and to_send. + + // Copy in the data + memcpy(pos, data + offset % this->nbytes, to_send); + pos += offset % this->nbytes; + + if (n > to_send) { + memcpy(pos, data + (offset + to_send) % this->nbytes, n - to_send); + } + +#ifdef DEBUG_CIRC_SEND + if (to_send > 4 * 16) { + size_t temp_offset = offset % this->nbytes; + + fprintf(stdout, "Offset %#u \n", (uint16_t) temp_offset); + fprintf(stdout, "Sending Bytes: %X %lld %X %lld %X %lld %X %lld \n", + *((uint16_t *)(data + temp_offset)), *((uint64_t *)(data + temp_offset + 8)), + *((uint16_t *)(data + temp_offset + 16)), *((uint64_t *)(data + temp_offset + 24)), + *((uint16_t *)(data + temp_offset + 32)), *((uint64_t *)(data + temp_offset + 40)), + *((uint16_t *)(data + temp_offset + 48)), *((uint64_t *)(data + temp_offset + 56))); + } +#endif + ssize_t result; + // result = write(fd, data + offset % this->nbytes, to_send); + result = sendto(fd, send_buff, to_send, 0, destination, sizeof(struct sockaddr_in)); + + if (result <= 0) { + perror("help?"); + std::cerr << "Error while writing." << std::endl; + return result; + } else if (result != n) { + std::cerr << "Error while writing. Wrote " << result << " bytes, expecting " << n << " bytes total\n"; + return -1; + } + + return 0; +} + void circular_buffer::zero() { bzero(data, nbytes); bytes_written = 0; diff --git a/src/util/circular_buffer.hpp b/src/util/circular_buffer.hpp index 0af035e..88b8265 100644 --- a/src/util/circular_buffer.hpp +++ b/src/util/circular_buffer.hpp @@ -54,6 +54,9 @@ class circular_buffer { */ ssize_t write_data(int fd, size_t n, size_t offset); + // TODO void * should be send_header_t*. Not for now due to circular include. + ssize_t write_data_datagram(int fd, size_t n, size_t offset, void *header, size_t header_size, struct sockaddr *destination); + void zero(); };