Skip to content

Commit 6874484

Browse files
committed
rpma: make gpspm server use separate RCQ
Also use shared completion channel.

Signed-off-by: Xiao Yang <yangx.jy@fujitsu.com>
1 parent 1563444 commit 6874484

File tree

1 file changed

+133
-76
lines changed

1 file changed

+133
-76
lines changed

engines/librpma_gpspm.c

Lines changed: 133 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,10 @@ struct server_data {
374374
/* resources for messaging buffer from DRAM allocated by fio */
375375
struct rpma_mr_local *msg_mr;
376376

377-
uint32_t msg_sqe_available; /* # of free SQ slots */
378-
379-
/* in-memory queues */
380-
struct ibv_wc *msgs_queued;
381-
uint32_t msg_queued_nr;
377+
/* # of free SQ slots */
378+
uint32_t msg_sqe_available;
379+
/* receive CQ */
380+
struct rpma_cq *rcq;
382381

383382
librpma_fio_persist_fn persist;
384383
};
@@ -401,13 +400,6 @@ static int server_init(struct thread_data *td)
401400
goto err_server_cleanup;
402401
}
403402

404-
/* allocate in-memory queue */
405-
sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
406-
if (sd->msgs_queued == NULL) {
407-
td_verror(td, errno, "calloc");
408-
goto err_free_sd;
409-
}
410-
411403
#ifdef CONFIG_LIBPMEM2_INSTALLED
412404
/* get libpmem2 persist function from pmem2_map */
413405
sd->persist = pmem2_get_persist_fn(csd->mem.map);
@@ -427,9 +419,6 @@ static int server_init(struct thread_data *td)
427419

428420
return 0;
429421

430-
err_free_sd:
431-
free(sd);
432-
433422
err_server_cleanup:
434423
librpma_fio_server_cleanup(td);
435424

@@ -500,7 +489,6 @@ static void server_cleanup(struct thread_data *td)
500489
if ((ret = rpma_mr_dereg(&sd->msg_mr)))
501490
librpma_td_verror(td, ret, "rpma_mr_dereg");
502491

503-
free(sd->msgs_queued);
504492
free(sd);
505493
}
506494

@@ -533,6 +521,7 @@ static int prepare_connection(struct thread_data *td,
533521
static int server_open_file(struct thread_data *td, struct fio_file *f)
534522
{
535523
struct librpma_fio_server_data *csd = td->io_ops_data;
524+
struct server_data *sd = csd->server_data;
536525
struct rpma_conn_cfg *cfg = NULL;
537526
uint16_t max_msg_num = td->o.iodepth;
538527
int ret;
@@ -546,7 +535,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
546535
}
547536

548537
/*
549-
* Calculate the required queue sizes where:
538+
* The required queue sizes are:
550539
* - the send queue (SQ) has to be big enough to accommodate
551540
* all possible flush requests (SENDs)
552541
* - the receive queue (RQ) has to be big enough to accommodate
@@ -562,12 +551,25 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
562551
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
563552
goto err_cfg_delete;
564553
}
565-
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
554+
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num))) {
566555
librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
567556
goto err_cfg_delete;
568557
}
558+
if ((ret = rpma_conn_cfg_set_rcq_size(cfg, max_msg_num))) {
559+
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rcq_size");
560+
goto err_cfg_delete;
561+
}
562+
if ((ret = rpma_conn_cfg_set_compl_channel(cfg, true))) {
563+
librpma_td_verror(td, ret, "rpma_conn_cfg_set_compl_channel");
564+
goto err_cfg_delete;
565+
}
566+
567+
if ((ret = librpma_fio_server_open_file(td, f, cfg)))
568+
goto err_cfg_delete;
569569

570-
ret = librpma_fio_server_open_file(td, f, cfg);
570+
/* get the connection's receive CQ */
571+
if ((ret = rpma_conn_get_rcq(csd->conn, &sd->rcq)))
572+
librpma_td_verror(td, ret, "rpma_conn_get_rcq");
571573

572574
err_cfg_delete:
573575
(void) rpma_conn_cfg_delete(&cfg);
@@ -660,92 +662,147 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
660662
return -1;
661663
}
662664

663-
static inline int server_queue_process(struct thread_data *td)
665+
/*
666+
* server_cmpl_poll - poll and process a completion
667+
*
668+
* Return value:
669+
* 0 or 1 - number of completions (send or receive) obtained from the given CQ
670+
* -1 - in case of an error
671+
*/
672+
static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq,
673+
struct ibv_wc *wc)
664674
{
665675
struct librpma_fio_server_data *csd = td->io_ops_data;
666676
struct server_data *sd = csd->server_data;
667677
int ret;
668-
int i;
669678

670-
/* min(# of queue entries, # of SQ entries available) */
671-
uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
672-
if (qes_to_process == 0)
679+
ret = rpma_cq_get_wc(cq, 1, wc, NULL);
680+
if (ret == RPMA_E_NO_COMPLETION) {
681+
/* lack of completion is not an error */
673682
return 0;
674-
675-
/* process queued completions */
676-
for (i = 0; i < qes_to_process; ++i) {
677-
if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
678-
return ret;
679683
}
680-
681-
/* progress the queue */
682-
for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
683-
memcpy(&sd->msgs_queued[i],
684-
&sd->msgs_queued[qes_to_process + i],
685-
sizeof(sd->msgs_queued[i]));
684+
if (ret) {
685+
librpma_td_verror(td, ret, "rpma_cq_get_wc");
686+
goto err_terminate;
686687
}
687688

688-
sd->msg_queued_nr -= qes_to_process;
689+
/* validate the completion */
690+
if (wc->status != IBV_WC_SUCCESS)
691+
goto err_terminate;
689692

690-
return 0;
693+
if (wc->opcode == IBV_WC_SEND)
694+
++sd->msg_sqe_available;
695+
696+
return 1;
697+
698+
err_terminate:
699+
td->terminate = true;
700+
701+
return -1;
691702
}
692703

693-
static int server_cmpl_process(struct thread_data *td)
704+
static int server_queue_poll(struct thread_data *td)
694705
{
695706
struct librpma_fio_server_data *csd = td->io_ops_data;
696707
struct server_data *sd = csd->server_data;
697-
struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
698-
struct librpma_fio_options_values *o = td->eo;
708+
struct ibv_wc cq_wc, rcq_wc;
699709
int ret;
700710

701-
ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
702-
if (ret == RPMA_E_NO_COMPLETION) {
703-
if (o->busy_wait_polling)
704-
return 0; /* lack of completion is not an error */
705-
706-
ret = rpma_cq_wait(csd->cq);
707-
if (ret == RPMA_E_NO_COMPLETION)
708-
return 0; /* lack of completion is not an error */
709-
if (ret) {
710-
librpma_td_verror(td, ret, "rpma_cq_wait");
711-
goto err_terminate;
712-
}
711+
/* process the receive completion */
712+
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
713+
if (ret != 1)
714+
return ret;
713715

714-
ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
715-
if (ret == RPMA_E_NO_COMPLETION)
716-
return 0; /* lack of completion is not an error */
717-
if (ret) {
718-
librpma_td_verror(td, ret, "rpma_cq_get_wc");
719-
goto err_terminate;
720-
}
721-
} else if (ret) {
722-
librpma_td_verror(td, ret, "rpma_cq_get_wc");
723-
goto err_terminate;
716+
/* ret == 1 means rcq_wc.opcode == IBV_WC_RECV */
717+
718+
/* ensure that at least one SQ slot is available */
719+
while (sd->msg_sqe_available == 0) {
720+
/* process the send completion */
721+
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
722+
if (ret < 0)
723+
return ret;
724724
}
725725

726-
/* validate the completion */
727-
if (wc->status != IBV_WC_SUCCESS)
728-
goto err_terminate;
726+
return server_qe_process(td, &rcq_wc);
727+
}
729728

730-
if (wc->opcode == IBV_WC_RECV)
731-
++sd->msg_queued_nr;
732-
else if (wc->opcode == IBV_WC_SEND)
733-
++sd->msg_sqe_available;
729+
static int server_queue_wait_poll(struct thread_data *td)
730+
{
731+
struct librpma_fio_server_data *csd = td->io_ops_data;
732+
struct server_data *sd = csd->server_data;
733+
struct rpma_cq *cq;
734+
struct ibv_wc cq_wc, rcq_wc;
735+
bool is_rcq;
736+
int ret;
734737

735-
return 0;
738+
/* process the receive completion */
739+
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
740+
if (ret < 0)
741+
return ret;
736742

737-
err_terminate:
738-
td->terminate = true;
743+
if (ret == 0) {
744+
do {
745+
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
746+
if (ret) {
747+
librpma_td_verror(td, ret, "rpma_conn_wait");
748+
td->terminate = true;
749+
return ret;
750+
}
751+
752+
/* process the receive or send completion */
753+
ret = server_cmpl_poll(td, cq, is_rcq ? &rcq_wc :
754+
&cq_wc);
755+
if (ret < 0)
756+
return ret;
757+
} while (!is_rcq);
758+
759+
/* return if there is still no receive completion */
760+
if (ret == 0)
761+
return ret;
762+
}
739763

740-
return -1;
764+
/* ret == 1 means rcq_wc.opcode == IBV_WC_RECV */
765+
766+
/* ensure that at least one SQ slot is available */
767+
while (sd->msg_sqe_available == 0) {
768+
/* process the send completion */
769+
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
770+
if (ret < 0)
771+
return ret;
772+
773+
if (ret == 0) {
774+
do {
775+
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
776+
if (ret) {
777+
librpma_td_verror(td, ret, "rpma_conn_wait");
778+
td->terminate = true;
779+
return ret;
780+
}
781+
} while (is_rcq);
782+
783+
/* a send-CQ event has arrived - poll that CQ for the send completion again */
784+
ret = server_cmpl_poll(td, cq, &cq_wc);
785+
if (ret < 0)
786+
return ret;
787+
}
788+
}
789+
790+
return server_qe_process(td, &rcq_wc);
791+
}
792+
793+
static inline int server_queue_process(struct thread_data *td)
794+
{
795+
struct librpma_fio_options_values *o = td->eo;
796+
797+
if (o->busy_wait_polling)
798+
return server_queue_poll(td);
799+
else
800+
return server_queue_wait_poll(td);
741801
}
742802

743803
static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
744804
{
745805
do {
746-
if (server_cmpl_process(td))
747-
return FIO_Q_BUSY;
748-
749806
if (server_queue_process(td))
750807
return FIO_Q_BUSY;
751808

0 commit comments

Comments
 (0)