-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathib.c
More file actions
444 lines (399 loc) · 11.1 KB
/
ib.c
File metadata and controls
444 lines (399 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
* Copyright (c) 2016--2021 Wu, Xingbo <wuxb45@gmail.com>
*
* All rights reserved. No warranty, explicit or implicit, provided.
*/
#define _GNU_SOURCE
#include "lib.h"
#include "ib.h"
// define/struct {{{
// Number of scatter/gather entries requested per send and per receive work request
// (used for max_send_sge/max_recv_sge when the QP is created).
#define IB1_SGE_DEPTH ((1))
// Attribute masks passed to ibv_modify_qp(); each one is the OR of the IBV_QP_* bits
// naming the ibv_qp_attr fields that the corresponding state transition supplies.
// INIT transition of the socket QP (see ib_socket_create).
#define IB1_SOCKET_QP_INIT ((IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS))
// INIT mask carrying a QKEY instead of access flags; not referenced in this part of the file.
#define IB1_DATAGRAM_QP_INIT ((IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY))
// RTR transition: the _UC mask is the common subset, the _RC mask adds the RC-only fields.
#define IB1_SOCKET_QP_RTR_UC ((IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN))
#define IB1_SOCKET_QP_RTR_RC ((IB1_SOCKET_QP_RTR_UC | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER))
// RTS transition: the _UC mask is the common subset, the _RC mask adds the RC-only fields.
#define IB1_SOCKET_QP_RTS_UC ((IBV_QP_STATE | IBV_QP_SQ_PSN))
#define IB1_SOCKET_QP_RTS_RC ((IB1_SOCKET_QP_RTS_UC | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC))
// Access flags used both for memory registration (ib_mr_helper) and as the
// qp_access_flags of the socket QP (ib_socket_create).
#define IB1_MR_FLAGS ((IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC))
// Fixed packet sequence number used for both rq_psn and sq_psn in ib_socket_connect.
#define IB1_PSN ((1000))
// max_inline_data requested for the QP; SEND requests whose sge length is at most
// this many bytes get IBV_SEND_INLINE set by ib_wr_fill_send/ib_wr_fill_send_imm.
#define IB1_MAX_INLINE_SIZE ((60))
// One opened device context bound to a single physical port, plus the
// protection domain allocated on that context.
// Created by ib_port_create() and released by ib_port_destroy().
struct ib_port {
struct ibv_context * ctx; // context of device
u8 port_id; // port number on the device; ib_port_create assigns it starting from 1
struct ibv_pd * pd; // protection domain
};
// Addressing information of one end of a connection.
// ib_socket_connect_fd() writes and reads this struct as raw bytes over a
// file descriptor, so both ends must agree on its layout.
struct ib_peer {
int lid; // port LID (filled from ibv_port_attr.lid)
int qpn; // queue pair number (filled from ibv_qp.qp_num)
};
// }}} define/struct
// port/memory {{{
struct ib_port *
ib_port_create(const u64 index)
{
int num = 0;
struct ibv_device ** const list = ibv_get_device_list(&num);
u64 base = 0;
for (int i = 0; i < num; i++) {
struct ibv_context * const ctx = ibv_open_device(list[i]);
if (ctx == NULL) {
ibv_free_device_list(list);
return NULL;
}
struct ibv_device_attr attr = {};
const int q = ibv_query_device(ctx, &attr);
(void)q;
debug_assert(q == 0);
const u8 np = attr.phys_port_cnt;
if ((base + np) > index) {
struct ib_port * const port = (typeof(port))malloc(sizeof(*port));
debug_assert(port);
port->ctx = ctx;
port->port_id = index - base + 1;
port->pd = ibv_alloc_pd(ctx);
debug_assert(port->pd);
ibv_free_device_list(list);
return port;
}
ibv_close_device(ctx);
base += np;
}
ibv_free_device_list(list);
return NULL;
}
// Release everything owned by a port created with ib_port_create():
// the protection domain first, then the device context, then the struct itself.
inline void
ib_port_destroy(struct ib_port * const port)
{
  struct ibv_pd * const pd = port->pd;
  struct ibv_context * const ctx = port->ctx;

  ibv_dealloc_pd(pd);
  ibv_close_device(ctx);
  free(port);
}
// Register [addr, addr + length) on the port's protection domain with the
// IB1_MR_FLAGS access rights. Returns whatever ibv_reg_mr() returns.
inline struct ibv_mr *
ib_mr_helper(struct ib_port *port, void *addr, size_t length)
{
  struct ibv_mr * const mr = ibv_reg_mr(port->pd, addr, length, IB1_MR_FLAGS);
  return mr;
}
// Describe a memory region (address, length, rkey) as an ib_remote_ptr and
// write it to fd with a single write().
// Returns true only if the whole struct was written by that one call.
inline bool
ib_mr_send_rptr_fd(const int fd, struct ibv_mr * const mr)
{
  struct ib_remote_ptr rptr = {.rptr = mr->addr, .len = mr->length, .rkey = mr->rkey};
  const ssize_t nw = write(fd, &rptr, sizeof(rptr));
  return nw == (ssize_t)sizeof(rptr);
}
// Read one ib_remote_ptr from fd into *rptr with a single read().
// Returns true only if the whole struct was received by that one call.
inline bool
ib_mr_recv_rptr_fd(const int fd, struct ib_remote_ptr * const rptr)
{
  const ssize_t nr = read(fd, rptr, sizeof(*rptr));
  return nr == (ssize_t)sizeof(*rptr);
}
inline void *
ib_sge_fill_off(struct ibv_sge * const sg, struct ibv_mr * const mr, const size_t off, const size_t size)
{
debug_assert((off + size) <= mr->length);
sg->addr = ((u64)mr->addr) + off;
sg->length = size;
sg->lkey = mr->lkey;
return (void *)sg->addr;
}
inline u64
ib_sge_fill_ptr(struct ibv_sge * const sg, struct ibv_mr * const mr, void * const ptr, const size_t size)
{
debug_assert((((u64)ptr) + size - ((u64)mr->addr)) <= mr->length);
debug_assert(ptr >= mr->addr);
sg->addr = (u64)ptr;
sg->length = size;
sg->lkey = mr->lkey;
return ptr - mr->addr;
}
// }}} port/memory
// create/destroy {{{
// Allocate a socket on `port`: one completion queue shared by send and receive,
// and one RC queue pair moved to the INIT state.
//
// `depth` sizes the trailing wcs[] array; the CQ and both work queues are
// created with depth*2 entries.
// Returns the new socket (release with ib_socket_destroy()), or NULL if the
// allocation or any verbs call fails. Everything created before the failure
// is released, so nothing leaks.
struct ib_socket *
ib_socket_create(struct ib_port * const port, const u64 depth)
{
  struct ib_socket * const s = (typeof(s))calloc(1, sizeof(*s) + (sizeof(s->wcs[0]) * depth));
  if (s == NULL)
    return NULL;

  // declared before the first goto so no jump crosses an initialization
  struct ibv_qp_init_attr iattr = {};
  struct ibv_qp_attr qattr = {};

  s->port = port;
  s->depth = depth;

  // cq
  s->cq = ibv_create_cq(port->ctx, depth*2, NULL, NULL, 0);
  if (s->cq == NULL)
    goto fail_cq;

  // qp
  iattr.send_cq = s->cq;
  iattr.recv_cq = s->cq;
  iattr.qp_type = IBV_QPT_RC;
  iattr.sq_sig_all = 0; // completions are requested per work request
  iattr.cap.max_send_wr = depth*2;
  iattr.cap.max_recv_wr = depth*2;
  iattr.cap.max_send_sge = IB1_SGE_DEPTH;
  iattr.cap.max_recv_sge = IB1_SGE_DEPTH;
  iattr.cap.max_inline_data = IB1_MAX_INLINE_SIZE;
  s->qp = ibv_create_qp(port->pd, &iattr);
  if (s->qp == NULL)
    goto fail_qp;

  // qp init state
  qattr.qp_state = IBV_QPS_INIT;
  qattr.pkey_index = 0;
  qattr.port_num = port->port_id;
  qattr.qp_access_flags = IB1_MR_FLAGS;
  if (ibv_modify_qp(s->qp, &qattr, IB1_SOCKET_QP_INIT) != 0)
    goto fail_init;

  return s;

fail_init:
  ibv_destroy_qp(s->qp);
fail_qp:
  ibv_destroy_cq(s->cq);
fail_cq:
  free(s);
  return NULL;
}
static bool
ib_socket_connect(struct ib_socket * const s, struct ib_peer * const r)
{
// RTR
struct ibv_qp_attr qattr = {};
qattr.qp_state = IBV_QPS_RTR;
qattr.path_mtu = IBV_MTU_4096;
qattr.dest_qp_num = r->qpn;
qattr.rq_psn = IB1_PSN;
qattr.ah_attr.is_global = 0;
qattr.ah_attr.dlid = r->lid;
qattr.ah_attr.sl = 0;
qattr.ah_attr.src_path_bits = 0;
qattr.ah_attr.port_num = s->port->port_id;
// RC only
qattr.max_dest_rd_atomic = 16;
qattr.min_rnr_timer = 12;
const int mask1 = IB1_SOCKET_QP_RTR_RC;
const int r1 = ibv_modify_qp(s->qp, &qattr, mask1);
if (r1 != 0)
return false;
// RTS
qattr.qp_state = IBV_QPS_RTS;
qattr.sq_psn = IB1_PSN;
// RC only
qattr.timeout = 14;
qattr.retry_cnt = 7;
qattr.rnr_retry = 7;
qattr.max_rd_atomic = 16;
qattr.max_dest_rd_atomic = 16;
const int mask2 = IB1_SOCKET_QP_RTS_RC;
const int r2 = ibv_modify_qp(s->qp, &qattr, mask2);
return r2 == 0;
}
bool
ib_socket_connect_fd(struct ib_socket * const s, const int fd)
{
struct ibv_port_attr attr = {};
if (ibv_query_port(s->port->ctx, s->port->port_id, &attr) != 0) {
fprintf(stderr, "ibv_query_port failed\n");
return false;
}
struct ib_peer local = {.lid = attr.lid, .qpn = s->qp->qp_num};
if (write(fd, &local, sizeof(local)) != sizeof(local)) {
fprintf(stderr, "send remote failed\n");
return false;
}
struct ib_peer remote = {};
if (read(fd, &remote, sizeof(remote)) != sizeof(remote)) {
fprintf(stderr, "recv remote failed\n");
return false;
}
const bool rc = ib_socket_connect(s, &remote);
if (rc == false) {
fprintf(stderr, "connect failed\n");
}
return rc;
}
// Tear down a socket: destroy its queue pair, then its completion queue,
// then free the socket itself. The port is not touched.
void
ib_socket_destroy(struct ib_socket * const s)
{
  struct ibv_qp * const qp = s->qp;
  struct ibv_cq * const cq = s->cq;

  ibv_destroy_qp(qp);
  ibv_destroy_cq(cq);
  free(s);
}
// }}} create/destroy
// post {{{
// Prepare wr as a standalone SEND request.
// sg may be NULL or have zero length, in which case the request carries no sge.
// IBV_SEND_INLINE is set whenever sg is given and its length does not exceed
// IB1_MAX_INLINE_SIZE; no other send flag is set.
void
ib_wr_fill_send(struct ibv_send_wr * const wr, const u64 id, struct ibv_sge * const sg)
{
  const bool has_sg = (sg != NULL);

  wr->wr_id = id;
  wr->next = NULL;
  wr->opcode = IBV_WR_SEND;
  wr->sg_list = sg;
  wr->num_sge = (has_sg && (sg->length != 0)) ? 1 : 0;
  wr->send_flags = (has_sg && (sg->length <= IB1_MAX_INLINE_SIZE)) ? IBV_SEND_INLINE : 0;
}
// Prepare wr as a standalone SEND-with-immediate request carrying `imm`.
// sg may be NULL or have zero length, in which case the request carries no sge.
// IBV_SEND_INLINE is set whenever sg is given and its length does not exceed
// IB1_MAX_INLINE_SIZE; no other send flag is set.
void
ib_wr_fill_send_imm(struct ibv_send_wr * const wr, const u64 id, struct ibv_sge * const sg, const u32 imm)
{
  const bool has_sg = (sg != NULL);

  wr->wr_id = id;
  wr->next = NULL;
  wr->opcode = IBV_WR_SEND_WITH_IMM;
  wr->imm_data = imm;
  wr->sg_list = sg;
  wr->num_sge = (has_sg && (sg->length != 0)) ? 1 : 0;
  wr->send_flags = (has_sg && (sg->length <= IB1_MAX_INLINE_SIZE)) ? IBV_SEND_INLINE : 0;
}
// Prepare wr as a standalone RDMA WRITE of sg to the remote region described
// by rptr, starting `off` bytes into it. rptr must not be NULL.
// No send flags are set.
void
ib_wr_fill_write(struct ibv_send_wr * const wr, const u64 id, struct ibv_sge * const sg,
    const struct ib_remote_ptr * const rptr, const u32 off)
{
  wr->wr_id = id;
  wr->next = NULL;
  wr->opcode = IBV_WR_RDMA_WRITE;
  wr->send_flags = 0;

  // local source
  wr->sg_list = sg;
  wr->num_sge = ((sg != NULL) && (sg->length != 0)) ? 1 : 0;

  // remote destination
  wr->wr.rdma.rkey = rptr->rkey;
  wr->wr.rdma.remote_addr = (u64)(rptr->rptr + off);
}
// Prepare wr as a standalone RDMA WRITE-with-immediate carrying `imm`.
// The data in sg goes to the remote region described by rptr, `off` bytes in.
// rptr may be NULL, in which case the remote address and rkey are set to 0.
// No send flags are set.
void
ib_wr_fill_write_imm(struct ibv_send_wr * const wr, const u64 id, struct ibv_sge * const sg,
    const struct ib_remote_ptr * const rptr, const u32 off, const u32 imm)
{
  const bool has_remote = (rptr != NULL);

  wr->wr_id = id;
  wr->next = NULL;
  wr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
  wr->send_flags = 0;
  wr->imm_data = imm;

  // local source
  wr->sg_list = sg;
  wr->num_sge = ((sg != NULL) && (sg->length != 0)) ? 1 : 0;

  // remote destination
  wr->wr.rdma.remote_addr = has_remote ? (u64)(rptr->rptr + off) : 0;
  wr->wr.rdma.rkey = has_remote ? rptr->rkey : 0;
}
// Prepare wr as a standalone RDMA READ from the remote region described by
// rptr, starting `off` bytes into it, into sg. rptr must not be NULL.
// No send flags are set.
void
ib_wr_fill_read(struct ibv_send_wr * const wr, const u64 id, struct ibv_sge * const sg,
    const struct ib_remote_ptr * const rptr, const u32 off)
{
  wr->wr_id = id;
  wr->next = NULL;
  wr->opcode = IBV_WR_RDMA_READ;
  wr->send_flags = 0;

  // local destination
  wr->sg_list = sg;
  wr->num_sge = ((sg != NULL) && (sg->length != 0)) ? 1 : 0;

  // remote source
  wr->wr.rdma.rkey = rptr->rkey;
  wr->wr.rdma.remote_addr = (u64)(rptr->rptr + off);
}
// Prepare wr as a receive request that ends a chain, and, when prev is
// given, append it after prev.
// sg may be NULL or have zero length, in which case the request carries no sge.
void
ib_wr_fill_recv(struct ibv_recv_wr * const wr, const u64 id, struct ibv_sge * const sg,
    struct ibv_recv_wr * const prev)
{
  const int nsge = ((sg != NULL) && (sg->length != 0)) ? 1 : 0;

  wr->wr_id = id;
  wr->sg_list = sg;
  wr->num_sge = nsge;
  wr->next = NULL;

  // link last, after wr itself has been terminated
  if (prev != NULL)
    prev->next = wr;
}
// Chain the nr send work requests of the array wrs in index order:
// wrs[i].next points at wrs[i+1] and the last element's next is NULL.
// Does nothing when nr is 0 (previously nr-1 wrapped around and the loop
// wrote far beyond the array).
void
ib_swr_link(struct ibv_send_wr * const wrs, const u32 nr)
{
  if (nr == 0)
    return;

  for (u32 i = 1; i < nr; i++)
    wrs[i - 1].next = &wrs[i];
  wrs[nr - 1].next = NULL;
}
// Chain the nr receive work requests of the array wrs in index order:
// wrs[i].next points at wrs[i+1] and the last element's next is NULL.
// Does nothing when nr is 0 (previously nr-1 wrapped around and the loop
// wrote far beyond the array).
void
ib_rwr_link(struct ibv_recv_wr * const wrs, const u32 nr)
{
  if (nr == 0)
    return;

  for (u32 i = 1; i < nr; i++)
    wrs[i - 1].next = &wrs[i];
  wrs[nr - 1].next = NULL;
}
// Post the send work request (chain) starting at wr on the socket's QP.
// When `signaled` is true, IBV_SEND_SIGNALED is added to the flags of the
// first request only.
// Returns true on success; otherwise prints the value returned by
// ibv_post_send() to stderr and returns false.
bool
ib_socket_post_send(struct ib_socket * const s, struct ibv_send_wr * const wr, const bool signaled)
{
  struct ibv_send_wr * bad = NULL;

  if (signaled)
    wr->send_flags |= IBV_SEND_SIGNALED;

  const int rc = ibv_post_send(s->qp, wr, &bad);
  if (rc == 0)
    return true;

  fprintf(stderr, "%s errno %d\n", __func__, rc);
  return false;
}
bool
ib_socket_post_recv_sge(struct ib_socket * const s, const u64 id, struct ibv_sge * const sg)
{
struct ibv_recv_wr rwr;
rwr.wr_id = id;
rwr.next = NULL;
rwr.sg_list = sg;
rwr.num_sge = (sg && sg->length) ? 1 : 0;
struct ibv_recv_wr * bad_wr = NULL;
const int r = ibv_post_recv(s->qp, &rwr, &bad_wr);
if (r)
fprintf(stderr, "%s errno %d\n", __func__, r);
return r == 0;
}
// Post the receive work request (chain) starting at wr on the socket's QP.
// Returns true on success; otherwise prints the value returned by
// ibv_post_recv() to stderr and returns false.
bool
ib_socket_post_recv(struct ib_socket * const s, struct ibv_recv_wr * const wr)
{
  struct ibv_recv_wr * bad = NULL;

  const int rc = ibv_post_recv(s->qp, wr, &bad);
  if (rc == 0)
    return true;

  fprintf(stderr, "%s errno %d\n", __func__, rc);
  return false;
}
// }}} post
// cq {{{
void
ib_wc_fprint(struct ibv_wc * const wcs, int n, FILE * const fout)
{
if (n < 0) {
fprintf(fout, "poll failed\n");
return;
}
for (int i = 0; i < n; i++) {
struct ibv_wc * const wc = wcs + i;
if (wc->status != IBV_WC_SUCCESS)
fprintf(fout, "status %d\n", wc->status);
const char * opc = "";
switch (wc->opcode) {
case IBV_WC_SEND:
opc = TERMCLR(31) "SEND" TERMCLR(0);
break;
case IBV_WC_RDMA_WRITE:
opc = TERMCLR(32) "WRITE" TERMCLR(0);
break;
case IBV_WC_RDMA_READ:
opc = TERMCLR(34) "READ" TERMCLR(0);
break;
case IBV_WC_COMP_SWAP:
opc = "CAS";
break;
case IBV_WC_FETCH_ADD:
opc = "FADD";
break;
case IBV_WC_BIND_MW:
opc = "BMW";
break;
case IBV_WC_RECV:
opc = TERMCLR(35) "RECV" TERMCLR(0);
break;
case IBV_WC_RECV_RDMA_WITH_IMM:
opc = TERMCLR(35) "WIMM" TERMCLR(0);
break;
default:
break;
}
fprintf(fout, "%s %s len %u imm %u (0x%x)\n",
__func__, opc, wc->byte_len, wc->imm_data, wc->imm_data);
}
}
// Poll the socket's completion queue once for up to s->depth completions,
// storing them in s->wcs. Returns the result of ibv_poll_cq() unchanged.
int
ib_socket_poll(struct ib_socket * const s)
{
  const int nr = ibv_poll_cq(s->cq, s->depth, s->wcs);
#if 0
  // debugging aid: dump every completion that was just polled
  ib_wc_fprint(s->wcs, nr, stderr);
#endif
  return nr;
}
// }}} cq
// vim:fdm=marker