From acad3c257d031b0803f06a27a6c53cfffb5e6cd8 Mon Sep 17 00:00:00 2001 From: Iftah Levi Date: Wed, 23 Mar 2022 16:26:11 +0000 Subject: [PATCH 1/2] Add -r option - rate_per_thread Allow to control rate per thread context instead of per connection context. Signed-off-by: Iftah Levi --- src/wrk.c | 93 ++++++++++++++++++++++++++++++++++++++----------------- src/wrk.h | 20 +++++++----- 2 files changed, 77 insertions(+), 36 deletions(-) diff --git a/src/wrk.c b/src/wrk.c index 96ac53a..0806833 100644 --- a/src/wrk.c +++ b/src/wrk.c @@ -38,6 +38,7 @@ static struct config { bool dynamic; bool record_all_responses; bool warmup; + bool rate_by_thread; char *host; char *script; char *local_ip; @@ -97,6 +98,9 @@ static void usage() { " -W --warmup Enable warmup phase \n" " In warmup phase connections are establised,\n" " but no requests are sent \n" + " -r --rate_per_thread Use rate limit per thread \n" + " instead of per connection. \n" + " Might help to reduce traffic bursts\n" " \n" " \n" " Numeric arguments may include a SI unit (1k, 1M, 1G)\n" @@ -194,7 +198,7 @@ int main(int argc, char **argv) { // TODO Review whether we can reduce number of events per thread t->loop = aeCreateEventLoop(10 + cfg.connections * 3); t->connections = connections; - t->throughput = throughput; + t->rate_handler.throughput = throughput; t->stop_at = stop_at; if (local_ip_nr > 0) @@ -369,19 +373,26 @@ void *thread_main(void *arg) { script_request(thread->L, &request, &length); } - double throughput = (thread->throughput / 1000000.0) / thread->connections; + double throughput = (thread->rate_handler.throughput / 1000000.0) / thread->connections; connection *c = thread->cs; + if (cfg.rate_by_thread) { + thread->rate_handler.throughput = thread->rate_handler.throughput / 1000000.0; + thread->rate_handler.catch_up_throughput = thread->rate_handler.throughput * 2; + thread->rate_handler.sent = 0; + thread->rate_handler.caught_up = true; + } + for (uint64_t i = 0; i < thread->connections; i++, c++) { c->thread = thread; c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL; c->request = request; c->length = length; - c->throughput = throughput; - c->catch_up_throughput = throughput * 2; - c->complete = 0; - c->caught_up = true; + c->rate_handler.throughput = throughput; + c->rate_handler.catch_up_throughput = throughput * 2; + c->rate_handler.sent = 0; + c->rate_handler.caught_up = true; // Stagger connects 5 msec apart within thread: aeCreateTimeEvent(loop, i * 5, delayed_initial_connect, c, NULL); } @@ -525,7 +536,7 @@ static int reconnect_socket(thread *thread, connection *c) { static int delayed_initial_connect(aeEventLoop *loop, long long id, void *data) { connection* c = data; - c->thread_start = time_us(); + c->rate_handler.thread_start = time_us(); connect_socket(c->thread, c); return AE_NOMORE; } @@ -631,30 +642,40 @@ static int response_body(http_parser *parser, const char *at, size_t len) { static uint64_t usec_to_next_send(connection *c) { uint64_t now = time_us(); + rate_handler_t *rate_handler = + (cfg.rate_by_thread) ? &c->thread->rate_handler : &c->rate_handler; - uint64_t next_start_time = c->thread_start + (c->complete / c->throughput); + uint64_t next_start_time = + rate_handler->thread_start + (rate_handler->sent / rate_handler->throughput); bool send_now = true; + if (cfg.rate_by_thread) { + if (rate_handler->sent == 0) { + rate_handler->thread_start = now; + goto ret; + } + } + if (next_start_time > now) { // We are on pace. Indicate caught_up and don't send now. - c->caught_up = true; + rate_handler->caught_up = true; send_now = false; } else { // We are behind - if (c->caught_up) { + if (rate_handler->caught_up) { // This is the first fall-behind since we were last caught up - c->caught_up = false; - c->catch_up_start_time = now; - c->complete_at_catch_up_start = c->complete; + rate_handler->caught_up = false; + rate_handler->catch_up_start_time = now; + rate_handler->complete_at_catch_up_start = rate_handler->sent; } // Figure out if it's time to send, per catch up throughput: uint64_t complete_since_catch_up_start = - c->complete - c->complete_at_catch_up_start; + rate_handler->sent - rate_handler->complete_at_catch_up_start; - next_start_time = c->catch_up_start_time + - (complete_since_catch_up_start / c->catch_up_throughput); + next_start_time = rate_handler->catch_up_start_time + + (complete_since_catch_up_start / rate_handler->catch_up_throughput); if (next_start_time > now) { // Not yet time to send, even at catch-up throughout: @@ -662,9 +683,13 @@ static uint64_t usec_to_next_send(connection *c) { } } +ret: if (send_now) { c->latest_should_send_time = now; c->latest_expected_start = next_start_time; + if (cfg.rate_by_thread) { + rate_handler->sent++; + } } return send_now ? 0 : (next_start_time - now); @@ -672,9 +697,11 @@ static uint64_t usec_to_next_send(connection *c) { static int delay_request(aeEventLoop *loop, long long id, void *data) { connection* c = data; - uint64_t time_usec_to_wait = usec_to_next_send(c); - if (time_usec_to_wait) { - return round((time_usec_to_wait / 1000.0L) + 0.5); /* don't send, wait */ + if (!cfg.rate_by_thread) { + uint64_t time_usec_to_wait = usec_to_next_send(c); + if (time_usec_to_wait) { + return round((time_usec_to_wait / 1000.0L) + 0.5); /* don't send, wait */ + } } aeCreateFileEvent(c->thread->loop, c->fd, AE_WRITABLE, socket_writeable, c); return AE_NOMORE; @@ -705,7 +732,7 @@ static int response_complete(http_parser *parser) { } // Count all responses (including pipelined ones:) - c->complete++; + c->rate_handler.sent++; // Note that expected start time is computed based on the completed // response count seen at the beginning of the last request batch sent. @@ -714,8 +741,9 @@ static int response_complete(http_parser *parser) { // start time based on the completion count of these individual pipelined // requests we can easily end up "gifting" them time and seeing // negative latencies. - uint64_t expected_latency_start = c->thread_start + - (c->complete_at_last_batch_start / c->throughput); + uint64_t expected_latency_start = c->rate_handler.thread_start + + (c->complete_at_last_batch_start / c->rate_handler.throughput); + int64_t expected_latency_timing = now - expected_latency_start; @@ -728,16 +756,16 @@ static int response_complete(http_parser *parser) { printf(" expected_latency_timing = %"PRId64"\n", expected_latency_timing); printf(" now = %"PRIu64"\n", now); printf(" expected_latency_start = %"PRIu64"\n", expected_latency_start); - printf(" c->thread_start = %"PRIu64"\n", c->thread_start); - printf(" c->complete = %"PRIu64"\n", c->complete); - printf(" throughput = %g\n", c->throughput); + printf(" c->thread_start = %"PRIu64"\n", c->rate_handler.thread_start); + printf(" c->complete = %"PRIu64"\n", c->rate_handler.sent); + printf(" throughput = %g\n", c->rate_handler.throughput); printf(" latest_should_send_time = %"PRIu64"\n", c->latest_should_send_time); printf(" latest_expected_start = %"PRIu64"\n", c->latest_expected_start); printf(" latest_connect = %"PRIu64"\n", c->latest_connect); printf(" latest_write = %"PRIu64"\n", c->latest_write); - expected_latency_start = c->thread_start + - ((c->complete ) / c->throughput); + expected_latency_start = c->rate_handler.thread_start + + ((c->rate_handler.sent ) / c->rate_handler.throughput); printf(" next expected_latency_start = %"PRIu64"\n", expected_latency_start); } @@ -848,6 +876,11 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) { int msec_to_wait = round((time_usec_to_wait / 1000.0L) + 0.5); // Not yet time to send. Delay: + if (cfg.rate_by_thread) { + // in case the rate is controlled by the thread + // we wait 1ms for the specific connection + msec_to_wait = 1; + } aeDeleteFileEvent(loop, fd, AE_WRITABLE); aeCreateTimeEvent( thread->loop, msec_to_wait, delay_request, c, NULL); @@ -868,7 +901,7 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) { c->start = time_us(); if (!c->has_pending) { c->actual_latency_start = c->start; - c->complete_at_last_batch_start = c->complete; + c->complete_at_last_batch_start = c->rate_handler.sent; c->has_pending = true; } c->pending = cfg.pipeline; @@ -950,6 +983,7 @@ static struct option longopts[] = { { "version", no_argument, NULL, 'v' }, { "rate", required_argument, NULL, 'R' }, { "warmup", no_argument, NULL, 'W' }, + { "rate_per_thread",no_argument, NULL, 'r' }, { NULL, 0, NULL, 0 } }; @@ -1010,6 +1044,9 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa case 'W': cfg->warmup = true; break; + case 'r': + cfg->rate_by_thread = true; + break; case 'h': case '?': case ':': diff --git a/src/wrk.h b/src/wrk.h index 9396e47..7c233c1 100644 --- a/src/wrk.h +++ b/src/wrk.h @@ -27,6 +27,16 @@ #define STOP_CHECK_INTERNAL_MS 2000 #define THREAD_SYNC_INTERVAL_MS 1000 +typedef struct { + double throughput; + uint64_t sent; + bool caught_up; + uint64_t catch_up_start_time; + uint64_t complete_at_catch_up_start; + double catch_up_throughput; + uint64_t thread_start; +} rate_handler_t; + typedef struct { pthread_t thread; aeEventLoop *loop; @@ -40,7 +50,6 @@ typedef struct { uint64_t requests; uint64_t bytes; uint64_t start; - double throughput; uint64_t mean; struct hdr_histogram *latency_histogram; struct hdr_histogram *u_latency_histogram; @@ -49,6 +58,7 @@ typedef struct { errors errors; struct connection *cs; char *local_ip; + rate_handler_t rate_handler; } thread; typedef struct { @@ -66,13 +76,8 @@ typedef struct connection { int fd; int connect_mask; SSL *ssl; - double throughput; - double catch_up_throughput; - uint64_t complete; + rate_handler_t rate_handler; uint64_t complete_at_last_batch_start; - uint64_t catch_up_start_time; - uint64_t complete_at_catch_up_start; - uint64_t thread_start; uint64_t start; char *request; size_t length; @@ -84,7 +89,6 @@ typedef struct connection { uint64_t actual_latency_start; bool is_connected; bool has_pending; - bool caught_up; // Internal tracking numbers (used purely for debugging): uint64_t latest_should_send_time; uint64_t latest_expected_start; From c0bc61800679690422d57d96473a7fee2950b995 Mon Sep 17 00:00:00 2001 From: Iftah Levi Date: Wed, 23 Mar 2022 16:50:27 +0000 Subject: [PATCH 2/2] remove -U option Signed-off-by: Iftah Levi --- src/wrk.c | 58 +------------------------------------------------------ 1 file changed, 1 insertion(+), 57 deletions(-) diff --git a/src/wrk.c b/src/wrk.c index 0806833..59b2807 100644 --- a/src/wrk.c +++ b/src/wrk.c @@ -34,7 +34,6 @@ static struct config { uint64_t delay_ms; uint64_t warmup_timeout; bool latency; - bool u_latency; bool dynamic; bool record_all_responses; bool warmup; @@ -86,7 +85,6 @@ static void usage() { " -s, --script Load Lua script file \n" " -H, --header Add header to request \n" " -L --latency Print latency statistics \n" - " -U --u_latency Print uncorrected latency statistics\n" " --timeout Socket/request timeout \n" " -B, --batch_latency Measure latency of whole \n" " batches of pipelined ops \n" @@ -243,8 +241,6 @@ int main(int argc, char **argv) { struct hdr_histogram* latency_histogram; hdr_init(1, MAX_LATENCY, 3, &latency_histogram); - struct hdr_histogram* u_latency_histogram; - hdr_init(1, MAX_LATENCY, 3, &u_latency_histogram); uint64_t phase_normal_start_min = 0; @@ -278,7 +274,6 @@ int main(int argc, char **argv) { errors.reconnect += t->errors.reconnect; hdr_add(latency_histogram, t->latency_histogram); - hdr_add(u_latency_histogram, t->u_latency_histogram); } long double runtime_s = runtime_us / 1000000.0; @@ -300,13 +295,6 @@ int main(int argc, char **argv) { printf("----------------------------------------------------------\n"); } - if (cfg.u_latency) { - printf("\n"); - print_hdr_latency(u_latency_histogram, - "Uncorrected Latency (measured without taking delayed starts into account)"); - printf("----------------------------------------------------------\n"); - } - char *runtime_msg = format_time_us(runtime_us); printf(" %"PRIu64" requests in %s, %sB read\n", @@ -364,7 +352,6 @@ void *thread_main(void *arg) { thread->cs = zcalloc(thread->connections * sizeof(connection)); tinymt64_init(&thread->rand, time_us()); hdr_init(1, MAX_LATENCY, 3, &thread->latency_histogram); - hdr_init(1, MAX_LATENCY, 3, &thread->u_latency_histogram); char *request = NULL; size_t length = 0; @@ -553,7 +540,6 @@ static int calibrate(aeEventLoop *loop, long long id, void *data) { thread->mean = (uint64_t) mean; hdr_reset(thread->latency_histogram); - hdr_reset(thread->u_latency_histogram); thread->start = time_us(); thread->interval = interval; @@ -734,41 +720,6 @@ static int response_complete(http_parser *parser) { // Count all responses (including pipelined ones:) c->rate_handler.sent++; - // Note that expected start time is computed based on the completed - // response count seen at the beginning of the last request batch sent. - // A single request batch send may contain multiple requests, and - // result in multiple responses. If we incorrectly calculated expect - // start time based on the completion count of these individual pipelined - // requests we can easily end up "gifting" them time and seeing - // negative latencies. - uint64_t expected_latency_start = c->rate_handler.thread_start + - (c->complete_at_last_batch_start / c->rate_handler.throughput); - - - int64_t expected_latency_timing = now - expected_latency_start; - - if (expected_latency_timing < 0) { - printf("\n\n ---------- \n\n"); - printf("We are about to crash and die (recoridng a negative #)"); - printf("This wil never ever ever happen..."); - printf("But when it does. The following information will help in debugging"); - printf("response_complete:\n"); - printf(" expected_latency_timing = %"PRId64"\n", expected_latency_timing); - printf(" now = %"PRIu64"\n", now); - printf(" expected_latency_start = %"PRIu64"\n", expected_latency_start); - printf(" c->thread_start = %"PRIu64"\n", c->rate_handler.thread_start); - printf(" c->complete = %"PRIu64"\n", c->rate_handler.sent); - printf(" throughput = %g\n", c->rate_handler.throughput); - printf(" latest_should_send_time = %"PRIu64"\n", c->latest_should_send_time); - printf(" latest_expected_start = %"PRIu64"\n", c->latest_expected_start); - printf(" latest_connect = %"PRIu64"\n", c->latest_connect); - printf(" latest_write = %"PRIu64"\n", c->latest_write); - - expected_latency_start = c->rate_handler.thread_start + - ((c->rate_handler.sent ) / c->rate_handler.throughput); - printf(" next expected_latency_start = %"PRIu64"\n", expected_latency_start); - } - c->latest_should_send_time = 0; c->latest_expected_start = 0; @@ -779,10 +730,8 @@ static int response_complete(http_parser *parser) { // Record if needed, either last in batch or all, depending in cfg: if (cfg.record_all_responses || !c->has_pending) { - hdr_record_value(thread->latency_histogram, expected_latency_timing); - uint64_t actual_latency_timing = now - c->actual_latency_start; - hdr_record_value(thread->u_latency_histogram, actual_latency_timing); + hdr_record_value(thread->latency_histogram, actual_latency_timing); } @@ -976,7 +925,6 @@ static struct option longopts[] = { { "script", required_argument, NULL, 's' }, { "header", required_argument, NULL, 'H' }, { "latency", no_argument, NULL, 'L' }, - { "u_latency", no_argument, NULL, 'U' }, { "batch_latency", no_argument, NULL, 'B' }, { "timeout", required_argument, NULL, 'T' }, { "help", no_argument, NULL, 'h' }, @@ -1026,10 +974,6 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa case 'B': cfg->record_all_responses = false; break; - case 'U': - cfg->latency = true; - cfg->u_latency = true; - break; case 'T': if (scan_time(optarg, &cfg->timeout)) return -1; cfg->timeout *= 1000;