@@ -374,11 +374,10 @@ struct server_data {
374374 /* resources for messaging buffer from DRAM allocated by fio */
375375 struct rpma_mr_local * msg_mr ;
376376
377- uint32_t msg_sqe_available ; /* # of free SQ slots */
378-
379- /* in-memory queues */
380- struct ibv_wc * msgs_queued ;
381- uint32_t msg_queued_nr ;
377+ /* # of free SQ slots */
378+ uint32_t msg_sqe_available ;
379+ /* receive CQ */
380+ struct rpma_cq * rcq ;
382381
383382 librpma_fio_persist_fn persist ;
384383};
@@ -401,13 +400,6 @@ static int server_init(struct thread_data *td)
401400 goto err_server_cleanup ;
402401 }
403402
404- /* allocate in-memory queue */
405- sd -> msgs_queued = calloc (td -> o .iodepth , sizeof (* sd -> msgs_queued ));
406- if (sd -> msgs_queued == NULL ) {
407- td_verror (td , errno , "calloc" );
408- goto err_free_sd ;
409- }
410-
411403#ifdef CONFIG_LIBPMEM2_INSTALLED
412404 /* get libpmem2 persist function from pmem2_map */
413405 sd -> persist = pmem2_get_persist_fn (csd -> mem .map );
@@ -427,9 +419,6 @@ static int server_init(struct thread_data *td)
427419
428420 return 0 ;
429421
430- err_free_sd :
431- free (sd );
432-
433422err_server_cleanup :
434423 librpma_fio_server_cleanup (td );
435424
@@ -500,7 +489,6 @@ static void server_cleanup(struct thread_data *td)
500489 if ((ret = rpma_mr_dereg (& sd -> msg_mr )))
501490 librpma_td_verror (td , ret , "rpma_mr_dereg" );
502491
503- free (sd -> msgs_queued );
504492 free (sd );
505493 }
506494
@@ -533,6 +521,7 @@ static int prepare_connection(struct thread_data *td,
533521static int server_open_file (struct thread_data * td , struct fio_file * f )
534522{
535523 struct librpma_fio_server_data * csd = td -> io_ops_data ;
524+ struct server_data * sd = csd -> server_data ;
536525 struct rpma_conn_cfg * cfg = NULL ;
537526 uint16_t max_msg_num = td -> o .iodepth ;
538527 int ret ;
@@ -546,7 +535,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
546535 }
547536
548537 /*
549- * Calculate the required queue sizes where :
538+ * The required queue sizes are :
550539 * - the send queue (SQ) has to be big enough to accommodate
551540 * all possible flush requests (SENDs)
552541 * - the receive queue (RQ) has to be big enough to accommodate
@@ -562,12 +551,25 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
562551 librpma_td_verror (td , ret , "rpma_conn_cfg_set_rq_size" );
563552 goto err_cfg_delete ;
564553 }
565- if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num * 2 ))) {
554+ if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num ))) {
566555 librpma_td_verror (td , ret , "rpma_conn_cfg_set_cq_size" );
567556 goto err_cfg_delete ;
568557 }
558+ if ((ret = rpma_conn_cfg_set_rcq_size (cfg , max_msg_num ))) {
559+ librpma_td_verror (td , ret , "rpma_conn_cfg_set_rcq_size" );
560+ goto err_cfg_delete ;
561+ }
562+ if ((ret = rpma_conn_cfg_set_compl_channel (cfg , true))) {
563+ librpma_td_verror (td , ret , "rpma_conn_cfg_set_compl_channel" );
564+ goto err_cfg_delete ;
565+ }
569566
570- ret = librpma_fio_server_open_file (td , f , cfg );
567+ if ((ret = librpma_fio_server_open_file (td , f , cfg )))
568+ goto err_cfg_delete ;
569+
570+ /* get the connection's receive CQ */
571+ if ((ret = rpma_conn_get_rcq (csd -> conn , & sd -> rcq )))
572+ librpma_td_verror (td , ret , "rpma_conn_get_rcq" );
571573
572574err_cfg_delete :
573575 (void ) rpma_conn_cfg_delete (& cfg );
@@ -660,92 +662,120 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
660662 return -1 ;
661663}
662664
663- static inline int server_queue_process (struct thread_data * td )
665+ /*
666+ * server_cmpl_poll - poll and process a completion
667+ *
668+ * Return value:
669+ * 0 or 1 - number of received completions
670+ * -1 - in case of an error
671+ */
672+ static int server_cmpl_poll (struct thread_data * td , struct rpma_cq * cq ,
673+ struct ibv_wc * wc )
664674{
665675 struct librpma_fio_server_data * csd = td -> io_ops_data ;
666676 struct server_data * sd = csd -> server_data ;
667677 int ret ;
668- int i ;
669678
670- /* min(# of queue entries, # of SQ entries available) */
671- uint32_t qes_to_process = min ( sd -> msg_queued_nr , sd -> msg_sqe_available );
672- if ( qes_to_process == 0 )
679+ ret = rpma_cq_get_wc ( cq , 1 , wc , NULL );
680+ if ( ret == RPMA_E_NO_COMPLETION ) {
681+ /* lack of completion is not an error */
673682 return 0 ;
674-
675- /* process queued completions */
676- for (i = 0 ; i < qes_to_process ; ++ i ) {
677- if ((ret = server_qe_process (td , & sd -> msgs_queued [i ])))
678- return ret ;
679683 }
680-
681- /* progress the queue */
682- for (i = 0 ; i < sd -> msg_queued_nr - qes_to_process ; ++ i ) {
683- memcpy (& sd -> msgs_queued [i ],
684- & sd -> msgs_queued [qes_to_process + i ],
685- sizeof (sd -> msgs_queued [i ]));
684+ if (ret ) {
685+ librpma_td_verror (td , ret , "rpma_cq_get_wc" );
686+ goto err_terminate ;
686687 }
687688
688- sd -> msg_queued_nr -= qes_to_process ;
689+ /* validate the completion */
690+ if (wc -> status != IBV_WC_SUCCESS )
691+ goto err_terminate ;
692+
693+ if (wc -> opcode == IBV_WC_SEND )
694+ ++ sd -> msg_sqe_available ;
689695
690- return 0 ;
696+ return 1 ;
697+
698+ err_terminate :
699+ td -> terminate = true;
700+
701+ return -1 ;
691702}
692703
693- static int server_cmpl_process (struct thread_data * td )
704+ static int server_queue_poll (struct thread_data * td )
694705{
695706 struct librpma_fio_server_data * csd = td -> io_ops_data ;
696707 struct server_data * sd = csd -> server_data ;
697- struct ibv_wc * wc = & sd -> msgs_queued [sd -> msg_queued_nr ];
698- struct librpma_fio_options_values * o = td -> eo ;
708+ struct ibv_wc cq_wc , rcq_wc ;
699709 int ret ;
700710
701- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
702- if (ret == RPMA_E_NO_COMPLETION ) {
703- if (o -> busy_wait_polling )
704- return 0 ; /* lack of completion is not an error */
705-
706- ret = rpma_cq_wait (csd -> cq );
707- if (ret == RPMA_E_NO_COMPLETION )
708- return 0 ; /* lack of completion is not an error */
709- if (ret ) {
710- librpma_td_verror (td , ret , "rpma_cq_wait" );
711- goto err_terminate ;
712- }
711+ /* process the receive completion */
712+ ret = server_cmpl_poll (td , sd -> rcq , & rcq_wc );
713+ if (ret != 1 )
714+ return ret ;
713715
714- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
715- if (ret == RPMA_E_NO_COMPLETION )
716- return 0 ; /* lack of completion is not an error */
717- if (ret ) {
718- librpma_td_verror (td , ret , "rpma_cq_get_wc" );
719- goto err_terminate ;
720- }
721- } else if (ret ) {
722- librpma_td_verror (td , ret , "rpma_cq_get_wc" );
723- goto err_terminate ;
716+ /* ret == 1 means rcq_wc.opcode == IBV_WC_RECV */
717+
718+ /* ensure that at least one SQ slot is available */
719+ while (sd -> msg_sqe_available == 0 ) {
720+ /* process the send completion */
721+ ret = server_cmpl_poll (td , csd -> cq , & cq_wc );
722+ if (ret < 0 )
723+ return ret ;
724724 }
725725
726- /* validate the completion */
727- if (wc -> status != IBV_WC_SUCCESS )
728- goto err_terminate ;
726+ return server_qe_process (td , & rcq_wc );
727+ }
729728
730- if (wc -> opcode == IBV_WC_RECV )
731- ++ sd -> msg_queued_nr ;
732- else if (wc -> opcode == IBV_WC_SEND )
733- ++ sd -> msg_sqe_available ;
729+ static int server_queue_wait_poll (struct thread_data * td )
730+ {
731+ struct librpma_fio_server_data * csd = td -> io_ops_data ;
732+ struct server_data * sd = csd -> server_data ;
733+ struct rpma_cq * cq ;
734+ struct ibv_wc wc , cq_wc ;
735+ bool is_rcq = false;
736+ int ret ;
737+
738+ ret = rpma_conn_wait (csd -> conn , & cq , & is_rcq );
739+ if (ret ) {
740+ librpma_td_verror (td , ret , "rpma_conn_wait" );
741+ td -> terminate = true;
742+ return ret ;
743+ }
744+
745+ /* process the send or receive completion */
746+ ret = server_cmpl_poll (td , cq , & wc );
747+ if (ret != 1 )
748+ return ret ;
749+
750+ /* is_rcq == true means wc.opcode == IBV_WC_RECV */
751+ if (is_rcq ) {
752+ /* ensure that at least one SQ slot is available */
753+ while (sd -> msg_sqe_available == 0 ) {
754+ /* process the send completion */
755+ ret = server_cmpl_poll (td , csd -> cq , & cq_wc );
756+ if (ret < 0 )
757+ return ret ;
758+ }
759+
760+ return server_qe_process (td , & wc );
761+ }
734762
735763 return 0 ;
764+ }
736765
737- err_terminate :
738- td -> terminate = true;
766+ static inline int server_queue_process (struct thread_data * td )
767+ {
768+ struct librpma_fio_options_values * o = td -> eo ;
739769
740- return -1 ;
770+ if (o -> busy_wait_polling )
771+ return server_queue_poll (td );
772+ else
773+ return server_queue_wait_poll (td );
741774}
742775
743776static enum fio_q_status server_queue (struct thread_data * td , struct io_u * io_u )
744777{
745778 do {
746- if (server_cmpl_process (td ))
747- return FIO_Q_BUSY ;
748-
749779 if (server_queue_process (td ))
750780 return FIO_Q_BUSY ;
751781
0 commit comments