diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c index d495eb4..6a96b53 100644 --- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c +++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c @@ -49,16 +49,15 @@ struct io_control { // This is used to make sure we don't return IOCB while something else is using them // this is to guarantee the submits could be done concurrently with polling - pthread_mutex_t iocbLock; + pthread_spinlock_t iocbLock; pthread_mutex_t pollLock; // a reusable pool of iocb struct iocb ** iocb; int queueSize; - int iocbPut; - int iocbGet; - int used; + //struct iocb* available_iocb = io_control->iocb[io_control->available-1] iff io_control->available > 0 + int available; }; @@ -444,22 +443,18 @@ static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) { static inline struct iocb * getIOCB(struct io_control * control) { struct iocb * iocb = 0; - pthread_mutex_lock(&(control->iocbLock)); + pthread_spin_lock(&(control->iocbLock)); #ifdef DEBUG - fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); + fprintf (stdout, "getIOCB::available=%d, queueSize=%d\n", control->available, control->queueSize); #endif - if (control->used < control->queueSize) { - control->used++; - iocb = control->iocb[control->iocbGet++]; - - if (control->iocbGet >= control->queueSize) { - control->iocbGet = 0; - } + if (control->available > 0) { + control->available--; + iocb = control->iocb[control->available]; } - pthread_mutex_unlock(&(control->iocbLock)); + pthread_spin_unlock(&(control->iocbLock)); return iocb; } @@ -467,18 +462,19 @@ static inline struct iocb * getIOCB(struct io_control * control) { * Put an iocb back on the pool of IOCBs */ static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) { - pthread_mutex_lock(&(control->iocbLock)); + if (iocbBack == NULL) { + return; + } + pthread_spin_lock(&(control->iocbLock)); #ifdef DEBUG - fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); + fprintf (stdout, "putIOCB::available=%d, queueSize=%d\n", control->available, control->queueSize); #endif - - control->used--; - control->iocb[control->iocbPut++] = iocbBack; - if (control->iocbPut >= control->queueSize) { - control->iocbPut = 0; + if (control->available < control->queueSize) { + control->iocb[control->available] = iocbBack; + control->available++; } - pthread_mutex_unlock(&(control->iocbLock)); + pthread_spin_unlock(&(control->iocbLock)); } static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { @@ -537,7 +533,7 @@ static inline void iocb_destroy_bounded(struct io_control * theControl, int uppe * @param theControl */ static inline void iocb_destroy(struct io_control * theControl) { - iocb_destroy_bounded(theControl, theControl->queueSize); + iocb_destroy_bounded(theControl, theControl->available); } /** @@ -590,9 +586,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio } } theControl->queueSize = queueSize; + //iocb pool is full + theControl->available = queueSize; - - res = pthread_mutex_init(&(theControl->iocbLock), 0); + res = pthread_spin_init(&(theControl->iocbLock), 0); if (res) { iocb_destroy(theControl); @@ -625,10 +622,6 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio return NULL; } - - theControl->iocbPut = 0; - theControl->iocbGet = 0; - theControl->used = 0; theControl->thisObject = (*env)->NewGlobalRef(env, thisObject); return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control)); @@ -670,7 +663,9 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon io_queue_release(theControl->ioContext); pthread_mutex_destroy(&(theControl->pollLock)); - pthread_mutex_destroy(&(theControl->iocbLock)); + pthread_spin_destroy(&(theControl->iocbLock)); + + const int leaked_iocbs = theControl->queueSize - theControl->available; iocb_destroy(theControl); @@ -678,6 +673,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon free(theControl->events); free(theControl); + + if (leaked_iocbs > 0) { + throwIOException(env, "There are pending I/O operations"); + return; + } } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) {