Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 18 additions & 28 deletions src/bounded-queue.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,28 @@
;; bounded queue based on the algorithm described by Dmitry Vyukov.
;; It is immune to livelock, which is why we use it for bounded queue over a michael scott variant.

(defclass bounded-queue ()
((buffer :reader bounded-queue-buffer :initarg :buffer
:type (simple-array t (*)))
(sequences :reader bounded-queue-sequences :initarg :sequences
:type (simple-array t (*)))
(capacity :reader bounded-queue-capacity :initarg :capacity
:type (unsigned-byte 32))
(mask :reader bounded-queue-mask :type (unsigned-byte 32))
;; Head/Tail counters track the total number of pops/pushes initiated.
(head :reader bounded-queue-head :initform (make-atomic-ref 0))
(tail :reader bounded-queue-tail :initform (make-atomic-ref 0))))

(defmethod initialize-instance :after ((queue bounded-queue) &key)
(declare (type bounded-queue queue))
(setf (slot-value queue 'mask)
(the (unsigned-byte 32) (1- (bounded-queue-capacity queue))))
;; Initialize the sequence array. Slot `i` is initially ready for push `i`.
(let ((sequences (bounded-queue-sequences queue)))
(declare (type (simple-array t (*)) sequences))
(dotimes (i (bounded-queue-capacity queue))
(setf (aref sequences i) (make-atomic-ref (the fixnum i))))))
(defstruct (bounded-queue (:constructor %make-bounded-queue (capacity buffer sequences mask)))
(buffer (make-array 0) :type (simple-array t (*)) :read-only t)
(sequences (make-array 0) :type (simple-array t (*)) :read-only t)
(capacity 0 :type fixnum :read-only t)
(mask 0 :type (unsigned-byte 32) :read-only t)
;; Head/Tail counters track the total number of pops/pushes initiated.
(head (make-atomic-ref 0) :type atomic-ref :read-only t)
(tail (make-atomic-ref 0) :type atomic-ref :read-only t))

(defun make-bounded-queue (capacity)
"Creates a new lock-free, bounded queue. Capacity MUST be a power of two."
(declare (type (unsigned-byte 32) capacity))
(unless (and (> capacity 0) (= (logcount capacity) 1))
(error "Bounded queue capacity must be a power of two."))
(make-instance 'bounded-queue
:capacity capacity
:buffer (make-array capacity :initial-element nil)
;; Provide a valid initial-element to satisfy the compiler.
:sequences (make-array capacity :initial-element nil)))
(let ((buffer (make-array capacity :initial-element nil))
(sequences (make-array capacity :initial-element nil))
(mask (1- capacity)))
;; Initialize the sequence array. Slot `i` is initially ready for push `i`.
(dotimes (i capacity)
(declare (type fixnum i))
(setf (aref sequences i) (make-atomic-ref i)))
(%make-bounded-queue (coerce capacity 'fixnum) buffer sequences mask)))

(declaim (inline bounded-queue-push))
(defun bounded-queue-push (queue object)
Expand Down Expand Up @@ -120,7 +110,7 @@
;; Check if there is enough space for the whole batch.
(when (> (+ (- tail head) batch-size) (bounded-queue-capacity queue))
(return-from bounded-queue-push-batch nil))

;; Try to claim the block of slots.
(when (eq tail (cas (bounded-queue-tail queue) tail (+ tail batch-size)))
;; Successfully claimed the block. Now we fill it.
Expand Down Expand Up @@ -148,7 +138,7 @@
(tail (the fixnum (atomic-ref-value (bounded-queue-tail queue))))
(available (- tail head))
(batch-size (min count available)))

(when (zerop batch-size)
(return-from bounded-queue-pop-batch (values nil nil)))

Expand Down
Loading