135 changes: 58 additions & 77 deletions src/wrk.c
@@ -34,10 +34,10 @@ static struct config {
uint64_t delay_ms;
uint64_t warmup_timeout;
bool latency;
bool u_latency;
bool dynamic;
bool record_all_responses;
bool warmup;
bool rate_by_thread;
char *host;
char *script;
char *local_ip;
@@ -85,7 +85,6 @@ static void usage() {
" -s, --script <S> Load Lua script file \n"
" -H, --header <H> Add header to request \n"
" -L --latency Print latency statistics \n"
" -U --u_latency Print uncorrected latency statistics\n"
" --timeout <T> Socket/request timeout \n"
" -B, --batch_latency Measure latency of whole \n"
" batches of pipelined ops \n"
@@ -97,6 +96,9 @@
" -W --warmup Enable warmup phase \n"
" In warmup phase connections are establised,\n"
" but no requests are sent \n"
" -r --rate_per_thread Use rate limit per thread \n"
" instead of per connection. \n"
" Might help to reduce traffic bursts\n"
" \n"
" \n"
" Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
@@ -194,7 +196,7 @@ int main(int argc, char **argv) {
// TODO Review whether we can reduce number of events per thread
t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
t->connections = connections;
t->throughput = throughput;
t->rate_handler.throughput = throughput;
t->stop_at = stop_at;

if (local_ip_nr > 0)
@@ -239,8 +241,6 @@ int main(int argc, char **argv) {

struct hdr_histogram* latency_histogram;
hdr_init(1, MAX_LATENCY, 3, &latency_histogram);
struct hdr_histogram* u_latency_histogram;
hdr_init(1, MAX_LATENCY, 3, &u_latency_histogram);

uint64_t phase_normal_start_min = 0;

@@ -274,7 +274,6 @@ int main(int argc, char **argv) {
errors.reconnect += t->errors.reconnect;

hdr_add(latency_histogram, t->latency_histogram);
hdr_add(u_latency_histogram, t->u_latency_histogram);
}

long double runtime_s = runtime_us / 1000000.0;
@@ -296,13 +295,6 @@
printf("----------------------------------------------------------\n");
}

if (cfg.u_latency) {
printf("\n");
print_hdr_latency(u_latency_histogram,
"Uncorrected Latency (measured without taking delayed starts into account)");
printf("----------------------------------------------------------\n");
}

char *runtime_msg = format_time_us(runtime_us);

printf(" %"PRIu64" requests in %s, %sB read\n",
Expand Down Expand Up @@ -360,7 +352,6 @@ void *thread_main(void *arg) {
thread->cs = zcalloc(thread->connections * sizeof(connection));
tinymt64_init(&thread->rand, time_us());
hdr_init(1, MAX_LATENCY, 3, &thread->latency_histogram);
hdr_init(1, MAX_LATENCY, 3, &thread->u_latency_histogram);

char *request = NULL;
size_t length = 0;
@@ -369,19 +360,26 @@
script_request(thread->L, &request, &length);
}

double throughput = (thread->throughput / 1000000.0) / thread->connections;
double throughput = (thread->rate_handler.throughput / 1000000.0) / thread->connections;

connection *c = thread->cs;

if (cfg.rate_by_thread) {
thread->rate_handler.throughput = thread->rate_handler.throughput / 1000000.0;
thread->rate_handler.catch_up_throughput = thread->rate_handler.throughput * 2;
thread->rate_handler.sent = 0;
thread->rate_handler.caught_up = true;
}

for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->thread = thread;
c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
c->request = request;
c->length = length;
c->throughput = throughput;
c->catch_up_throughput = throughput * 2;
c->complete = 0;
c->caught_up = true;
c->rate_handler.throughput = throughput;
c->rate_handler.catch_up_throughput = throughput * 2;
c->rate_handler.sent = 0;
c->rate_handler.caught_up = true;
// Stagger connects 5 msec apart within thread:
aeCreateTimeEvent(loop, i * 5, delayed_initial_connect, c, NULL);
}
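
A minimal sketch of the arithmetic in thread_main() above, using assumed numbers (a thread handed 500 req/s that owns 10 connections); only the field and variable names come from this diff, the figures do not:

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        double   thread_rate = 500.0;   /* req/s handed to one thread (assumed) */
        uint64_t connections = 10;

        /* Default mode: the budget is split across the thread's connections.   */
        double throughput = (thread_rate / 1000000.0) / connections; /* 5e-05 req/us */
        double catch_up_throughput = throughput * 2;                 /* 1e-04 req/us */

        /* With -r the division by connections is skipped, so the whole          */
        /* 5e-04 req/us budget is tracked once in thread->rate_handler.          */
        printf("per-connection: %g req/us, catch-up: %g req/us\n",
               throughput, catch_up_throughput);
        return 0;
    }
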
@@ -525,7 +523,7 @@ static int reconnect_socket(thread *thread, connection *c) {

static int delayed_initial_connect(aeEventLoop *loop, long long id, void *data) {
connection* c = data;
c->thread_start = time_us();
c->rate_handler.thread_start = time_us();
connect_socket(c->thread, c);
return AE_NOMORE;
}
@@ -542,7 +540,6 @@ static int calibrate(aeEventLoop *loop, long long id, void *data) {

thread->mean = (uint64_t) mean;
hdr_reset(thread->latency_histogram);
hdr_reset(thread->u_latency_histogram);

thread->start = time_us();
thread->interval = interval;
@@ -631,50 +628,66 @@ static int response_body(http_parser *parser, const char *at, size_t len) {

static uint64_t usec_to_next_send(connection *c) {
uint64_t now = time_us();
rate_handler_t *rate_handler =
(cfg.rate_by_thread) ? &c->thread->rate_handler : &c->rate_handler;

uint64_t next_start_time = c->thread_start + (c->complete / c->throughput);
uint64_t next_start_time =
rate_handler->thread_start + (rate_handler->sent / rate_handler->throughput);

bool send_now = true;

if (cfg.rate_by_thread) {
if (rate_handler->sent == 0) {
rate_handler->thread_start = now;
goto ret;
}
}

if (next_start_time > now) {
// We are on pace. Indicate caught_up and don't send now.
c->caught_up = true;
rate_handler->caught_up = true;
send_now = false;
} else {
// We are behind
if (c->caught_up) {
if (rate_handler->caught_up) {
// This is the first fall-behind since we were last caught up
c->caught_up = false;
c->catch_up_start_time = now;
c->complete_at_catch_up_start = c->complete;
rate_handler->caught_up = false;
rate_handler->catch_up_start_time = now;
rate_handler->complete_at_catch_up_start = rate_handler->sent;
}

// Figure out if it's time to send, per catch up throughput:
uint64_t complete_since_catch_up_start =
c->complete - c->complete_at_catch_up_start;
rate_handler->sent - rate_handler->complete_at_catch_up_start;

next_start_time = c->catch_up_start_time +
(complete_since_catch_up_start / c->catch_up_throughput);
next_start_time = rate_handler->catch_up_start_time +
(complete_since_catch_up_start / rate_handler->catch_up_throughput);

if (next_start_time > now) {
// Not yet time to send, even at catch-up throughout:
send_now = false;
}
}

ret:
if (send_now) {
c->latest_should_send_time = now;
c->latest_expected_start = next_start_time;
if (cfg.rate_by_thread) {
rate_handler->sent++;
}
}

return send_now ? 0 : (next_start_time - now);
}

static int delay_request(aeEventLoop *loop, long long id, void *data) {
connection* c = data;
uint64_t time_usec_to_wait = usec_to_next_send(c);
if (time_usec_to_wait) {
return round((time_usec_to_wait / 1000.0L) + 0.5); /* don't send, wait */
if (!cfg.rate_by_thread) {
uint64_t time_usec_to_wait = usec_to_next_send(c);
if (time_usec_to_wait) {
return round((time_usec_to_wait / 1000.0L) + 0.5); /* don't send, wait */
}
}
aeCreateFileEvent(c->thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
return AE_NOMORE;
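
To make the catch-up branch above concrete, a worked example under assumed values: a handler pacing 0.0005 req/us (500 req/s), catch_up_throughput of 0.001 req/us, thread_start at 0, and sent at 100. The normal-pace due time is 0 + 100 / 0.0005 = 200,000 us. If the current time is still below 200,000 us the handler is on pace, caught_up is set, and the caller waits out the difference; if it is past 200,000 us the handler anchors catch_up_start_time at the current time and paces only the backlog accumulated since then at 0.001 req/us, until the computed due time moves back into the future. With -r this whole walk runs once against thread->rate_handler, and sent advances as soon as a send is approved rather than when a response completes.
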
@@ -705,41 +718,7 @@ static int response_complete(http_parser *parser) {
}

// Count all responses (including pipelined ones:)
c->complete++;

// Note that expected start time is computed based on the completed
// response count seen at the beginning of the last request batch sent.
// A single request batch send may contain multiple requests, and
// result in multiple responses. If we incorrectly calculated expect
// start time based on the completion count of these individual pipelined
// requests we can easily end up "gifting" them time and seeing
// negative latencies.
uint64_t expected_latency_start = c->thread_start +
(c->complete_at_last_batch_start / c->throughput);

int64_t expected_latency_timing = now - expected_latency_start;

if (expected_latency_timing < 0) {
printf("\n\n ---------- \n\n");
printf("We are about to crash and die (recoridng a negative #)");
printf("This wil never ever ever happen...");
printf("But when it does. The following information will help in debugging");
printf("response_complete:\n");
printf(" expected_latency_timing = %"PRId64"\n", expected_latency_timing);
printf(" now = %"PRIu64"\n", now);
printf(" expected_latency_start = %"PRIu64"\n", expected_latency_start);
printf(" c->thread_start = %"PRIu64"\n", c->thread_start);
printf(" c->complete = %"PRIu64"\n", c->complete);
printf(" throughput = %g\n", c->throughput);
printf(" latest_should_send_time = %"PRIu64"\n", c->latest_should_send_time);
printf(" latest_expected_start = %"PRIu64"\n", c->latest_expected_start);
printf(" latest_connect = %"PRIu64"\n", c->latest_connect);
printf(" latest_write = %"PRIu64"\n", c->latest_write);

expected_latency_start = c->thread_start +
((c->complete ) / c->throughput);
printf(" next expected_latency_start = %"PRIu64"\n", expected_latency_start);
}
c->rate_handler.sent++;

c->latest_should_send_time = 0;
c->latest_expected_start = 0;
Expand All @@ -751,10 +730,8 @@ static int response_complete(http_parser *parser) {

// Record if needed, either last in batch or all, depending in cfg:
if (cfg.record_all_responses || !c->has_pending) {
hdr_record_value(thread->latency_histogram, expected_latency_timing);

uint64_t actual_latency_timing = now - c->actual_latency_start;
hdr_record_value(thread->u_latency_histogram, actual_latency_timing);
hdr_record_value(thread->latency_histogram, actual_latency_timing);
}


@@ -848,6 +825,11 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
int msec_to_wait = round((time_usec_to_wait / 1000.0L) + 0.5);

// Not yet time to send. Delay:
if (cfg.rate_by_thread) {
// in case the rate is controlled by the thread
// we wait 1ms for the specific connection
msec_to_wait = 1;
}
aeDeleteFileEvent(loop, fd, AE_WRITABLE);
aeCreateTimeEvent(
thread->loop, msec_to_wait, delay_request, c, NULL);
@@ -868,7 +850,7 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
c->start = time_us();
if (!c->has_pending) {
c->actual_latency_start = c->start;
c->complete_at_last_batch_start = c->complete;
c->complete_at_last_batch_start = c->rate_handler.sent;
c->has_pending = true;
}
c->pending = cfg.pipeline;
@@ -943,13 +925,13 @@ static struct option longopts[] = {
{ "script", required_argument, NULL, 's' },
{ "header", required_argument, NULL, 'H' },
{ "latency", no_argument, NULL, 'L' },
{ "u_latency", no_argument, NULL, 'U' },
{ "batch_latency", no_argument, NULL, 'B' },
{ "timeout", required_argument, NULL, 'T' },
{ "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'v' },
{ "rate", required_argument, NULL, 'R' },
{ "warmup", no_argument, NULL, 'W' },
{ "rate_per_thread",no_argument, NULL, 'r' },
{ NULL, 0, NULL, 0 }
};

Expand Down Expand Up @@ -992,10 +974,6 @@ static int parse_args(struct config *cfg, char **url, struct http_parser_url *pa
case 'B':
cfg->record_all_responses = false;
break;
case 'U':
cfg->latency = true;
cfg->u_latency = true;
break;
case 'T':
if (scan_time(optarg, &cfg->timeout)) return -1;
cfg->timeout *= 1000;
@@ -1010,6 +988,9 @@
case 'W':
cfg->warmup = true;
break;
case 'r':
cfg->rate_by_thread = true;
break;
case 'h':
case '?':
case ':':
20 changes: 12 additions & 8 deletions src/wrk.h
@@ -27,6 +27,16 @@
#define STOP_CHECK_INTERNAL_MS 2000
#define THREAD_SYNC_INTERVAL_MS 1000

typedef struct {
double throughput;
uint64_t sent;
bool caught_up;
uint64_t catch_up_start_time;
uint64_t complete_at_catch_up_start;
double catch_up_throughput;
uint64_t thread_start;
} rate_handler_t;

typedef struct {
pthread_t thread;
aeEventLoop *loop;
@@ -40,7 +50,6 @@ typedef struct {
uint64_t requests;
uint64_t bytes;
uint64_t start;
double throughput;
uint64_t mean;
struct hdr_histogram *latency_histogram;
struct hdr_histogram *u_latency_histogram;
@@ -49,6 +58,7 @@ typedef struct {
errors errors;
struct connection *cs;
char *local_ip;
rate_handler_t rate_handler;
} thread;

typedef struct {
Expand All @@ -66,13 +76,8 @@ typedef struct connection {
int fd;
int connect_mask;
SSL *ssl;
double throughput;
double catch_up_throughput;
uint64_t complete;
rate_handler_t rate_handler;
uint64_t complete_at_last_batch_start;
uint64_t catch_up_start_time;
uint64_t complete_at_catch_up_start;
uint64_t thread_start;
uint64_t start;
char *request;
size_t length;
@@ -84,7 +89,6 @@ typedef struct connection {
uint64_t actual_latency_start;
bool is_connected;
bool has_pending;
bool caught_up;
// Internal tracking numbers (used purely for debugging):
uint64_t latest_should_send_time;
uint64_t latest_expected_start;
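
Tying the wrk.h side together, a minimal self-contained sketch of the new rate_handler_t in use; the struct mirrors the definition added above, while the helper next_send_time() and the numbers are illustrative only, not part of this PR:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct {
        double   throughput;                  /* requests per microsecond       */
        uint64_t sent;                        /* sends (or responses) counted   */
        bool     caught_up;
        uint64_t catch_up_start_time;
        uint64_t complete_at_catch_up_start;
        double   catch_up_throughput;
        uint64_t thread_start;                /* anchor timestamp, microseconds */
    } rate_handler_t;

    /* Hypothetical helper: when the next send is due at the normal pace. */
    static uint64_t next_send_time(const rate_handler_t *h) {
        return h->thread_start + (uint64_t)(h->sent / h->throughput);
    }

    int main(void) {
        rate_handler_t h = { .throughput = 0.0005, .sent = 100,
                             .caught_up = true, .catch_up_throughput = 0.001,
                             .thread_start = 0 };
        /* 100 sends at 0.0005 req/us: the next send is due at 200,000 us. */
        printf("next send due at %llu us\n",
               (unsigned long long) next_send_time(&h));
        return 0;
    }
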