Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,15 @@ struct io_control {

// This is used to make sure we don't return IOCB while something else is using them
// this is to guarantee the submits could be done concurrently with polling
pthread_mutex_t iocbLock;
pthread_spinlock_t iocbLock;

pthread_mutex_t pollLock;

// a reusable pool of iocb
struct iocb ** iocb;
int queueSize;
int iocbPut;
int iocbGet;
int used;
//struct iocb* available_iocb = io_control->iocb[io_control->available-1] iff io_control->available > 0
int available;

};

Expand Down Expand Up @@ -444,41 +443,38 @@ static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
static inline struct iocb * getIOCB(struct io_control * control) {
struct iocb * iocb = 0;

pthread_mutex_lock(&(control->iocbLock));
pthread_spin_lock(&(control->iocbLock));

#ifdef DEBUG
fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
fprintf (stdout, "getIOCB::available=%d, queueSize=%d\n", control->available, control->queueSize);
#endif

if (control->used < control->queueSize) {
control->used++;
iocb = control->iocb[control->iocbGet++];

if (control->iocbGet >= control->queueSize) {
control->iocbGet = 0;
}
if (control->available > 0) {
control->available--;
iocb = control->iocb[control->available];
}

pthread_mutex_unlock(&(control->iocbLock));
pthread_spin_unlock(&(control->iocbLock));
return iocb;
}

/**
* Put an iocb back on the pool of IOCBs
*/
static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) {
pthread_mutex_lock(&(control->iocbLock));
if (iocbBack == NULL) {
return;
}
pthread_spin_lock(&(control->iocbLock));

#ifdef DEBUG
fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
fprintf (stdout, "putIOCB::available=%d, queueSize=%d\n", control->available, control->queueSize);
#endif

control->used--;
control->iocb[control->iocbPut++] = iocbBack;
if (control->iocbPut >= control->queueSize) {
control->iocbPut = 0;
if (control->available < control->queueSize) {
control->iocb[control->available] = iocbBack;
control->available++;
}
pthread_mutex_unlock(&(control->iocbLock));
pthread_spin_unlock(&(control->iocbLock));
}

static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
Expand Down Expand Up @@ -537,7 +533,7 @@ static inline void iocb_destroy_bounded(struct io_control * theControl, int uppe
* @param theControl
*/
static inline void iocb_destroy(struct io_control * theControl) {
iocb_destroy_bounded(theControl, theControl->queueSize);
iocb_destroy_bounded(theControl, theControl->available);
}

/**
Expand Down Expand Up @@ -590,9 +586,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio
}
}
theControl->queueSize = queueSize;
//iocb pool is full
theControl->available = queueSize;


res = pthread_mutex_init(&(theControl->iocbLock), 0);
res = pthread_spin_init(&(theControl->iocbLock), 0);
if (res) {
iocb_destroy(theControl);

Expand Down Expand Up @@ -625,10 +622,6 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio
return NULL;
}


theControl->iocbPut = 0;
theControl->iocbGet = 0;
theControl->used = 0;
theControl->thisObject = (*env)->NewGlobalRef(env, thisObject);

return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control));
Expand Down Expand Up @@ -670,14 +663,21 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon
io_queue_release(theControl->ioContext);

pthread_mutex_destroy(&(theControl->pollLock));
pthread_mutex_destroy(&(theControl->iocbLock));
pthread_spin_destroy(&(theControl->iocbLock));

const int leaked_iocbs = theControl->queueSize - theControl->available;

iocb_destroy(theControl);

(*env)->DeleteGlobalRef(env, theControl->thisObject);

free(theControl->events);
free(theControl);

if (leaked_iocbs > 0) {
throwIOException(env, "There are pending I/O operations");
return;
}
}

JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) {
Expand Down