From 40d4af6c642a614fdb8e203b1983a326778a5698 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 1 Oct 2020 07:41:53 +0200 Subject: [PATCH 1/2] ARTEMIS-2945 Artemis native JNI code can be replaced by Java --- pom.xml | 8 +- ...emq_artemis_nativo_jlibaio_LibaioContext.c | 508 +++-------------- ...emq_artemis_nativo_jlibaio_LibaioContext.h | 68 ++- .../artemis/nativo/jlibaio/AioRing.java | 150 +++++ .../activemq/artemis/nativo/jlibaio/IoCb.java | 96 ++++ .../artemis/nativo/jlibaio/IoEventArray.java | 107 ++++ .../artemis/nativo/jlibaio/LibaioContext.java | 529 ++++++++++++++---- .../artemis/nativo/jlibaio/LibaioFile.java | 13 +- .../nativo/jlibaio/test/LibaioStressTest.java | 2 +- .../nativo/jlibaio/test/LibaioTest.java | 5 +- .../jlibaio/test/OpenCloseContextTest.java | 1 + 11 files changed, 920 insertions(+), 567 deletions(-) create mode 100644 src/main/java/org/apache/activemq/artemis/nativo/jlibaio/AioRing.java create mode 100644 src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoCb.java create mode 100644 src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoEventArray.java diff --git a/pom.xml b/pom.xml index 6cfdac0..95c1902 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ 3.3.1.Final 2.0.3.Final - + 3.1.0 2.4 0.7.9 0.7.9 @@ -148,6 +148,12 @@ ${jb.logmanager.version} + + org.jctools + jctools-core + ${jctools.version} + + diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c index d495eb4..c1df433 100644 --- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c +++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.c @@ -42,24 +42,8 @@ #define store_barrier() __asm__ __volatile__("":::"memory") struct io_control { - io_context_t ioContext; - struct io_event * events; - - jobject thisObject; - - // This is used to make sure we don't return IOCB while something else is using them - // this is to guarantee the submits could be done concurrently with polling - pthread_mutex_t iocbLock; - - pthread_mutex_t pollLock; - - // a reusable pool of iocb - struct iocb ** iocb; - int queueSize; - int iocbPut; - int iocbGet; - int used; - + io_context_t ioContext; + jobject thisObject; }; //These should be used to check if the user-space io_getevents is supported: @@ -100,15 +84,15 @@ JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libai This is an implementation detail, that is a binary contract. it is safe to use the feature though. */ struct aio_ring { - unsigned id; /* kernel internal index number */ - unsigned nr; /* number of io_events */ - unsigned head; - unsigned tail; + unsigned id; /* kernel internal index number */ + unsigned nr; /* number of io_events */ + unsigned head; + unsigned tail; - unsigned magic; - unsigned compat_features; - unsigned incompat_features; - unsigned header_length; /* size of aio_ring */ + unsigned magic; + unsigned compat_features; + unsigned incompat_features; + unsigned header_length; /* size of aio_ring */ struct io_event io_events[0]; @@ -127,7 +111,7 @@ static inline struct aio_ring* to_aio_ring(io_context_t aio_ctx) { //It implements a user space batch read io events implementation that attempts to read io avoiding any sys calls // This implementation will look at the internal structure (aio_ring) and move along the memory result -static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, +static inline int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, struct io_event *events, struct timespec *timeout) { struct aio_ring *ring = to_aio_ring(aio_ctx); //checks if it could be completed in user space, saving a sys call @@ -137,6 +121,9 @@ static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, unsigned head = ring->head; mem_barrier(); const unsigned tail = ring->tail; + //the kernel has written ring->tail from an interrupt: + //we need to load acquire the completed events here + read_barrier(); int available = tail - head; if (available < 0) { //a wrap has occurred @@ -150,7 +137,7 @@ static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, return 0; } - if (available >= max) { + if (available > ring_nr) { // This is to trap a possible bug from the kernel: // https://bugzilla.redhat.com/show_bug.cgi?id=1845326 // https://issues.apache.org/jira/browse/ARTEMIS-2800 @@ -167,20 +154,13 @@ static int ringio_get_events(io_context_t aio_ctx, long min_nr, long max, // and I did not want to create another memory flag to stop the rest of the code } - //the kernel has written ring->tail from an interrupt: - //we need to load acquire the completed events here - read_barrier(); const int available_nr = available < max? available : max; - //if isn't needed to wrap we can avoid % operations that are quite expansive - const int needMod = ((head + available_nr) >= ring_nr) ? 1 : 0; for (int i = 0; i= ring_nr ? 0 : head; events[i] = ring->io_events[head]; - if (needMod == 1) { - head = (head + 1) % ring_nr; - } else { - head = (head + 1); - } + head++; } + head = head >= ring_nr ? 0 : head; //it allow the kernel to build its own view of the ring buffer size //and push new events if there are any store_barrier(); @@ -214,12 +194,6 @@ char dumbPath[PATH_MAX]; void * oneMegaBuffer = 0; pthread_mutex_t oneMegaMutex; - -jclass submitClass = NULL; -jmethodID errorMethod = NULL; -jmethodID doneMethod = NULL; -jmethodID libaioContextDone = NULL; - jclass libaioContextClass = NULL; jclass runtimeExceptionClass = NULL; jclass ioExceptionClass = NULL; @@ -290,13 +264,11 @@ static inline short verifyBuffer(int alignment) { } - jint JNI_OnLoad(JavaVM* vm, void* reserved) { JNIEnv* env; if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) { return JNI_ERR; } else { - int res = pthread_mutex_init(&oneMegaMutex, 0); if (res) { fprintf(stderr, "could not initialize mutex on on_load, %d", res); @@ -348,34 +320,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } - submitClass = (*env)->FindClass(env, "org/apache/activemq/artemis/nativo/jlibaio/SubmitInfo"); - if (submitClass == NULL) { - return JNI_ERR; - } - - submitClass = (jclass)(*env)->NewGlobalRef(env, (jobject)submitClass); - - errorMethod = (*env)->GetMethodID(env, submitClass, "onError", "(ILjava/lang/String;)V"); - if (errorMethod == NULL) { - return JNI_ERR; - } - - doneMethod = (*env)->GetMethodID(env, submitClass, "done", "()V"); - if (doneMethod == NULL) { - return JNI_ERR; - } - libaioContextClass = (*env)->FindClass(env, "org/apache/activemq/artemis/nativo/jlibaio/LibaioContext"); if (libaioContextClass == NULL) { return JNI_ERR; } libaioContextClass = (jclass)(*env)->NewGlobalRef(env, (jobject)libaioContextClass); - libaioContextDone = (*env)->GetMethodID(env, libaioContextClass, "done", "(Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V"); - if (libaioContextDone == NULL) { - return JNI_ERR; - } - return JNI_VERSION_1_6; } } @@ -414,10 +364,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { (*env)->DeleteGlobalRef(env, ioExceptionClass); } - if (submitClass != NULL) { - (*env)->DeleteGlobalRef(env, (jobject)submitClass); - } - if (libaioContextClass != NULL) { (*env)->DeleteGlobalRef(env, (jobject)libaioContextClass); } @@ -438,59 +384,14 @@ static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) { return ioControl; } -/** - * remove an iocb from the pool of IOCBs. Returns null if full - */ -static inline struct iocb * getIOCB(struct io_control * control) { - struct iocb * iocb = 0; - - pthread_mutex_lock(&(control->iocbLock)); - - #ifdef DEBUG - fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); - #endif - - if (control->used < control->queueSize) { - control->used++; - iocb = control->iocb[control->iocbGet++]; - - if (control->iocbGet >= control->queueSize) { - control->iocbGet = 0; - } - } - - pthread_mutex_unlock(&(control->iocbLock)); - return iocb; -} - -/** - * Put an iocb back on the pool of IOCBs - */ -static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) { - pthread_mutex_lock(&(control->iocbLock)); - - #ifdef DEBUG - fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut); - #endif - - control->used--; - control->iocb[control->iocbPut++] = iocbBack; - if (control->iocbPut >= control->queueSize) { - control->iocbPut = 0; - } - pthread_mutex_unlock(&(control->iocbLock)); +JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getIOContextAddress(JNIEnv* env, jclass clazz, jobject ioControlBuffer) { + return (jlong) (getIOControl(env, ioControlBuffer)->ioContext); } -static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) { - int result = io_submit(theControl->ioContext, 1, &iocb); +static inline short submit(JNIEnv * env, io_context_t io_context, struct iocb * iocb) { + int result = io_submit(io_context, 1, &iocb); if (result < 0) { - // Putting the Global Ref and IOCB back in case of a failure - if (iocb->data != NULL && iocb->data != (void *) -1) { - (*env)->DeleteGlobalRef(env, (jobject)iocb->data); - } - putIOCB(theControl, iocb); - throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result); return 0; } @@ -498,54 +399,15 @@ static inline short submit(JNIEnv * env, struct io_control * theControl, struct return 1; } -static inline void * getBuffer(JNIEnv* env, jobject pointer) { - return (*env)->GetDirectBufferAddress(env, pointer); -} - JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_lock (JNIEnv * env, jclass clazz, jint handle) { return flock(handle, LOCK_EX | LOCK_NB) == 0; } - -/** - * Destroys the individual members of the IOCB pool - * @param theControl the IO Control structure containing an IOCB pool - * @param upperBound the number of elements contained within the pool - */ -static inline void iocb_destroy_members(struct io_control * theControl, int upperBound) { - for (int i = 0; i < upperBound; i++) { - free(theControl->iocb[i]); - } -} - - -/** - * Destroys an IOCB pool and its members up to a certain limit. Should be used when the IOCB - * pool fails to initialize completely - * @param theControl the IO Control structure containing an IOCB pool - * @param upperBound the number of elements contained within the pool - */ -static inline void iocb_destroy_bounded(struct io_control * theControl, int upperBound) { - iocb_destroy_members(theControl, upperBound); - free(theControl->iocb); -} - - -/** - * Destroys an IOCB pool and all its members - * @param theControl - */ -static inline void iocb_destroy(struct io_control * theControl) { - iocb_destroy_bounded(theControl, theControl->queueSize); -} - /** * Everything that is allocated here will be freed at deleteContext when the class is unloaded. */ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newContext(JNIEnv* env, jobject thisObject, jint queueSize) { - int i = 0; - #ifdef DEBUG fprintf (stdout, "Initializing context\n"); #endif @@ -565,118 +427,29 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio throwRuntimeExceptionErrorNo(env, "Cannot initialize queue:", res); return NULL; } - - theControl->iocb = (struct iocb **)malloc((sizeof(struct iocb *) * (size_t)queueSize)); - if (theControl->iocb == NULL) { - io_queue_release(theControl->ioContext); - free(theControl); - - throwOutOfMemoryError(env); - return NULL; - } - - for (i = 0; i < queueSize; i++) { - theControl->iocb[i] = (struct iocb *)malloc(sizeof(struct iocb)); - if (theControl->iocb[i] == NULL) { - - // It may not have been fully initialized, therefore limit the cleanup up to 'i' members. - iocb_destroy_bounded(theControl, i); - - io_queue_release(theControl->ioContext); - free(theControl); - - throwOutOfMemoryError(env); - return NULL; - } - } - theControl->queueSize = queueSize; - - - res = pthread_mutex_init(&(theControl->iocbLock), 0); - if (res) { - iocb_destroy(theControl); - - io_queue_release(theControl->ioContext); - free(theControl); - - throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); - return NULL; - } - - res = pthread_mutex_init(&(theControl->pollLock), 0); - if (res) { - iocb_destroy(theControl); - - io_queue_release(theControl->ioContext); - free(theControl); - - throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res); - return NULL; - } - - theControl->events = (struct io_event *)malloc(sizeof(struct io_event) * (size_t)queueSize); - if (theControl->events == NULL) { - iocb_destroy(theControl); - - io_queue_release(theControl->ioContext); - free(theControl); - - throwRuntimeExceptionErrorNo(env, "Can't initialize mutext (not enough memory for the events member): ", res); - return NULL; - } - - - theControl->iocbPut = 0; - theControl->iocbGet = 0; - theControl->used = 0; theControl->thisObject = (*env)->NewGlobalRef(env, thisObject); return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control)); } -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject contextPointer) { - int i; - struct io_control * theControl = getIOControl(env, contextPointer); - if (theControl == NULL) { - return; - } - - struct iocb * iocb = getIOCB(theControl); - - if (iocb == NULL) { - throwIOException(env, "Not enough space in libaio queue"); - return; - } - - // Submitting a dumb write so the loop finishes - io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0); - iocb->data = (void *) -1; - if (!submit(env, theControl, iocb)) { - return; - } +JNIEXPORT jstring JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_strError(JNIEnv* env, jclass clazz, jlong eventResult) { + return (*env)->NewStringUTF(env, strerror(-eventResult)); +} - // to make sure the poll has finished - pthread_mutex_lock(&(theControl->pollLock)); - pthread_mutex_unlock(&(theControl->pollLock)); +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_dumbFD(JNIEnv* env, jclass clazz) { + return dumbWriteHandler; +} - // To return any pending IOCBs - int result = ringio_get_events(theControl->ioContext, 0, 1, theControl->events, 0); - for (i = 0; i < result; i++) { - struct io_event * event = &(theControl->events[i]); - struct iocb * iocbp = event->obj; - putIOCB(theControl, iocbp); +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject ioControlBuffer) { + struct io_control * theControl = getIOControl(env, ioControlBuffer); + if (theControl == NULL) { + return; } io_queue_release(theControl->ioContext); - pthread_mutex_destroy(&(theControl->pollLock)); - pthread_mutex_destroy(&(theControl->iocbLock)); - - iocb_destroy(theControl); - (*env)->DeleteGlobalRef(env, theControl->thisObject); - free(theControl->events); free(theControl); } @@ -706,214 +479,65 @@ JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCont return res; } -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite - (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) { - struct io_control * theControl = getIOControl(env, contextPointer); - if (theControl == NULL) { - return; - } +JNIEXPORT jlong Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_memoryAddress0(JNIEnv* env, jclass clazz, jobject buffer) { + return (jlong) (*env)->GetDirectBufferAddress(env, buffer); +} +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite + (JNIEnv * env, jclass clazz, jint fileHandle, jlong ioContextAddress, jlong iocbAddress, jlong position, jint size, jlong bufferAddress, jlong requestId) { #ifdef DEBUG fprintf (stdout, "submitWrite position %ld, size %d\n", position, size); #endif - struct iocb * iocb = getIOCB(theControl); + io_context_t io_context = (io_context_t) ioContextAddress; - if (iocb == NULL) { - throwIOException(env, "Not enough space in libaio queue"); - return; - } + struct iocb * iocb = (struct iocb *) iocbAddress; - io_prep_pwrite(iocb, fileHandle, getBuffer(env, bufferWrite), (size_t)size, position); + io_prep_pwrite(iocb, fileHandle, (void *)bufferAddress, (size_t)size, position); - // The GlobalRef will be deleted when poll is called. this is done so - // the vm wouldn't crash if the Callback passed by the user is GCed between submission - // and callback. - // also as the real intention is to hold the reference until the life cycle is complete - iocb->data = (void *) (*env)->NewGlobalRef(env, callback); + iocb->data = (void *)requestId; - submit(env, theControl, iocb); + submit(env, io_context, iocb); } -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead - (JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferRead, jobject callback) { - struct io_control * theControl = getIOControl(env, contextPointer); - if (theControl == NULL) { - return; - } +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitFDataSync + (JNIEnv * env, jclass clazz, jint fileHandle, jlong ioContextAddress, jlong iocbAddress, jlong requestId) { + #ifdef DEBUG + fprintf (stdout, "submitFDataSync fd %d\n", (int)fileHandle); + #endif - struct iocb * iocb = getIOCB(theControl); + io_context_t io_context = (io_context_t) ioContextAddress; - if (iocb == NULL) { - throwIOException(env, "Not enough space in libaio queue"); - return; - } + struct iocb * iocb = (struct iocb *) iocbAddress; - io_prep_pread(iocb, fileHandle, getBuffer(env, bufferRead), (size_t)size, position); + io_prep_fdsync(iocb, fileHandle); - // The GlobalRef will be deleted when poll is called. this is done so - // the vm wouldn't crash if the Callback passed by the user is GCed between submission - // and callback. - // also as the real intention is to hold the reference until the life cycle is complete - iocb->data = (void *) (*env)->NewGlobalRef(env, callback); + iocb->data = (void *)requestId; - submit(env, theControl, iocb); + submit(env, io_context, iocb); } -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_blockedPoll - (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) { - +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead + (JNIEnv * env, jclass clazz, jint fileHandle, jlong ioContextAddress, jlong iocbAddress, jlong position, jint size, jlong bufferAddress, jlong requestId) { #ifdef DEBUG - fprintf (stdout, "Running blockedPoll\n"); - fflush(stdout); + fprintf (stdout, "submitRead position %ld, size %d\n", position, size); #endif + io_context_t io_context = (io_context_t) ioContextAddress; - int i; - struct io_control * theControl = getIOControl(env, contextPointer); - if (theControl == NULL) { - return; - } - int max = theControl->queueSize; - pthread_mutex_lock(&(theControl->pollLock)); - - short running = 1; - - int lastFile = -1; - - while (running) { - - int result = ringio_get_events(theControl->ioContext, 1, max, theControl->events, 0); - - if (result == -EINTR) - { - // ARTEMIS-353: jmap will issue some weird interrupt signal what would break the execution here - // we need to ignore such calls here - continue; - } - - if (result < 0) - { - throwIOExceptionErrorNo(env, "Error while calling io_getevents IO: ", -result); - break; - } - #ifdef DEBUG - fprintf (stdout, "blockedPoll returned %d events\n", result); - fflush(stdout); - #endif - - lastFile = -1; + struct iocb * iocb = (struct iocb *) iocbAddress; - for (i = 0; i < result; i++) - { - #ifdef DEBUG - fprintf (stdout, "blockedPoll treating event %d\n", i); - fflush(stdout); - #endif - struct io_event * event = &(theControl->events[i]); - struct iocb * iocbp = event->obj; - - if (iocbp->aio_fildes == dumbWriteHandler) { - #ifdef DEBUG - fprintf (stdout, "Dumb write arrived, giving up the loop\n"); - fflush(stdout); - #endif - putIOCB(theControl, iocbp); - running = 0; - break; - } - - if (useFdatasync && lastFile != iocbp->aio_fildes) { - lastFile = iocbp->aio_fildes; - fdatasync(lastFile); - } + io_prep_pread(iocb, fileHandle, (void *)bufferAddress, (size_t)size, position); + iocb->data = (void *)requestId; - int eventResult = (int)event->res; - - #ifdef DEBUG - fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); - fflush (stdout); - #endif - - if (eventResult < 0) { - #ifdef DEBUG - fprintf (stdout, "Error: %s\n", strerror(-eventResult)); - fflush (stdout); - #endif - - jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); - - if (iocbp->data != NULL) { - (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); - } - } - - jobject obj = (jobject)iocbp->data; - iocbp->data = NULL; // this is to detect invalid elements on the buffer. - - if (obj != NULL) { - putIOCB(theControl, iocbp); - (*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj); - // We delete the globalRef after the completion of the callback - (*env)->DeleteGlobalRef(env, obj); - } else { - if (!forceSysCall) { - fprintf (stdout, "Warning from ActiveMQ Artemis Native Layer: Your system is hitting duplicate / invalid records from libaio, which is a bug on the Linux Kernel you are using.\nYou should set property org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL=1\nor upgrade to a kernel version that contains a fix"); - fflush(stdout); - } - forceSysCall = JNI_TRUE; - } - - } - } - - pthread_mutex_unlock(&(theControl->pollLock)); - + submit(env, io_context, iocb); } JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll - (JNIEnv * env, jobject obj, jobject contextPointer, jobjectArray callbacks, jint min, jint max) { - int i = 0; - struct io_control * theControl = getIOControl(env, contextPointer); - if (theControl == NULL) { - return 0; - } - - - int result = ringio_get_events(theControl->ioContext, min, max, theControl->events, 0); - int retVal = result; - - for (i = 0; i < result; i++) { - struct io_event * event = &(theControl->events[i]); - struct iocb * iocbp = event->obj; - int eventResult = (int)event->res; - - #ifdef DEBUG - fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result); - #endif - - if (eventResult < 0) { - #ifdef DEBUG - fprintf (stdout, "Error: %s\n", strerror(-eventResult)); - #endif - - if (iocbp->data != NULL && iocbp->data != (void *) -1) { - jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult)); - - (*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError); - } - } - - if (iocbp->data != NULL && iocbp->data != (void *) -1) { - (*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data); - // We delete the globalRef after the completion of the callback - (*env)->DeleteGlobalRef(env, (jobject)iocbp->data); - } - - putIOCB(theControl, iocbp); - } - - return retVal; + (JNIEnv * env, jclass clazz, jlong ioContextAddress, jlong ioEventsAddress, jint min, jint max) { + io_context_t io_context = (io_context_t) ioContextAddress; + struct io_event * events = (struct io_event *) ioEventsAddress; + return ringio_get_events(io_context, min, max, events, 0); } JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_newAlignedBuffer @@ -939,6 +563,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio return (*env)->NewDirectByteBuffer(env, buffer, size); } +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fdatasync(JNIEnv * env, jclass clazz, jint fd) { + fdatasync(fd); +} + JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_freeBuffer (JNIEnv * env, jclass clazz, jobject jbuffer) { if (jbuffer == NULL) diff --git a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h index 7adfd43..8900b8d 100644 --- a/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h +++ b/src/main/c/org_apache_activemq_artemis_nativo_jlibaio_LibaioContext.h @@ -8,7 +8,7 @@ extern "C" { #endif #undef org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION -#define org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION 10L +#define org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION 11L /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext * Method: shutdownHook @@ -33,6 +33,30 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_isForceSyscall (JNIEnv *, jclass); +/* + * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext + * Method: strError + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_strError + (JNIEnv *, jclass, jlong); + +/* + * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext + * Method: dumbFD + * Signature: ()I + */ +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_dumbFD + (JNIEnv *, jclass); + +/* + * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext + * Method: memoryAddress0 + * Signature: (Ljava/nio/ByteBuffer;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_memoryAddress0 + (JNIEnv *, jclass, jobject); + /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext * Method: newContext @@ -47,7 +71,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_Libaio * Signature: (Ljava/nio/ByteBuffer;)V */ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_deleteContext - (JNIEnv *, jobject, jobject); + (JNIEnv *, jclass, jobject); /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext @@ -84,34 +108,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext * Method: submitWrite - * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V + * Signature: (IJJJIJJ)V */ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitWrite - (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject); + (JNIEnv *, jclass, jint, jlong, jlong, jlong, jint, jlong, jlong); /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext * Method: submitRead - * Signature: (ILjava/nio/ByteBuffer;JILjava/nio/ByteBuffer;Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;)V + * Signature: (IJJJIJJ)V */ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitRead - (JNIEnv *, jobject, jint, jobject, jlong, jint, jobject, jobject); + (JNIEnv *, jclass, jint, jlong, jlong, jlong, jint, jlong, jlong); /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext - * Method: poll - * Signature: (Ljava/nio/ByteBuffer;[Lorg/apache/activemq/artemis/nativo/jlibaio/SubmitInfo;II)I + * Method: submitFDataSync + * Signature: (IJJJ)V */ -JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll - (JNIEnv *, jobject, jobject, jobjectArray, jint, jint); +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_submitFDataSync + (JNIEnv *, jclass, jint, jlong, jlong, jlong); /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext - * Method: blockedPoll - * Signature: (Ljava/nio/ByteBuffer;Z)V + * Method: poll + * Signature: (JJII)I */ -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_blockedPoll - (JNIEnv *, jobject, jobject, jboolean); +JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_poll + (JNIEnv *, jclass, jlong, jlong, jint, jint); /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext @@ -153,6 +177,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCo JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getBlockSizeFD (JNIEnv *, jclass, jint); +/* + * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext + * Method: getIOContextAddress + * Signature: (Ljava/nio/ByteBuffer;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_getIOContextAddress + (JNIEnv *, jclass, jobject); + /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext * Method: getBlockSize @@ -179,11 +211,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioCon /* * Class: org_apache_activemq_artemis_nativo_jlibaio_LibaioContext - * Method: writeInternal - * Signature: (IJJLjava/nio/ByteBuffer;)V + * Method: fdatasync + * Signature: (I)V */ -JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_writeInternal - (JNIEnv *, jclass, jint, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_nativo_jlibaio_LibaioContext_fdatasync + (JNIEnv *, jclass, jint); #ifdef __cplusplus } diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/AioRing.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/AioRing.java new file mode 100644 index 0000000..9ab9b34 --- /dev/null +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/AioRing.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.nativo.jlibaio; + +import static org.jctools.util.UnsafeAccess.UNSAFE; + +/** + * Unsafe-based access representation for: + *
+ * struct aio_ring {
+ *    unsigned	id;
+ *    unsigned nr;
+ *    unsigned head;
+ *    unsigned tail;
+ *    unsigned magic;
+ *    unsigned compat_features;
+ *    unsigned incompat_features;
+ *    unsigned header_length;
+ *
+ *    struct io_event io_events[0];
+ * };
+ * 
+ */ +public final class AioRing { + + private static final int AIO_RING_MAGIC = 0xa10a10a1; + private static final int AIO_RING_INCOMPAT_FEATURES = 0; + private static final int ID_OFFSET = 0; + private static final int NR_OFFSET = ID_OFFSET + Integer.BYTES; + private static final int HEAD_OFFSET = NR_OFFSET + Integer.BYTES; + private static final int TAIL_OFFSET = HEAD_OFFSET + Integer.BYTES; + private static final int MAGIC_OFFSET = TAIL_OFFSET + Integer.BYTES; + private static final int COMPAT_FEATURES_OFFSET = MAGIC_OFFSET + Integer.BYTES; + private static final int INCOMPAT_FEATURES_OFFSET = COMPAT_FEATURES_OFFSET + Integer.BYTES; + private static final int HEADER_LENGTH_OFFSET = INCOMPAT_FEATURES_OFFSET + Integer.BYTES; + private static final int IO_EVENT_OFFSET = HEADER_LENGTH_OFFSET + Integer.BYTES; + public static final int SIZE_OF_AIO_RING = IO_EVENT_OFFSET + Long.BYTES; + /** + *
+    * struct io_event {
+    *    __u64		data;
+    *    __u64 obj;
+    *    __s64 res;
+    *    __s64 res2;
+    * };
+    * 
+ */ + private static final int SIZE_OF_IO_EVENT_STRUCT = 32; + private final long nrAddress; + private final long headAddress; + private final long tailAddress; + private final long ioEventsAddress; + + public AioRing(long aioRingAddress) { + if (!hasUsableRing(aioRingAddress)) { + throw new IllegalStateException("Unsafe kernel bypass cannot be used!"); + } + this.nrAddress = aioRingAddress + NR_OFFSET; + this.headAddress = aioRingAddress + HEAD_OFFSET; + this.tailAddress = aioRingAddress + TAIL_OFFSET; + this.ioEventsAddress = aioRingAddress + IO_EVENT_OFFSET; + } + + public static boolean hasUsableRing(long aioRingAddress) { + return UNSAFE.getInt(aioRingAddress + MAGIC_OFFSET) == AIO_RING_MAGIC && + UNSAFE.getInt(aioRingAddress + INCOMPAT_FEATURES_OFFSET) == AIO_RING_INCOMPAT_FEATURES; + } + + public int size() { + final long nrAddress = this.nrAddress; + final long headAddress = this.headAddress; + final long tailAddress = this.tailAddress; + final int nr = UNSAFE.getInt(nrAddress); + final int head = UNSAFE.getInt(headAddress); + // no need of membar here because Unsafe::getInt already provide it + final int tail = UNSAFE.getIntVolatile(null, tailAddress); + int available = tail - head; + if (available < 0) { + available += nr; + } + // this is to mitigate a RHEL BUG: see native code for more info + if (available > nr) { + return 0; + } + return available; + } + + public int poll(long completedIoEvents, int min, int max) { + final long nrAddress = this.nrAddress; + final long headAddress = this.headAddress; + final long tailAddress = this.tailAddress; + final int nr = UNSAFE.getInt(nrAddress); + int head = UNSAFE.getInt(headAddress); + // no need of membar here because Unsafe::getInt already provide it + final int tail = UNSAFE.getIntVolatile(null, tailAddress); + int available = tail - head; + if (available < 0) { + // a wrap has occurred + available += nr; + } + if (available < min) { + return 0; + } + if (available == 0) { + return 0; + } + // this is to mitigate a RHEL BUG: see native code for more info + if (available > nr) { + return -1; + } + available = Math.min(available, max); + final long ringIoEvents = this.ioEventsAddress; + final long startRingAioEvents = ringIoEvents + (head * SIZE_OF_IO_EVENT_STRUCT); + head += available; + final long availableBytes = available * SIZE_OF_IO_EVENT_STRUCT; + if (head < nr) { + // no wrap: JDK should account to save long safepoint pauses + UNSAFE.copyMemory(startRingAioEvents, completedIoEvents, availableBytes); + } else { + head -= nr; + // copy trail + final long trailBytes = (available - head) * SIZE_OF_IO_EVENT_STRUCT; + UNSAFE.copyMemory(startRingAioEvents, completedIoEvents, trailBytes); + completedIoEvents += trailBytes; + final long headerBytes = availableBytes - trailBytes; + UNSAFE.copyMemory(ringIoEvents, completedIoEvents, headerBytes); + // copy header + } + assert head >= 0 && head <= nr; + // it allow the kernel to build its own view of the ring buffer size + // and push new events if there are any + UNSAFE.putOrderedInt(null, headAddress, head); + return available; + } + +} diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoCb.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoCb.java new file mode 100644 index 0000000..de836c5 --- /dev/null +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoCb.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.nativo.jlibaio; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + *
+ * struct iocb {
+ *    __u64   aio_data;
+ *    __u32   PADDED(aio_key, aio_rw_flags);
+ *    __u16   aio_lio_opcode;
+ *    __s16   aio_reqprio;
+ *    __u32   aio_fildes;
+ *    __u64   aio_buf;
+ *    __u64   aio_nbytes;
+ *    __s64   aio_offset;
+ *    __u64   aio_reserved2;
+ *    __u32   aio_flags;
+ *    __u32   aio_resfd;
+ * };
+ * 
+ */ +public final class IoCb { + + private IoCb() { + + } + + /** + * Supported aio_lio_opcode + */ + static final short IOCB_CMD_PREAD = 0; + static final short IOCB_CMD_PWRITE = 1; + static final short IOCB_CMD_FDSYNC = 3; + + private static final int DATA_OFFSET = 0; + private static final int PADDED_OFFSET = DATA_OFFSET + 8; + private static final int LIO_OPCODE_OFFSET = PADDED_OFFSET + 8; + private static final int REQ_PRIO_OFFSET = LIO_OPCODE_OFFSET + 2; + private static final int FILDES_OFFSET = REQ_PRIO_OFFSET + 2; + private static final int BUF_OFFSET = FILDES_OFFSET + 4; + private static final int N_BYTES = BUF_OFFSET + 8; + private static final int OFFSET_OFFSET = N_BYTES + 8; + private static final int RESERVED_OFFSET = OFFSET_OFFSET + 8; + private static final int FLAGS_OFFSET = RESERVED_OFFSET + 8; + private static final int REST_FD_OFFSET = FLAGS_OFFSET + 4; + public static final int SIZE_OF_IOCB_STRUCT = REST_FD_OFFSET + 4; + + public static int aioFildes(ByteBuffer byteBuffer) { + return byteBuffer.getInt(FILDES_OFFSET); + } + + public static long aioData(ByteBuffer byteBuffer) { + return byteBuffer.getLong(DATA_OFFSET); + } + + public static short lioOpCode(ByteBuffer byteBuffer) { + return byteBuffer.getShort(LIO_OPCODE_OFFSET); + } + + public static final int SIZE_OF() { + return SIZE_OF_IOCB_STRUCT; + } + + public static final class Array { + + private final ByteBuffer buffer; + + public Array(int capacity) { + this.buffer = ByteBuffer.allocateDirect(capacity * SIZE_OF_IOCB_STRUCT).order(ByteOrder.nativeOrder()); + } + + public ByteBuffer sliceOf(int index) { + final int start = index * SIZE_OF_IOCB_STRUCT; + buffer.clear().position(start).limit(start + SIZE_OF_IOCB_STRUCT); + return buffer.slice().order(ByteOrder.nativeOrder()); + } + } + +} diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoEventArray.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoEventArray.java new file mode 100644 index 0000000..65f11f9 --- /dev/null +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/IoEventArray.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.nativo.jlibaio; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Unsafe access representation for an array of: + *
+ * struct io_event {
+ *    __u64		data;		// the data field from the iocb
+* 	   __u64		obj;		// what iocb this event came from
+ *    __s64		res;	   // result code for this event
+ *    __s64		res2;		// secondary result
+ * };
+ * 
+ *

+ * Support is just for 64 bits for now ie sizeof(unsigned) == Integer.BYTES + */ +public final class IoEventArray { + + // TODO use JNI to obtain this + public static final int SIZE_OF_IO_EVENT_STRUCT = 32; + private ByteBuffer array; + private final long arrayAddress; + private final IoEvent ioEvent; + private final int capacity; + + public IoEventArray(int capacity) { + array = ByteBuffer.allocateDirect(capacity * SIZE_OF_IO_EVENT_STRUCT).order(ByteOrder.nativeOrder()); + arrayAddress = LibaioContext.RuntimeDependent.directBufferAddress(array); + ioEvent = new IoEvent(); + this.capacity = capacity; + } + + public long address() { + return arrayAddress; + } + + // Flyweight approach: this could use Unsafe too ;) + public final class IoEvent { + + private static final int DATA_OFFSET = 0; + private static final int OBJ_OFFSET = DATA_OFFSET + 8; + private static final int RES_OFFSET = OBJ_OFFSET + 8; + private int offset; + + private IoEvent() { + offset = -1; + } + + public long data() { + return array.getLong(offset + DATA_OFFSET); + } + + public IoEvent data(long value) { + array.putLong(offset + DATA_OFFSET, value); + return this; + } + + public long obj() { + return array.getLong(offset + OBJ_OFFSET); + } + + public IoEvent obj(long value) { + array.putLong(offset + OBJ_OFFSET, value); + return this; + } + + public long res() { + return array.getLong(offset + RES_OFFSET); + } + + public IoEvent res(long value) { + array.putLong(offset + RES_OFFSET, value); + return this; + } + + } + + public IoEvent get(int index) { + assert index >= 0 && index < capacity; + final int offset = index * SIZE_OF_IO_EVENT_STRUCT; + ioEvent.offset = offset; + return ioEvent; + } + + public void close() { + this.array = null; + } + +} diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java index 794e112..72a1fbe 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java @@ -19,12 +19,26 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.Buffer; import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Objects; +import java.util.Queue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; +import org.jctools.util.UnsafeAccess; +import sun.misc.Unsafe; /** * This class is used as an aggregator for the {@link LibaioFile}. @@ -47,7 +61,7 @@ public class LibaioContext implements Closeable { /** * The Native layer will look at this version. */ - private static final int EXPECTED_NATIVE_VERSION = 10; + private static final int EXPECTED_NATIVE_VERSION = 11; private static boolean loaded = false; @@ -55,6 +69,25 @@ public class LibaioContext implements Closeable { private static final AtomicInteger contexts = new AtomicInteger(0); + private static final String FORCE_SYSCALL_PROPERTY_NAME = "org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL"; + + private static final class PooledIOCB { + + final int id; + final ByteBuffer bytes; + final long address; + SubmitInfo submitInfo; + + PooledIOCB(int id, ByteBuffer iocbBytes) { + this.id = id; + assert iocbBytes.capacity() == IoCb.SIZE_OF(); + assert iocbBytes.order() == ByteOrder.nativeOrder(); + this.bytes = iocbBytes; + this.address = RuntimeDependent.directBufferAddress(bytes); + this.submitInfo = null; + } + } + public static boolean isLoaded() { return loaded; } @@ -77,11 +110,10 @@ private static boolean loadLibrary(final String name) { static { String[] libraries = new String[]{"artemis-native-64", "artemis-native-32"}; - for (String library : libraries) { if (loadLibrary(library)) { loaded = true; - if (System.getProperty("org.apache.activemq.artemis.native.jlibaio.FORCE_SYSCALL") != null) { + if (System.getProperty(FORCE_SYSCALL_PROPERTY_NAME) != null) { LibaioContext.setForceSyscall(true); } Runtime.getRuntime().addShutdownHook(new Thread() { @@ -96,7 +128,6 @@ public void run() { NativeLogger.LOGGER.debug("Library " + library + " not found!"); } } - if (!loaded) { NativeLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper"); } @@ -144,15 +175,18 @@ public static void resetMaxAIO() { /** * the native ioContext including the structure created. */ - private final ByteBuffer ioContext; - + private final ByteBuffer ioControl; + private final long ioContextAddress; private final AtomicBoolean closed = new AtomicBoolean(false); - - final Semaphore ioSpace; - - final int queueSize; - - final boolean useFdatasync; + private final Semaphore ioSpace; + private final int queueSize; + private final boolean useFdatasync; + private final Queue iocbPool; + private final PooledIOCB[] iocbArray; + private final IoEventArray ioEventArray; + private final AioRing aioRing; + private final int dumbFD; + private final ReentrantLock pollLock; /** * The queue size here will use resources defined on the kernel parameter @@ -166,8 +200,22 @@ public static void resetMaxAIO() { public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) { try { contexts.incrementAndGet(); - this.ioContext = newContext(queueSize); + this.ioControl = newContext(queueSize); + // Better use JNI here, because the context address size depends on the machine word size + this.ioContextAddress = getIOContextAddress(ioControl); + this.iocbPool = RuntimeDependent.newMpmcQueue(queueSize); + this.iocbArray = new PooledIOCB[queueSize]; + final IoCb.Array arrayOfIocb = new IoCb.Array(queueSize); + for (int i = 0; i < queueSize; i++) { + final PooledIOCB pooledIOCB = new PooledIOCB(i, arrayOfIocb.sliceOf(i)); + this.iocbArray[i] = pooledIOCB; + this.iocbPool.add(pooledIOCB); + } + this.ioEventArray = new IoEventArray(queueSize); this.useFdatasync = useFdatasync; + this.aioRing = createAioRing(); + this.dumbFD = dumbFD(); + this.pollLock = new ReentrantLock(); } catch (Exception e) { throw e; } @@ -180,21 +228,20 @@ public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) } } - /** - * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)} - * - * @param fd the file descriptor - * @param position the write position - * @param size number of bytes to use - * @param bufferWrite the native buffer - * @param callback a callback - * @throws IOException in case of error - */ - public void submitWrite(int fd, - long position, - int size, - ByteBuffer bufferWrite, - Callback callback) throws IOException { + private AioRing createAioRing() { + if (isForceSyscall()) { + return null; + } + if (!RuntimeDependent.HAS_UNSAFE) { + return null; + } + if (!AioRing.hasUsableRing(ioContextAddress)) { + return null; + } + return new AioRing(ioContextAddress); + } + + public int submitWrite(int fd, long position, int size, ByteBuffer bufferWrite, Callback callback) throws IOException { if (closed.get()) { throw new IOException("Libaio Context is closed!"); } @@ -206,14 +253,28 @@ public void submitWrite(int fd, Thread.currentThread().interrupt(); throw new IOException(e.getMessage(), e); } - submitWrite(fd, this.ioContext, position, size, bufferWrite, callback); + PooledIOCB iocb = iocbPool.poll(); + if (iocb == null) { + assert ioSpace == null; + throw new IOException("Not enough space in libaio queue"); + } + try { + assert iocb.submitInfo == null; + // set submitted *before* submitWrite in order to guarantee safe publication thanks to JNI + iocb.submitInfo = callback; + submitWrite(fd, ioContextAddress, iocb.address, position, size, RuntimeDependent.directBufferAddress(bufferWrite), iocb.id); + return iocb.id; + } catch (IOException ioException) { + iocb.submitInfo = null; + iocbPool.add(iocb); + if (ioSpace != null) { + ioSpace.release(); + } + throw ioException; + } } - public void submitRead(int fd, - long position, - int size, - ByteBuffer bufferWrite, - Callback callback) throws IOException { + public int submitRead(int fd, long position, int size, ByteBuffer bufferWrite, Callback callback) throws IOException { if (closed.get()) { throw new IOException("Libaio Context is closed!"); } @@ -225,7 +286,25 @@ public void submitRead(int fd, Thread.currentThread().interrupt(); throw new IOException(e.getMessage(), e); } - submitRead(fd, this.ioContext, position, size, bufferWrite, callback); + PooledIOCB iocb = iocbPool.poll(); + if (iocb == null) { + assert ioSpace == null; + throw new IOException("Not enough space in libaio queue"); + } + try { + assert iocb.submitInfo == null; + // set submitted *before* submitRead in order to guarantee safe publication thanks to JNI + iocb.submitInfo = callback; + submitRead(fd, ioContextAddress, iocb.address, position, size, RuntimeDependent.directBufferAddress(bufferWrite), iocb.id); + return iocb.id; + } catch (IOException ioException) { + iocb.submitInfo = null; + iocbPool.add(iocb); + if (ioSpace != null) { + ioSpace.release(); + } + throw ioException; + } } /** @@ -247,8 +326,44 @@ public void close() { } totalMaxIO.addAndGet(-queueSize); - if (ioContext != null) { - deleteContext(ioContext); + if (ioControl != null) { + // submit a dumbFD write + PooledIOCB iocb = iocbPool.poll(); + if (iocb == null) { + throw new IllegalStateException("Not enough space in libaio queue to stop the context"); + } + try { + // Submitting a dumb write so the loop finishes + submitWrite(dumbFD, ioContextAddress, iocb.address, 0, 0, 0, iocb.id); + } catch (IOException ioException) { + // TODO handle this + } + // await until there are no more pending I/O or the blocked poll has completed + do { + pollLock.lock(); + try { + // poll is locked, will await any blocked pool to complete + if (unsafePoll(null, 0, 1) == 0) { + Thread.yield(); + } + } finally { + pollLock.unlock(); + } + } while (iocbPool.size() < queueSize); + deleteContext(ioControl); + pollLock.lock(); + try { + if (iocb.bytes == null) { + throw new NullPointerException("THIS IS IMPOSSIBLE: IT'S JUST TO CREATE AN REACHABILITY FENCE"); + } + assert iocbPool.size() == queueSize; + iocbPool.clear(); + Arrays.fill(iocbArray, null); + ioEventArray.close(); + } finally { + pollLock.unlock(); + } + } contexts.decrementAndGet(); checkShutdown(); @@ -285,7 +400,7 @@ public LibaioFile openFile(File file, boolean direct) throws IOExcepti */ public LibaioFile openFile(String file, boolean direct) throws IOException { checkNotNull(file, "path"); - checkNotNull(ioContext, "IOContext"); + checkNotNull(ioControl, "IOContext"); // note: the native layer will throw an IOException in case of errors int res = LibaioContext.open(file, direct); @@ -322,32 +437,71 @@ private static T checkNotNull(T arg, String text) { return arg; } - /** - * It will poll the libaio queue for results. It should block until min is reached - * Results are placed on the callback. - *
- * This shouldn't be called concurrently. You should provide your own synchronization if you need more than one - * Thread polling for any reason. - *
- * Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures, - * but it won't call done method for you. - * - * @param callbacks area to receive the callbacks passed on submission.The size of this callback has to - * be greater than the parameter max. - * @param min the minimum number of elements to receive. It will block until this is achieved. - * @param max The maximum number of elements to receive. - * @return Number of callbacks returned. - * @see LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo) - * @see LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo) - */ - public int poll(Callback[] callbacks, int min, int max) { - int released = poll(ioContext, callbacks, min, max); - if (ioSpace != null) { - if (released > 0) { - ioSpace.release(released); + public int poll(SubmitInfo[] callbacks, int min, int max) { + if (min > max || min < 0 || max > queueSize) { + throw new IllegalArgumentException("cannot request more events then the configured queueSize"); + } + if (closed.get()) { + return 0; + } + pollLock.lock(); + try { + return unsafePoll(callbacks, min, max); + } finally { + pollLock.unlock(); + } + } + + private int unsafePoll(SubmitInfo[] callbacks, int min, int max) { + final AioRing aioRing = this.aioRing; + final IoEventArray ioEventArray = this.ioEventArray; + final Semaphore ioSpace = this.ioSpace; + final PooledIOCB[] iocbArray = this.iocbArray; + final Queue iocbPool = this.iocbPool; + final long ioContextAddress = this.ioContextAddress; + final long ioEventArrayAddress = ioEventArray.address(); + int events = -1; + if (aioRing != null) { + events = aioRing.poll(ioEventArrayAddress, min, max); + } + if (events < min) { + // perform a blocking call + events = poll(ioContextAddress, ioEventArrayAddress, min, max); + } + assert events >= min && events <= max; + for (int i = 0; i < events; i++) { + final IoEventArray.IoEvent ioEvent = ioEventArray.get(i); + assert ioEvent.obj() != 0; + final int id = (int) ioEvent.data(); + final PooledIOCB pooledIOCB = iocbArray[id]; + assert ioEvent.obj() == pooledIOCB.address; + assert IoCb.aioData(pooledIOCB.bytes) == id; + SubmitInfo submitInfo = pooledIOCB.submitInfo; + if (submitInfo != null) { + pooledIOCB.submitInfo = null; + } + // NOTE: + // First we return back the IOCB then we release the semaphore, to let submitInfo::done + // to be able to issue a further write/read. + iocbPool.add(pooledIOCB); + if (ioSpace != null) { + ioSpace.release(); + } + if (callbacks != null) { + // this could be NULL! + callbacks[i] = submitInfo; + } + if (submitInfo != null) { + final long res = ioEvent.res(); + if (res >= 0) { + submitInfo.done(); + } else { + // TODO the error string can be cached? + submitInfo.onError((int) -res, strError(res)); + } } } - return released; + return events; } /** @@ -358,21 +512,111 @@ public int poll(Callback[] callbacks, int min, int max) { * {@link SubmitInfo#done()} are called. */ public void poll() { - if (!closed.get()) { - blockedPoll(ioContext, useFdatasync); + if (closed.get()) { + return; } - } - - /** - * Called from the native layer - */ - private void done(SubmitInfo info) { - info.done(); - if (ioSpace != null) { - ioSpace.release(); + pollLock.lock(); + try { + boolean asyncFDataSync = true; + final int dumbFD = this.dumbFD; + final AioRing aioRing = this.aioRing; + final IoEventArray ioEventArray = this.ioEventArray; + final boolean useFdatasync = this.useFdatasync; + final Semaphore ioSpace = this.ioSpace; + final PooledIOCB[] iocbArray = this.iocbArray; + final Queue iocbPool = this.iocbPool; + final int queueSize = this.queueSize; + final long ioContextAddress = this.ioContextAddress; + final long ioEventArrayAddress = ioEventArray.address(); + while (true) { + int events = 0; + if (aioRing != null) { + // we need to limit max, because that's the capacity of ioEventArray + events = aioRing.poll(ioEventArrayAddress, 0, queueSize); + } + // it could be either a RHEL bug or no new events + if (events <= 0) { + // blocked events here + events = poll(ioContextAddress, ioEventArrayAddress, 1, queueSize); + } + assert events > 0 && events <= queueSize; + boolean stop = false; + for (int i = 0; i < events; i++) { + final IoEventArray.IoEvent ioEvent = ioEventArray.get(i); + assert ioEvent.obj() != 0; + final int id = (int) ioEvent.data(); + PooledIOCB pooledIOCB = iocbArray[id]; + final ByteBuffer pooledIOCBBytes = pooledIOCB.bytes; + assert ioEvent.obj() == pooledIOCB.address; + assert IoCb.aioData(pooledIOCBBytes) == id; + SubmitInfo submitInfo = pooledIOCB.submitInfo; + long res = ioEvent.res(); + if (res >= 0) { + final short liOpCode = IoCb.lioOpCode(pooledIOCBBytes); + final int fd = IoCb.aioFildes(pooledIOCBBytes); + if (liOpCode == IoCb.IOCB_CMD_PWRITE) { + if (fd == dumbFD) { + stop = true; + } else if (useFdatasync) { + // submit an async fdatasync by re-using the current IOPS + assert submitInfo != null; + if (asyncFDataSync) { + try { + submitFDataSync(fd, ioContextAddress, pooledIOCB.address, id); + // allow to reuse the pooledIOCB and IOPS used for pwrite + // and to preserve submitInfo + pooledIOCB = null; + submitInfo = null; + } catch (IOException ioException) { + NativeLogger.LOGGER.error("Impossible to submit an async fdatasync: try with sync one from now on", ioException); + asyncFDataSync = false; + } + } + if (!asyncFDataSync) { + assert pooledIOCB != null && submitInfo != null; + fdatasync(fd); + } + } + } + // PREAD and FDSYNC just keep going + } + if (submitInfo != null) { + pooledIOCB.submitInfo = null; + } + // NOTE: + // First we return back the IOCB then we release the semaphore, to let submitInfo::done + // to be able to issue a further write/read. + if (pooledIOCB != null) { + assert pooledIOCB.submitInfo == null; + iocbPool.add(pooledIOCB); + if (ioSpace != null) { + ioSpace.release(); + } + } + if (submitInfo != null) { + if (res >= 0) { + submitInfo.done(); + } else { + // TODO the error string can be cached? + submitInfo.onError((int) -res, strError(res)); + } + } + } + if (stop) { + return; + } + } + } finally { + pollLock.unlock(); } } + private static native String strError(long eventError); + + private static native int dumbFD(); + + private static native long memoryAddress0(ByteBuffer buffer); + /** * This is the queue for libaio, initialized with queueSize. */ @@ -381,7 +625,7 @@ private void done(SubmitInfo info) { /** * Internal method to be used when closing the controller. */ - private native void deleteContext(ByteBuffer buffer); + private static native void deleteContext(ByteBuffer buffer); /** * it will return a file descriptor. @@ -415,25 +659,26 @@ private void done(SubmitInfo info) { */ public static native void freeBuffer(ByteBuffer buffer); - /** - * Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}. - */ - native void submitWrite(int fd, - ByteBuffer libaioContext, + private static native void submitWrite(int fd, + long ioContextAddress, + long iocbAddress, long position, int size, - ByteBuffer bufferWrite, - Callback callback) throws IOException; + long bufferAddress, + long requestId) throws IOException; - /** - * Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}. - */ - native void submitRead(int fd, - ByteBuffer libaioContext, + private static native void submitRead(int fd, + long ioContextAddress, + long iocbAddress, long position, int size, - ByteBuffer bufferWrite, - Callback callback) throws IOException; + long bufferAddress, + long requestId) throws IOException; + + private static native void submitFDataSync(int fd, + long ioContextAddress, + long iocbAddress, + long requestId) throws IOException; /** * Note: this shouldn't be done concurrently. @@ -441,18 +686,13 @@ native void submitRead(int fd, *

* The callbacks will include the original callback sent at submit (read or write). */ - native int poll(ByteBuffer libaioContext, Callback[] callbacks, int min, int max); + private static native int poll(long ioContextAddress, long ioEventAddress, int min, int max); - /** - * This method will block as long as the context is open. - */ - native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync); - - static native int getNativeVersion(); + private static native int getNativeVersion(); - public static native boolean lock(int fd); + static native boolean lock(int fd); - public static native void memsetBuffer(ByteBuffer buffer, int size); + private static native void memsetBuffer(ByteBuffer buffer, int size); static native long getSize(int fd); @@ -462,11 +702,104 @@ public static int getBlockSize(File path) { return getBlockSize(path.getAbsolutePath()); } + static native long getIOContextAddress(ByteBuffer ioControl); + public static native int getBlockSize(String path); static native void fallocate(int fd, long size); static native void fill(int fd, int alignment, long size); - static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException; + static native void fdatasync(int fd); + + /** + * Utility class built to detect runtime dependent features safely. + */ + static final class RuntimeDependent { + + private static final long BUFFER_ADDRESS_FIELD_OFFSET; + private static final boolean HAS_UNSAFE; + + static { + boolean hasUnsafe = hasUnsafe(); + long bufferAddressFieldOffset = -1; + if (hasUnsafe) { + Field addressField = unsafeAddressField(); + if (addressField != null) { + bufferAddressFieldOffset = unsafeAddressFiledOffset(addressField); + hasUnsafe = true; + } else { + bufferAddressFieldOffset = -1; + hasUnsafe = false; + } + } + BUFFER_ADDRESS_FIELD_OFFSET = bufferAddressFieldOffset; + HAS_UNSAFE = hasUnsafe; + } + + static boolean hasUnsafe() { + try { + Unsafe unsafe = UnsafeAccess.UNSAFE; + if (unsafe == null) { + throw new NullPointerException(); + } + return true; + } catch (Throwable t) { + return false; + } + } + + private static long unsafeAddressFiledOffset(Field addressField) { + Objects.requireNonNull(addressField); + return UnsafeAccess.UNSAFE.objectFieldOffset(addressField); + } + + private static Field unsafeAddressField() { + ByteBuffer direct = ByteBuffer.allocateDirect(1); + final Object maybeAddressField = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Object run() { + try { + final Field field = Buffer.class.getDeclaredField("address"); + // Use Unsafe to read value of the address field. This way it will not fail on JDK9+ which + // will forbid changing the access level via reflection. + final long offset = UnsafeAccess.UNSAFE.objectFieldOffset(field); + final long address = UnsafeAccess.UNSAFE.getLong(direct, offset); + + // if direct really is a direct buffer, address will be non-zero + if (address == 0) { + return null; + } + return field; + } catch (NoSuchFieldException e) { + return e; + } catch (SecurityException e) { + return e; + } + } + }); + + if (maybeAddressField instanceof Field) { + return (Field) maybeAddressField; + } else { + return null; + } + } + + public static Queue newMpmcQueue(int capacity) { + capacity = Math.max(2, capacity); + return HAS_UNSAFE ? new MpmcArrayQueue<>(capacity) : new MpmcAtomicArrayQueue<>(capacity); + } + + public static long directBufferAddress(ByteBuffer byteBuffer) { + if (HAS_UNSAFE) { + return unsafeDirectBufferAddress(byteBuffer); + } + return memoryAddress0(byteBuffer); + } + + private static long unsafeDirectBufferAddress(ByteBuffer buffer) { + return UnsafeAccess.UNSAFE.getLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET); + } + } } diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioFile.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioFile.java index 373e48c..bf7caab 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioFile.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioFile.java @@ -29,9 +29,8 @@ public final class LibaioFile implements AutoClosea * This represents a structure allocated on the native * this is a io_context_t */ - final LibaioContext ctx; - - private int fd; + private final LibaioContext ctx; + private final int fd; LibaioFile(int fd, LibaioContext ctx) { this.ctx = ctx; @@ -73,8 +72,8 @@ public long getSize() { * @param callback A callback to be returned on the poll method. * @throws java.io.IOException in case of error */ - public void write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException { - ctx.submitWrite(fd, position, size, buffer, callback); + public int write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException { + return ctx.submitWrite(fd, position, size, buffer, callback); } /** @@ -93,8 +92,8 @@ public void write(long position, int size, ByteBuffer buffer, Callback callback) * @throws java.io.IOException in case of error * @see LibaioContext#poll(SubmitInfo[], int, int) */ - public void read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException { - ctx.submitRead(fd, position, size, buffer, callback); + public int read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException { + return ctx.submitRead(fd, position, size, buffer, callback); } /** diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java index 4fc1d0c..f4158b9 100644 --- a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java +++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioStressTest.java @@ -67,7 +67,7 @@ public void setUpFactory() { } @After - public void deleteFactory() { + public void deleteFactory() throws IOException { control.close(); validateLibaio(); } diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioTest.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioTest.java index 074597f..ebe3b97 100644 --- a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioTest.java +++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/LibaioTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.nativo.jlibaio.AioRing; import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; @@ -84,7 +85,7 @@ public void setUpFactory() { } @After - public void deleteFactory() { + public void deleteFactory() throws IOException { control.close(); validateLibaio(); } @@ -591,7 +592,7 @@ public void testIOExceptionConditions() throws Exception { // it should be possible to write now after queue space being released fileDescriptor.write(0, 4096, buffer, new TestInfo()); - Assert.assertEquals(1, control.poll(callbacks, 1, 100)); + Assert.assertEquals(1, control.poll(callbacks, 1, 50)); TestInfo errorCallback = new TestInfo(); // odd positions will have failures through O_DIRECT diff --git a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/OpenCloseContextTest.java b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/OpenCloseContextTest.java index e6c27f2..ded2ff0 100644 --- a/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/OpenCloseContextTest.java +++ b/src/test/java/org/apache/activemq/artemis/nativo/jlibaio/test/OpenCloseContextTest.java @@ -150,6 +150,7 @@ public void onError(int errno, String message) { @Override public void done() { + System.out.println("empty done"); } }); From 4de26fab544baefd904d0b4e0889803590b2f990 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 15 Oct 2020 07:30:08 +0200 Subject: [PATCH 2/2] ARTEMIS-2945 Adding batching async fdatasync --- .../artemis/nativo/jlibaio/LibaioContext.java | 141 ++++++++++++++++-- 1 file changed, 130 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java index 72a1fbe..fd21070 100644 --- a/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java +++ b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/LibaioContext.java @@ -25,6 +25,8 @@ import java.nio.ByteOrder; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Objects; import java.util.Queue; @@ -517,7 +519,6 @@ public void poll() { } pollLock.lock(); try { - boolean asyncFDataSync = true; final int dumbFD = this.dumbFD; final AioRing aioRing = this.aioRing; final IoEventArray ioEventArray = this.ioEventArray; @@ -528,6 +529,15 @@ public void poll() { final int queueSize = this.queueSize; final long ioContextAddress = this.ioContextAddress; final long ioEventArrayAddress = ioEventArray.address(); + // there is just one allowerd in-flight fdatasync + int inflightFDSyncFd = -1; + // we accumulate write callbacks in order to not block reads to happen + // until there are no IOCBs + final ArrayDeque awaitingFDSync = new ArrayDeque<>(queueSize); + // this are the already completed writes on the fd that's fdsync'ed + // there are being flushed as soon as the inflightFDSyncFd land. + // Right after, the awaitingFDSync will be processed to issue a new fdatasync + final ArrayDeque completedFDSyncCallbacks = new ArrayDeque<>(queueSize); while (true) { int events = 0; if (aioRing != null) { @@ -541,6 +551,7 @@ public void poll() { } assert events > 0 && events <= queueSize; boolean stop = false; + boolean completedWrites = false; for (int i = 0; i < events; i++) { final IoEventArray.IoEvent ioEvent = ioEventArray.get(i); assert ioEvent.obj() != 0; @@ -551,31 +562,41 @@ public void poll() { assert IoCb.aioData(pooledIOCBBytes) == id; SubmitInfo submitInfo = pooledIOCB.submitInfo; long res = ioEvent.res(); + final short liOpCode = IoCb.lioOpCode(pooledIOCBBytes); if (res >= 0) { - final short liOpCode = IoCb.lioOpCode(pooledIOCBBytes); final int fd = IoCb.aioFildes(pooledIOCBBytes); if (liOpCode == IoCb.IOCB_CMD_PWRITE) { if (fd == dumbFD) { stop = true; } else if (useFdatasync) { - // submit an async fdatasync by re-using the current IOPS - assert submitInfo != null; - if (asyncFDataSync) { + if (inflightFDSyncFd >= 0) { + if (completedWrites) { + if (inflightFDSyncFd == fd) { + completedFDSyncCallbacks.add(pooledIOCB); + } else { + awaitingFDSync.add(pooledIOCB); + } + } else { + awaitingFDSync.add(pooledIOCB); + } + // this create backpressure, in case of awaitingFDSyncs + pooledIOCB = null; + submitInfo = null; + } else { try { submitFDataSync(fd, ioContextAddress, pooledIOCB.address, id); // allow to reuse the pooledIOCB and IOPS used for pwrite // and to preserve submitInfo pooledIOCB = null; submitInfo = null; + completedWrites = true; + inflightFDSyncFd = fd; } catch (IOException ioException) { - NativeLogger.LOGGER.error("Impossible to submit an async fdatasync: try with sync one from now on", ioException); - asyncFDataSync = false; + NativeLogger.LOGGER.error("Impossible to submit async fdatasync", ioException); + // emulate a generic error: this could be improved + res = -1; } } - if (!asyncFDataSync) { - assert pooledIOCB != null && submitInfo != null; - fdatasync(fd); - } } } // PREAD and FDSYNC just keep going @@ -600,9 +621,21 @@ public void poll() { // TODO the error string can be cached? submitInfo.onError((int) -res, strError(res)); } + if (liOpCode == IoCb.IOCB_CMD_FDSYNC) { + inflightFDSyncFd = onCompletedFDSync(awaitingFDSync, completedFDSyncCallbacks, res, ioContextAddress, iocbPool, ioSpace, false); + // that's important to correctly accumulate already completed writes + completedWrites = inflightFDSyncFd >= 0; + } } } if (stop) { + if (!useFdatasync) { + return; + } + // flush any existing pending write calling a sync datasync + while (onCompletedFDSync(awaitingFDSync, completedFDSyncCallbacks, 0, ioContextAddress, iocbPool, ioSpace, true) >= 0) { + + } return; } } @@ -611,6 +644,92 @@ public void poll() { } } + private static int onCompletedFDSync(ArrayDeque awaitingFDSync, + ArrayDeque completedFDSyncCallbacks, + long res, + long ioContextAddress, + Queue iocbPool, + Semaphore ioSpace, + boolean forceSync) { + final String error = res >= 0 ? null : strError(res); + // complete write callbacks related to the completed fdatasync + for (int i = 0, size = completedFDSyncCallbacks.size(); i < size; i++) { + final PooledIOCB pooledIOCB = completedFDSyncCallbacks.poll(); + final SubmitInfo submitInfo = pooledIOCB.submitInfo; + assert submitInfo != null; + pooledIOCB.submitInfo = null; + // NOTE: + // First we return back the IOCB then we release the semaphore, to let submitInfo::done + // to be able to issue a further write/read + iocbPool.add(pooledIOCB); + if (ioSpace != null) { + ioSpace.release(); + } + if (res >= 0) { + submitInfo.done(); + } else { + submitInfo.onError((int) -res, error); + } + } + assert completedFDSyncCallbacks.isEmpty(); + // search the next (if any) fd to fdsync + int inflightFDSyncFd; + do { + PooledIOCB pooledIOCB = awaitingFDSync.poll(); + if (pooledIOCB == null) { + return -1; + } + inflightFDSyncFd = IoCb.aioFildes(pooledIOCB.bytes); + try { + if (!forceSync) { + submitFDataSync(inflightFDSyncFd, ioContextAddress, pooledIOCB.address, pooledIOCB.id); + } else { + fdatasync(inflightFDSyncFd); + forceDone(pooledIOCB, iocbPool, ioSpace); + } + break; + } catch (IOException ioException) { + NativeLogger.LOGGER.error("Impossible to submit async fdatasync", ioException); + // return this back into the pool + forceGenericError(pooledIOCB, iocbPool, ioSpace); + } + } while (true); + assert inflightFDSyncFd >= 0; + // move any awaitingFDSync on this same fd as completedFDSyncCallbacks + for (int i = 0, size = awaitingFDSync.size(); i < size; i++) { + final PooledIOCB pooledIOCB = awaitingFDSync.poll(); + final int fd = IoCb.aioFildes(pooledIOCB.bytes); + if (fd != inflightFDSyncFd) { + // return it back into this same + awaitingFDSync.add(pooledIOCB); + } else { + completedFDSyncCallbacks.add(pooledIOCB); + } + } + return inflightFDSyncFd; + } + + private static void forceDone(PooledIOCB pooledIOCB, Queue iocbPool, Semaphore ioSpace) { + final SubmitInfo submitInfo = pooledIOCB.submitInfo; + pooledIOCB.submitInfo = null; + iocbPool.add(pooledIOCB); + if (ioSpace != null) { + ioSpace.release(); + } + submitInfo.done(); + } + + private static void forceGenericError(PooledIOCB pooledIOCB, Queue iocbPool, Semaphore ioSpace) { + SubmitInfo submitInfo = pooledIOCB.submitInfo; + pooledIOCB.submitInfo = null; + iocbPool.add(pooledIOCB); + if (ioSpace != null) { + ioSpace.release(); + } + // immediately call onError on this write + submitInfo.onError(1, strError(-1)); + } + private static native String strError(long eventError); private static native int dumbFD();