@@ -367,12 +367,6 @@ struct server_data {
367367
368368 /* resources for messaging buffer from DRAM allocated by fio */
369369 struct rpma_mr_local * msg_mr ;
370-
371- uint32_t msg_sqe_available ; /* # of free SQ slots */
372-
373- /* in-memory queues */
374- struct ibv_wc * msgs_queued ;
375- uint32_t msg_queued_nr ;
376370};
377371
378372static int server_init (struct thread_data * td )
@@ -393,13 +387,6 @@ static int server_init(struct thread_data *td)
393387 goto err_server_cleanup ;
394388 }
395389
396- /* allocate in-memory queue */
397- sd -> msgs_queued = calloc (td -> o .iodepth , sizeof (* sd -> msgs_queued ));
398- if (sd -> msgs_queued == NULL ) {
399- td_verror (td , errno , "calloc" );
400- goto err_free_sd ;
401- }
402-
403390 /*
404391 * Assure a single io_u buffer can store both SEND and RECV messages and
405392 * an io_us buffer allocation is page-size-aligned which is required
@@ -412,9 +399,6 @@ static int server_init(struct thread_data *td)
412399
413400 return 0 ;
414401
415- err_free_sd :
416- free (sd );
417-
418402err_server_cleanup :
419403 librpma_fio_server_cleanup (td );
420404
@@ -485,7 +469,6 @@ static void server_cleanup(struct thread_data *td)
485469 if ((ret = rpma_mr_dereg (& sd -> msg_mr )))
486470 librpma_td_verror (td , ret , "rpma_mr_dereg" );
487471
488- free (sd -> msgs_queued );
489472 free (sd );
490473 }
491474
@@ -501,7 +484,6 @@ static int prepare_connection(struct thread_data *td,
501484 int i ;
502485
503486 /* prepare buffers for a flush requests */
504- sd -> msg_sqe_available = td -> o .iodepth ;
505487 for (i = 0 ; i < td -> o .iodepth ; i ++ ) {
506488 size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER (i ) + RECV_OFFSET ;
507489 if ((ret = rpma_conn_req_recv (conn_req , sd -> msg_mr ,
@@ -536,8 +518,10 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
536518 * all possible flush requests (SENDs)
537519 * - the receive queue (RQ) has to be big enough to accommodate
538520 * all flush responses (RECVs)
539- * - the completion queue (CQ) has to be big enough to accommodate
540- * all success and error completions (sq_size + rq_size)
521+ * - the main completion queue (CQ) has to be big enough to
522+ * accommodate all success and error completions (sq_size)
523+ * - the receive completion queue (RCQ) has to be big enough to
524+ * accommodate all success and error completions (rq_size)
541525 */
542526 if ((ret = rpma_conn_cfg_set_sq_size (cfg , max_msg_num ))) {
543527 librpma_td_verror (td , ret , "rpma_conn_cfg_set_sq_size" );
@@ -547,11 +531,16 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
547531 librpma_td_verror (td , ret , "rpma_conn_cfg_set_rq_size" );
548532 goto err_cfg_delete ;
549533 }
550- if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num * 2 ))) {
534+ if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num ))) {
551535 librpma_td_verror (td , ret , "rpma_conn_cfg_set_cq_size" );
552536 goto err_cfg_delete ;
553537 }
554538
539+ if ((ret = rpma_conn_cfg_set_rcq_size (cfg , max_msg_num ))) {
540+ librpma_td_verror (td , ret , "rpma_conn_cfg_set_rcq_size" );
541+ goto err_cfg_delete ;
542+ }
543+
555544 ret = librpma_fio_server_open_file (td , f , cfg );
556545
557546err_cfg_delete :
@@ -630,7 +619,6 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
630619 librpma_td_verror (td , ret , "rpma_send" );
631620 goto err_free_unpacked ;
632621 }
633- -- sd -> msg_sqe_available ;
634622
635623 gpspm_flush_request__free_unpacked (flush_req , NULL );
636624
@@ -645,48 +633,17 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
645633 return -1 ;
646634}
647635
648- static inline int server_queue_process (struct thread_data * td )
636+ static int server_cmpl_process (struct thread_data * td , struct ibv_wc * wc , bool use_rcq )
649637{
650638 struct librpma_fio_server_data * csd = td -> io_ops_data ;
651- struct server_data * sd = csd -> server_data ;
652- int ret ;
653- int i ;
654-
655- /* min(# of queue entries, # of SQ entries available) */
656- uint32_t qes_to_process = min (sd -> msg_queued_nr , sd -> msg_sqe_available );
657- if (qes_to_process == 0 )
658- return 0 ;
659-
660- /* process queued completions */
661- for (i = 0 ; i < qes_to_process ; ++ i ) {
662- if ((ret = server_qe_process (td , & sd -> msgs_queued [i ])))
663- return ret ;
664- }
665-
666- /* progress the queue */
667- for (i = 0 ; i < sd -> msg_queued_nr - qes_to_process ; ++ i ) {
668- memcpy (& sd -> msgs_queued [i ],
669- & sd -> msgs_queued [qes_to_process + i ],
670- sizeof (sd -> msgs_queued [i ]));
671- }
672-
673- sd -> msg_queued_nr -= qes_to_process ;
674-
675- return 0 ;
676- }
677-
678- static int server_cmpl_process (struct thread_data * td )
679- {
680- struct librpma_fio_server_data * csd = td -> io_ops_data ;
681- struct server_data * sd = csd -> server_data ;
682- struct ibv_wc * wc = & sd -> msgs_queued [sd -> msg_queued_nr ];
683639 struct librpma_fio_options_values * o = td -> eo ;
640+ struct rpma_cq * cq = use_rcq ? csd -> rcq : csd -> cq ;
684641 int ret ;
685642
686- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
643+ ret = rpma_cq_get_wc (cq , 1 , wc , NULL );
687644 if (ret == RPMA_E_NO_COMPLETION ) {
688645 if (o -> busy_wait_polling == 0 ) {
689- ret = rpma_cq_wait (csd -> cq );
646+ ret = rpma_cq_wait (cq );
690647 if (ret == RPMA_E_NO_COMPLETION ) {
691648 /* lack of completion is not an error */
692649 return 0 ;
@@ -695,7 +652,7 @@ static int server_cmpl_process(struct thread_data *td)
695652 goto err_terminate ;
696653 }
697654
698- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
655+ ret = rpma_cq_get_wc (cq , 1 , wc , NULL );
699656 if (ret == RPMA_E_NO_COMPLETION ) {
700657 /* lack of completion is not an error */
701658 return 0 ;
@@ -716,10 +673,11 @@ static int server_cmpl_process(struct thread_data *td)
716673 if (wc -> status != IBV_WC_SUCCESS )
717674 goto err_terminate ;
718675
719- if (wc -> opcode == IBV_WC_RECV )
720- ++ sd -> msg_queued_nr ;
721- else if (wc -> opcode == IBV_WC_SEND )
722- ++ sd -> msg_sqe_available ;
676+ if (use_rcq && wc -> opcode != IBV_WC_RECV )
677+ goto err_terminate ;
678+
679+ if (!use_rcq && wc -> opcode != IBV_WC_SEND )
680+ goto err_terminate ;
723681
724682 return 0 ;
725683
@@ -729,12 +687,33 @@ static int server_cmpl_process(struct thread_data *td)
729687 return -1 ;
730688}
731689
690+ static inline int server_queue_process (struct thread_data * td )
691+ {
692+ struct ibv_wc wc ;
693+ int ret ;
694+
695+ /* process the receive completion */
696+ ret = server_cmpl_process (td , & wc , 1 );
697+ if (ret )
698+ return ret ;
699+
700+ if (wc .opcode == IBV_WC_RECV ) {
701+ ret = server_qe_process (td , & wc );
702+ if (ret )
703+ return ret ;
704+ }
705+
706+ /* process the send completion */
707+ ret = server_cmpl_process (td , & wc , 0 );
708+ if (ret )
709+ return ret ;
710+
711+ return 0 ;
712+ }
713+
732714static enum fio_q_status server_queue (struct thread_data * td , struct io_u * io_u )
733715{
734716 do {
735- if (server_cmpl_process (td ))
736- return FIO_Q_BUSY ;
737-
738717 if (server_queue_process (td ))
739718 return FIO_Q_BUSY ;
740719
0 commit comments