From 66a28d3023b20d532791e45fd7b0fb43242ecfed Mon Sep 17 00:00:00 2001 From: Michael Stapelberg Date: Sat, 8 Oct 2016 23:53:05 +0200 Subject: [PATCH] Work in progress: waterfall-by-priority load balancing see issue #82 for context --- src/mlvpn.c | 16 +++++++++++-- src/mlvpn.h | 6 ++--- src/tuntap_generic.c | 2 +- src/wrr.c | 57 ++++++++++++++++++++++++++++---------------- 4 files changed, 54 insertions(+), 27 deletions(-) diff --git a/src/mlvpn.c b/src/mlvpn.c index 10d55ac..6a1b28a 100644 --- a/src/mlvpn.c +++ b/src/mlvpn.c @@ -82,6 +82,7 @@ static char **saved_argv; struct ev_loop *loop; static ev_timer reorder_drain_timeout; static ev_timer reorder_adjust_rtt_timeout; +static ev_timer reset_wrr_timeout; char *status_command = NULL; char *process_title = NULL; int logdebug = 0; @@ -148,6 +149,7 @@ static uint32_t mlvpn_rtun_reorder_drain(uint32_t reorder); static void mlvpn_rtun_reorder_drain_timeout(EV_P_ ev_timer *w, int revents); static void mlvpn_rtun_check_timeout(EV_P_ ev_timer *w, int revents); static void mlvpn_rtun_adjust_reorder_timeout(EV_P_ ev_timer *w, int revents); +static void mlvpn_reset_wrr_timeout(EV_P_ ev_timer *w, int revents); static void mlvpn_rtun_send_keepalive(ev_tstamp now, mlvpn_tunnel_t *t); static void mlvpn_rtun_send_disconnect(mlvpn_tunnel_t *t); static int mlvpn_rtun_send(mlvpn_tunnel_t *tun, circular_buffer_t *pktbuf); @@ -1128,10 +1130,10 @@ mlvpn_rtun_tick_connect(mlvpn_tunnel_t *t) } mlvpn_tunnel_t * -mlvpn_rtun_choose() +mlvpn_rtun_choose(uint32_t len) { mlvpn_tunnel_t *tun; - tun = mlvpn_rtun_wrr_choose(); + tun = mlvpn_rtun_wrr_choose(len); return tun; } @@ -1253,6 +1255,12 @@ mlvpn_rtun_adjust_reorder_timeout(EV_P_ ev_timer *w, int revents) } } +static void +mlvpn_reset_wrr_timeout(EV_P_ ev_timer *w, int revents) +{ + mlvpn_rtun_wrr_reset(&rtuns, mlvpn_status.fallback_mode); +} + static void tuntap_io_event(EV_P_ ev_io *w, int revents) { @@ -1531,6 +1539,10 @@ main(int argc, char **argv) mlvpn_rtun_adjust_reorder_timeout, 0., 1.0); ev_timer_start(EV_A_ &reorder_adjust_rtt_timeout); + ev_timer_init(&reset_wrr_timeout, + mlvpn_reset_wrr_timeout, 0., 1.0); + ev_timer_start(EV_A_ &reset_wrr_timeout); + priv_set_running_state(); #ifdef ENABLE_CONTROL diff --git a/src/mlvpn.h b/src/mlvpn.h index 0026dfa..a2b27ed 100644 --- a/src/mlvpn.h +++ b/src/mlvpn.h @@ -47,7 +47,7 @@ #include "timestamp.h" #define MLVPN_MAXHNAMSTR 256 -#define MLVPN_MAXPORTSTR 6 +#define MLVPN_MAXPORTSTR 5 /* Number of packets in the queue. Each pkt is ~ 1520 */ /* 1520 * 128 ~= 24 KBytes of data maximum per channel VMSize */ @@ -191,8 +191,8 @@ int mlvpn_sock_set_nonblocking(int fd); int mlvpn_loss_ratio(mlvpn_tunnel_t *tun); int mlvpn_rtun_wrr_reset(struct rtunhead *head, int use_fallbacks); -mlvpn_tunnel_t *mlvpn_rtun_wrr_choose(); -mlvpn_tunnel_t *mlvpn_rtun_choose(); +mlvpn_tunnel_t *mlvpn_rtun_wrr_choose(uint32_t len); +mlvpn_tunnel_t *mlvpn_rtun_choose(uint32_t len); mlvpn_tunnel_t *mlvpn_rtun_new(const char *name, const char *bindaddr, const char *bindport, uint32_t bindfib, const char *destaddr, const char *destport, diff --git a/src/tuntap_generic.c b/src/tuntap_generic.c index 1141c35..c2f72ff 100644 --- a/src/tuntap_generic.c +++ b/src/tuntap_generic.c @@ -15,7 +15,7 @@ mlvpn_tuntap_generic_read(u_char *data, uint32_t len) } #endif if (!rtun) { - rtun = mlvpn_rtun_choose(); + rtun = mlvpn_rtun_choose(len); /* Not connected to anyone. read and discard packet. */ if (! rtun) return len; diff --git a/src/wrr.c b/src/wrr.c index 8e0cb6e..d0100c5 100644 --- a/src/wrr.c +++ b/src/wrr.c @@ -35,8 +35,24 @@ static int wrr_min_index() /* initialize wrr system */ int mlvpn_rtun_wrr_reset(struct rtunhead *head, int use_fallbacks) { + int tunnels = 0; mlvpn_tunnel_t *t; - wrr.len = 0; + // TODO: add tunnels in the right order to |head| to begin with + log_debug("wrr_reset", "begin"); + LIST_FOREACH(t, head, entries) { + log_debug("wrr_reset", "checking tunnel"); + if (t->fallback_only != use_fallbacks) { + continue; + } + /* Don't select "LOSSY" tunnels, except if we are in fallback mode */ + if ((t->fallback_only && t->status >= MLVPN_AUTHOK) || + (t->status == MLVPN_AUTHOK)) + { + tunnels++; + } + } + log_debug("wrr_reset", "done, found %d tunnels", tunnels); + wrr.len = tunnels; LIST_FOREACH(t, head, entries) { if (t->fallback_only != use_fallbacks) { @@ -48,9 +64,9 @@ int mlvpn_rtun_wrr_reset(struct rtunhead *head, int use_fallbacks) { if (wrr.len >= MAX_TUNNELS) fatalx("You have too much tunnels declared"); - wrr.tunnel[wrr.len] = t; - wrr.tunval[wrr.len] = 0.0; - wrr.len++; + tunnels--; + wrr.tunnel[tunnels] = t; + wrr.tunval[tunnels] = 0.0; } } @@ -58,23 +74,22 @@ int mlvpn_rtun_wrr_reset(struct rtunhead *head, int use_fallbacks) } mlvpn_tunnel_t * -mlvpn_rtun_wrr_choose() +mlvpn_rtun_wrr_choose(uint32_t len) { - int i; - int idx; - - if (wrr.len == 0) - return NULL; - - idx = wrr_min_index(); - if (idx < 0) - fatalx("Programming error: wrr_min_index < 0!"); - - for(i = 0; i < wrr.len; i++) - { - if (wrr.tunval[i] > 0) - wrr.tunval[i] -= 1; + int i = 0; + /* Iterate through tunnels in order of priority */ + for (i = 0; i < wrr.len; i++) { + log_debug("wrr", "check tunnel %d/%d with %f+%u bytes used of %d total", + i, wrr.len, wrr.tunval[i], len, wrr.tunnel[i]->bandwidth); + /* Skip tunnels which have exhausted their bandwidth in this timeslot */ + if ((wrr.tunval[i] + len) >= wrr.tunnel[i]->bandwidth) { + log_debug("wrr", "bandwidth exhausted!"); + continue; + } + wrr.tunval[i] += len; + return wrr.tunnel[i]; } - wrr.tunval[idx] = (double) 100.0 / wrr.tunnel[idx]->weight; - return wrr.tunnel[idx]; + log_debug("wrr", "no tunnel found for packet of len %u", len); + /* Discard the packet in mlvpn_tuntap_generic_read() */ + return NULL; }