
Conversation

@franz1981
Contributor

@franz1981 franz1981 commented Oct 12, 2020

@franz1981 franz1981 force-pushed the simgle_async_datasync branch from 2a8f0a0 to 7a48876 Compare October 12, 2020 16:53
@franz1981
Contributor Author

franz1981 commented Oct 13, 2020

I've noticed something I would like to raise here, to be sure I'll remember it in the future :)

I see that in the original code, on submitWrite, we used to do:

    // The GlobalRef will be deleted when poll is called. this is done so
    // the vm wouldn't crash if the Callback passed by the user is GCed between submission
    // and callback.
    // also as the real intention is to hold the reference until the life cycle is complete
    iocb->data = (void *) (*env)->NewGlobalRef(env, callback);

But I see that this isn't happening for jobject bufferWrite (i.e. the direct ByteBuffer used to perform I/O); AFAIK it shouldn't be a problem, for 2 reasons:

  1. the Artemis code holds a ByteBuffer reference in IOSequentialCallback::buffer until the callback is called (done or onError).
  2. such buffers are allocated using JNI NewDirectByteBuffer in newAlignedBuffer, with the comment
    // This will allocate a buffer, aligned by alignment.
    // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
    // NOTE: this buffer will contain non initialized data, you must fill it up properly

Hence I can simplify the Java code here a bit, because the ByteBuffer content used while submitting I/O won't ever be GCed while being used by the kernel.
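
For illustration, here is a minimal Java-side sketch (hypothetical names, not the actual Artemis classes) of that ownership rule: the submitter keeps a strong reference to the direct ByteBuffer until done/onError fires, and the aligned memory behind the buffer is only ever released by destroyBuffer, so GC of the ByteBuffer object can never pull memory out from under the kernel.

    import java.nio.ByteBuffer;

    // Hypothetical sketch: mirrors the done()/onError(errno, message) callback shape
    // used elsewhere in this PR; not the actual Artemis/IOSequentialCallback code.
    final class PendingWrite {

       interface Callback {
          void done();
          void onError(int errno, String message);
       }

       private ByteBuffer buffer;       // strong reference held for the submit -> completion window
       private final Callback delegate;

       PendingWrite(ByteBuffer buffer, Callback delegate) {
          this.buffer = buffer;
          this.delegate = delegate;
       }

       void done() {
          buffer = null;                // safe to drop only now: the kernel is done with the buffer
          delegate.done();
       }

       void onError(int errno, String message) {
          buffer = null;
          delegate.onError(errno, message);
       }
    }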

@franz1981 franz1981 changed the title ARTEMIS-2945 Artemis native JNI code code be replaced by Java ARTEMIS-2945 Artemis native JNI code can be replaced by Java Oct 13, 2020
@franz1981 franz1981 force-pushed the simgle_async_datasync branch 3 times, most recently from b8c8e50 to e92f3c4 Compare October 15, 2020 05:58
@franz1981
Contributor Author

franz1981 commented Oct 15, 2020

@clebertsuconic @michaelandrepearce
I've taken advantage of the fact that writing the code in Java makes it simpler to change and try new logic, and I've introduced a new mechanism to batch fdatasyncs while preserving some fairness for read operations.

Let's explain what master does first: on master, the LibaioContext blocked poll logic batches completed writes on the same file descriptor altogether; given that fdatasync is blocking, it lets new incoming write requests pile up in batches, favouring a bursty behaviour. This comes for free by not processing the already completed writes blocked behind fdatasync: it creates backpressure both at the kernel level (because the completion ring buffer head isn't moved forward) and on the Java side, because the Semaphore ioSpace used to backpressure write/read submissions runs out of permits if the existing in-flight operations are not processed (i.e. completed and fdatasync'd). A minimal sketch of this permit accounting is shown below.
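
A minimal sketch of that permit accounting (hypothetical names, not the LibaioContext code): the semaphore is sized to the queue depth, submitters take a permit per operation, and the poll loop gives permits back only once it has processed the completions.

    import java.util.concurrent.Semaphore;

    // Hypothetical sketch of the ioSpace-style backpressure described above.
    final class IoSpaceSketch {

       private final Semaphore ioSpace;

       IoSpaceSketch(int queueDepth) {
          this.ioSpace = new Semaphore(queueDepth);
       }

       // called by the submitter before io_submit: blocks once queueDepth operations
       // are in flight and none of them has been processed yet
       void beforeSubmit() throws InterruptedException {
          ioSpace.acquire();
       }

       // called by the poll loop for every processed completion (mirrors the
       // ioSpace.release() visible in the poll loop posted later in this thread)
       void afterCompletionProcessed() {
          ioSpace.release();
       }
    }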

This nice mechanism raises a legit question:

  • what happens to a read operation that has been correctly submitted?

Even though reads are rare on Artemis, they can still happen (during compaction), and a slow read means that compaction suffers as well: on master a storm of writes could easily prevent a completed read from being processed, because the blocked poll loop is busy processing fdatasync, which can take an arbitrarily "long" time.

There are several ways to fix this issue and, TBH, the first commit of this same PR already takes care of it, but it assumes a smart submitter of writes/reads that correctly batches writes and reads in user space (i.e. Java land - the Artemis TimedBuffer).
Given that master instead relies on a backpressure mechanism to let a misconfigured TimedBuffer still perform well, I've tried to batch fdatasyncs by allowing one pending asynchronous fdatasync at a time, while also fixing read latency, because the blocked poll event loop no longer blocks on a long synchronous operation.

Not sure it's the right path, but it's a nice way to show how easy it is to bring in new behaviours :)
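
For what it's worth, a rough sketch of the single-pending-fdatasync idea (hypothetical names, not the actual commit): at most one asynchronous fdatasync is in flight per file, write acks are parked until a fdatasync submitted after them completes, and the poll loop never blocks.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch: at most one asynchronous fdatasync in flight per file.
    // Completed writes are acknowledged only after a fdatasync submitted after them
    // completes, so the poll loop never blocks and can keep serving read completions.
    final class SingleAsyncFdatasync {

       private final Runnable submitAsyncFdsync;   // e.g. io_submit of an async fdsync iocb
       private final List<Runnable> coveredByCurrentSync = new ArrayList<>();
       private final List<Runnable> waitingNextSync = new ArrayList<>();
       private boolean syncInFlight;

       SingleAsyncFdatasync(Runnable submitAsyncFdsync) {
          this.submitAsyncFdsync = submitAsyncFdsync;
       }

       // a write completion was polled: it isn't durable yet, park its ack
       void onWriteCompleted(Runnable ack) {
          if (syncInFlight) {
             waitingNextSync.add(ack);             // will be covered by the next fdsync
          } else {
             coveredByCurrentSync.add(ack);
             syncInFlight = true;
             submitAsyncFdsync.run();
          }
       }

       // the async fdsync completion was polled: acknowledge the covered writes and,
       // if more writes piled up meanwhile, start the next batched fdsync
       void onFdsyncCompleted() {
          coveredByCurrentSync.forEach(Runnable::run);
          coveredByCurrentSync.clear();
          syncInFlight = false;
          if (!waitingNextSync.isEmpty()) {
             coveredByCurrentSync.addAll(waitingNextSync);
             waitingNextSync.clear();
             syncInFlight = true;
             submitAsyncFdsync.run();
          }
       }
    }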

@franz1981 franz1981 force-pushed the simgle_async_datasync branch from e92f3c4 to 4de26fa Compare October 15, 2020 09:07
@franz1981
Contributor Author

franz1981 commented Oct 15, 2020

Just FYI, I've run some tests on not-so-recent hardware, and with a lower number of producers/consumers (without compaction in the mix) I still get the best performance from using the original logic (but written in Java), i.e.

            int lastFile = -1;
            for (int i = 0; i < events; i++) {
               final IoEventArray.IoEvent ioEvent = ioEventArray.get(i);
               assert ioEvent.obj() != 0;
               final int id = (int) ioEvent.data();
               final PooledIOCB pooledIOCB = iocbArray[id];
               assert ioEvent.obj() == pooledIOCB.address;
               assert IoCb.aioData(pooledIOCB.bytes) == id;
               final SubmitInfo submitInfo = pooledIOCB.submitInfo;
               if (submitInfo != null) {
                  pooledIOCB.submitInfo = null;
               }
               final long res = ioEvent.res();
               if (res >= 0) {
                  final int fd = IoCb.aioFildes(pooledIOCB.bytes);
                  if (fd != dumbFD) {
                     if (useFdatasync) {
                        if (lastFile != fd) {
                           lastFile = fd;
                           fdatasync(fd);
                        }
                     }
                  } else {
                     stop = true;
                  }
               }
               iocbPool.add(pooledIOCB);
               if (ioSpace != null) {
                  ioSpace.release();
               }
               if (submitInfo != null) {
                  if (res >= 0) {
                     submitInfo.done();
                  } else {
                     // TODO the error string can be cached?
                     submitInfo.onError((int) -res, strError(res));
                  }
               }
            }

With a higher number of producers/consumers, both of the commits on this PR (i.e. not batching async fdatasync and batching async fdatasync) perform better than the original logic (in C or Java).

@franz1981
Contributor Author

franz1981 commented Oct 18, 2020

@clebertsuconic @michaelandrepearce

I've just noticed that, thanks to the AioRing abstraction, we could create an AioRing.CompletionCallback interface and use it to be truly zero-copy, decoding IoEvents on the fly into their primitive fields (data, obj, res, res2) while looping through the completions: with the simplified code in #9 (comment) this should be very simple to implement and effective, given that a long fdatasync would likely let some more completions accumulate while looping.

This is what I mean:
https://github.com/franz1981/activemq-artemis-native-1/blob/58696f37472eb722e6bd0af760b2f2beaa4e2a2b/src/main/java/org/apache/activemq/artemis/nativo/jlibaio/AioRing.java#L107-L145

   public interface AioRingCompletionCallback {

      void handle(long data, long obj, long res, long res2);
   }

   public int poll(AioRingCompletionCallback callback, int min, int max) {
      final long nrAddress = this.nrAddress;
      final long headAddress = this.headAddress;
      final long tailAddress = this.tailAddress;
      final long ioEventsAddress = this.ioEventsAddress;
      final int nr = UNSAFE.getInt(nrAddress);
      int head = UNSAFE.getInt(headAddress);
      // no need of a membar here because Unsafe::getInt already provides it
      final int tail = UNSAFE.getIntVolatile(null, tailAddress);
      int available = tail - head;
      if (available < 0) {
         // a wrap has occurred
         available += nr;
      }
      if (available < min) {
         return 0;
      }
      if (available == 0) {
         return 0;
      }
      // this is to mitigate a RHEL BUG: see native code for more info
      if (available > nr) {
         return -1;
      }
      available = Math.min(available, max);

      for (int i = 0; i < available; i++) {
         final long ioEvent = ioEventsAddress + (head * SIZE_OF_IO_EVENT_STRUCT);
         final long data = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.DATA_OFFSET);
         final long obj = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.OBJ_OFFSET);
         final long res = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.RES_OFFSET);
         final long res2 = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.RES2_OFFSET);
         head++;
         head = head >= nr ? 0 : head;
         UNSAFE.putOrderedInt(null, headAddress, head);
         callback.handle(data, obj, res, res2);
      }
      return available;
   }
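
And a tiny usage sketch (hypothetical wiring: `ring`, `maxEvents` and the per-event handling are assumptions, not code from this branch) of how the callback-based poll could be consumed without allocating any IoEvent object:

    // Hypothetical usage of the zero-copy poll shown above.
    static int drainCompletions(AioRing ring, int maxEvents) {
       return ring.poll((data, obj, res, res2) -> {
          // data carries the id of the pooled iocb, exactly as in the loop posted earlier
          if (res >= 0) {
             // completed ok: batch a fdatasync per fd and release the ioSpace permit
          } else {
             // a negative res carries -errno: report it through the submitter's onError
          }
       }, 1, maxEvents);
    }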

@franz1981
Contributor Author

I'm going to close this and will later create a bug fix for the read fairness instead: #10 is a more straightforward one that just tries to replace what we already have.

@franz1981 franz1981 closed this Oct 20, 2020