-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathibkvser.c
More file actions
210 lines (190 loc) · 5.84 KB
/
ibkvser.c
File metadata and controls
210 lines (190 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/*
* Copyright (c) 2016--2021 Wu, Xingbo <wuxb45@gmail.com>
*
* All rights reserved. No warranty, explicit or implicit, provided.
*/
#define _GNU_SOURCE
#include "lib.h"
#include "kv.h"
#include "ib.h"
// State passed from kv_process() to ibkv_inpr_helper() through the opaque
// priv pointer while serving one 'G' request.
struct ibkv_inpr_info {
  u8 * buf;    // base of the response region the kv is copied into
  u32 buf_len; // size limit of that region in bytes
  u32 off;     // byte offset from buf where the kv should be placed
  u32 out_len; // written by the helper: bytes used at off; 0 if nothing was copied
};
// Callback handed to kvmap_kv_inpr() by kv_process().
// curr is the kv that was found, or NULL on a miss; priv points to a
// struct ibkv_inpr_info describing where the kv may be copied.
// On return, out_len holds the number of bytes reserved at buf+off,
// or 0 when there was a miss or the kv does not fit below buf_len.
static void
ibkv_inpr_helper(struct kv * const curr, void * const priv)
{
  struct ibkv_inpr_info * const dst = (typeof(dst))priv;
  u32 used = 0; // stays 0 for "miss" and for "hit but no room"

  if (curr) {
    const u32 size = kv_size(curr);
    // reserved size is the kv size rounded with bits_round_up(.., 2)
    // (presumably up to a multiple of 4 bytes -- confirm in lib.h)
    const u32 padded = bits_round_up(size, 2);
    if ((dst->off + padded) <= dst->buf_len) {
      // only the real kv bytes are copied; the padding is left untouched
      memcpy(dst->buf + dst->off, curr, size);
      used = padded;
    }
  }

  dst->out_len = used;
}
// Process one batch of requests and build the batch of responses.
//
// Immediate-value encoding (both fields are in units of 64 bytes):
//   req imm:  low 16 bits  = offset of the batch inside the buffers
//             high 16 bits = size available for the response
//   resp imm: low 16 bits  = same offset as in the request
//             high 16 bits = size of the response (<= size given in the request)
// With 16-bit fields the largest encodable value is 64Ki * 64B = 4MB.
//
// Batch layout (32-bit words at the decoded offset):
//   word[0]       = number of requests (nreq)
//   word[1..nreq] = request:  (offset of the kv from the batch start << 8) | op
//                   response: (result << 8) | op
//   op is 'P' (probe), 'G' (get), 'S' (put) or 'D' (delete); for any other
//   op the response word is left unwritten.
//   For a 'G' that copied data, result = (offset of the kv in the response
//   batch) | hit-flag; the copied kvs are placed after the header words.
//
// NOTE(review): nreq and the per-request offsets come from the request buffer
// and are not validated against the buffer size here -- confirm that clients
// are trusted or that the caller validates them.
//
// Returns the imm to be sent back.
static u32
kv_process(const struct kvmap_api * const api, void * const ref,
    u8 * const req_base, u8 * const res_base, const u32 req_imm)
{
  const u32 slot = req_imm & 0xffffu;
  const u32 base_off = slot << 6;
  const u32 res_limit = (req_imm >> 16) << 6;

  // request and response batches live at the same offset in their buffers
  const u8 * const in = req_base + base_off;
  u8 * const out = res_base + base_off;
  const u32 * const in_hdr = (typeof(in_hdr))in;
  u32 * const out_hdr = (typeof(out_hdr))out;
  const u32 nreq = in_hdr[0];

  // response kvs start right after the (nreq + 1) header words
  u32 next_off = sizeof(u32) * (nreq + 1);
  struct ibkv_inpr_info ctx = {.buf = out, .buf_len = res_limit};

  for (u32 idx = 1; idx <= nreq; idx++) {
    const u32 word = in_hdr[idx];
    const u8 op = (u8)word;
    struct kv * const kv = (typeof(kv))(in + (word >> 8));

    switch (op) {
    case 'P':
      out_hdr[idx] = (((u32)kvmap_kv_probe(api, ref, kv)) << 8) | op;
      break;
    case 'G': {
      ctx.off = next_off;
      const bool hit = kvmap_kv_inpr(api, ref, kv, ibkv_inpr_helper, &ctx);
      if (ctx.out_len == 0) {
        // nothing was copied: report only the hit flag
        out_hdr[idx] = (((u32)hit) << 8) | op;
        break;
      }
      // bit 0 of the offset must be free to carry the hit flag
      debug_assert((next_off & 1) == 0);
      out_hdr[idx] = ((next_off | hit) << 8) | op;
      next_off += ctx.out_len;
      break;
    }
    case 'S':
      out_hdr[idx] = (((u32)kvmap_kv_put(api, ref, kv)) << 8) | op;
      break;
    case 'D':
      out_hdr[idx] = (((u32)kvmap_kv_del(api, ref, kv)) << 8) | op;
      break;
    default:
      break;
    }
  }

  out_hdr[0] = nreq;
  // response size in 64-byte units
  const u32 nlines = bits_round_up(next_off, 6) >> 6;
  return slot | (nlines << 16);
}
// Settings filled in by main() and read by every ibkv_worker() thread.
struct worker_info {
  const struct kvmap_api *api; // kvmap implementation used to serve requests
  void *map;                   // the map instance the workers take references on
  struct ib_port *ibport;      // IB port used for sockets and memory registration
  u64 depth;                   // socket depth; also the number of receives posted up front
};
// Requested size in bytes (2^21) of each per-connection request/response buffer.
#define BUFSIZE (1lu << 21)
static void *
ibkv_worker(void * const ptr)
{
struct server_wi * const si = (typeof(si))ptr;
struct worker_info * const wi = (typeof(wi))server_worker_priv(si);
const u64 depth = wi->depth;
struct ib_socket * const s = ib_socket_create(wi->ibport, depth);
if (!s)
server_worker_exit(si);
const int fd = server_worker_fd(si);
const bool rc = ib_socket_connect_fd(s, fd);
if (!rc) {
ib_socket_destroy(s);
server_worker_exit(si);
}
const struct kvmap_api * const api = wi->api;
void * const map = wi->map;
void * const ref = kvmap_ref(api, map);
// TODO: handle errors
u64 memsz_req = 0, memsz_res = 0;
u8 * const mem_req = pages_alloc_best(BUFSIZE, true, &memsz_req);
struct ibv_mr * const mr_req = ib_mr_helper(wi->ibport, mem_req, memsz_req);
u8 * const mem_res = pages_alloc_best(BUFSIZE, true, &memsz_res);
struct ibv_mr * const mr_res = ib_mr_helper(wi->ibport, mem_res, memsz_res);
// send two remote memory buffers
ib_mr_send_rptr_fd(fd, mr_req);
ib_mr_send_rptr_fd(fd, mr_res);
struct ibv_send_wr swr;
bool wait_more = true;
for (u64 i = 0; i < depth; i++) {
if (!ib_socket_post_recv_sge(s, i, NULL))
debug_die();
}
do {
int n = ib_socket_poll(s);
if (n < 0) {
perror("ib_socket_poll");
break;
} else if (n == 0) {
cpu_pause();
}
for (int i = 0; i < n; i++) {
struct ibv_wc * const wc = &(s->wcs[i]);
if (wc->status != IBV_WC_SUCCESS) {
wait_more = false;
break;
}
switch (wc->opcode) {
case IBV_WC_RECV_RDMA_WITH_IMM:
// handle request
{
const u32 imm = kv_process(api, ref, mem_req, mem_res, wc->imm_data);
// send response; client will read; can use send-imm or write-imm
//ib_wr_fill_send_imm(&swr, 0, NULL, imm);
ib_wr_fill_write_imm(&swr, 0, NULL, NULL, 0, imm);
if (!ib_socket_post_send(s, &swr, true))
debug_die();
// prepare for next request
if (!ib_socket_post_recv_sge(s, 0, NULL))
debug_die();
}
break;
case IBV_WC_RECV:
if (wc->imm_data == UINT32_MAX)
wait_more = false;
break;
default:
break;
}
}
} while (wait_more);
kvmap_unref(api, ref);
ibv_dereg_mr(mr_req);
ibv_dereg_mr(mr_res);
pages_unmap(mem_req, memsz_req);
pages_unmap(mem_res, memsz_res);
ib_socket_destroy(s);
server_worker_exit(si);
return NULL;
}
// Entry point of the server.
// Arguments: <ib-port> <host> <port> <depth> api ...
//   ib-port: number passed to ib_port_create()
//   host, port: address passed to server_create()
//   depth: per-connection depth, adjusted with bits_p2_up_u64()
//          (presumably rounded up to a power of two -- confirm in lib.h)
//   api ...: remaining arguments are parsed by kvmap_api_helper()
// Exits with EXIT_FAILURE on a usage or setup error; returns 0 after the
// server has been waited for and destroyed.
int
main(int argc, char ** argv)
{
  if (argc < 6) {
    fprintf(stderr, "Usage: <ib-port> <host> <port> <depth> api ...\n");
    kvmap_api_helper_message();
    exit(EXIT_FAILURE);
  }

  struct worker_info wi = {};
  wi.ibport = ib_port_create(a2u64(argv[1]));
  if (wi.ibport == NULL) {
    fprintf(stderr, "ib_port_create() failed\n");
    exit(EXIT_FAILURE);
  }

  const char * const host = argv[2];
  const char * const port = argv[3];
  wi.depth = bits_p2_up_u64(a2u64(argv[4]));

  // the first five argv entries are consumed above
  const int n1 = kvmap_api_helper(argc - 5, argv + 5, NULL, &wi.api, &wi.map);
  if (n1 < 0) {
    fprintf(stderr, "kvmap_api_helper() failed\n");
    exit(EXIT_FAILURE);
  }

  // each connection is served by ibkv_worker() with &wi as its private data
  struct server * const ser = server_create(host, port, ibkv_worker, &wi);
  if (ser == NULL) {
    fprintf(stderr, "server_create() failed\n");
    exit(EXIT_FAILURE);
  }

  server_wait_destroy(ser);
  wi.api->destroy(wi.map);
  return 0;
}