Request for guidance on memory ordering and correctness in a lock-free ring buffer implementation #52

@FalkAurel

Description

How to Implement Synchronization in a Fixed-Size Ring Buffer

Dear ringbuf team,

I apologize for reaching out in such a direct way. Unfortunately, I couldn’t find a more appropriate or discreet channel for this question.
I’m currently working on an educational project in which I’m implementing a lock-free ring buffer. I’ve run into a few conceptual challenges and would greatly appreciate your guidance. In particular, I’m struggling with the following points:

  1. How can I guarantee the correctness of the head and tail indices, while also reliably detecting when the ring buffer is full (to avoid overwriting existing data)?
  2. How can I ensure that updates to these indices (during enqueue and dequeue operations) are performed correctly in a concurrent setting?
  3. How can I guarantee that, once a writer determines it is safe to write, the write operation completes before any reader begins reading the same slot?
    One idea I considered was storing only pointers so that writes become simple pointer swaps, but this seems suboptimal for cache locality.
    Below is the code I’ve written so far to illustrate my current approach.
use std::iter::repeat_with;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// `Job<T>` is a type defined elsewhere in my project.
pub(crate) struct CircularBuffer<T, const SIZE: usize> {
    data: Box<[MaybeUninit<Job<T>>; SIZE]>,
    head: AtomicUsize,
    tail: AtomicUsize,
    is_full: AtomicBool,
}

impl<T, const SIZE: usize> CircularBuffer<T, SIZE> {
    pub fn new() -> Self {
        // Build a boxed array of uninitialized slots without requiring Copy.
        let data: Box<[MaybeUninit<Job<T>>; SIZE]> = repeat_with(MaybeUninit::uninit)
            .take(SIZE)
            .collect::<Vec<_>>()
            .into_boxed_slice()
            .try_into()
            .expect("length is exactly SIZE");

        Self { data, head: AtomicUsize::new(0), tail: AtomicUsize::new(0), is_full: AtomicBool::new(false) }
    }

    pub fn enqueue(&mut self, job: Job<T>) -> Result<(), Job<T>> {
        let write_index: usize = self.head.load(Ordering::Acquire);

        // Try to claim the current slot by advancing head
        // (wrapping from SIZE - 1 back to 0).
        if self.head.compare_exchange(
            write_index,
            (write_index + 1) % SIZE,
            Ordering::Release,
            Ordering::Relaxed,
        ).is_err() {
            // Another producer claimed the slot first; hand the job back.
            return Err(job);
        }

        // Mark the buffer full if head has caught up with tail. I'm not
        // sure this check is race-free against concurrent dequeues, and
        // note that head has already been advanced at this point.
        if self.is_full.compare_exchange(
            false,
            (write_index + 1) % SIZE == self.tail.load(Ordering::Acquire),
            Ordering::Release,
            Ordering::Relaxed,
        ).is_err() {
            // Buffer was already full; hand the job back.
            return Err(job);
        }

        unsafe {
            // Write the job into the claimed slot. This happens *after*
            // head was published, which is exactly the ordering problem
            // from question 3.
            *self.data.get_unchecked_mut(write_index) = MaybeUninit::new(job);
        }

        Ok(())
    }

    pub fn dequeue(&mut self) -> Option<Job<T>> {
        unimplemented!("implement enqueue first")
    }
}

impl<T, const SIZE: usize> Drop for CircularBuffer<T, SIZE> {
    fn drop(&mut self) {
        while let Some(job) = self.dequeue() {
            drop(job);
        }
    }
}
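On point 1 specifically, one alternative I have been considering is to drop the is_full flag and let head and tail be free-running counters that are reduced modulo SIZE only when indexing into the array; then head == tail means empty and head - tail == SIZE means full, so the two states are distinguishable without extra shared state. A minimal sketch of just the index arithmetic (names are mine, single-threaded for clarity):

```rust
// Sketch of full/empty detection with free-running counters.
// head and tail only wrap at usize::MAX; wrapping_sub keeps the
// length correct even across that overflow.
const SIZE: usize = 4;

fn len(head: usize, tail: usize) -> usize {
    head.wrapping_sub(tail)
}

fn is_full(head: usize, tail: usize) -> bool {
    len(head, tail) == SIZE
}

fn is_empty(head: usize, tail: usize) -> bool {
    head == tail
}

fn slot(index: usize) -> usize {
    index % SIZE // physical slot in the backing array
}

fn main() {
    let (mut head, tail) = (0usize, 0usize);
    assert!(is_empty(head, tail));
    for _ in 0..SIZE {
        head = head.wrapping_add(1);
    }
    // head and tail now map to the same physical slot, yet
    // full and empty are still distinguishable.
    assert!(is_full(head, tail));
    assert_eq!(slot(head), slot(tail));
}
```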

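For points 2 and 3 together, I have also been reading about Vyukov-style bounded MPMC queues (which I believe crossbeam's ArrayQueue is based on), where each slot carries its own sequence number: the producer's Release store of the sequence number after writing the payload pairs with the consumer's Acquire load of it, which should give exactly the happens-before edge I am missing. Here is my rough sketch of that scheme (type and method names are my own, and wraparound of the free-running counters is ignored for simplicity) -- does this look like the right direction?

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};

// Each slot's `seq` encodes which "lap" it is on:
//   seq == index     -> slot empty, the writer at `index` may claim it
//   seq == index + 1 -> slot full, the reader at `index` may claim it
struct Slot<T> {
    seq: AtomicUsize,
    value: UnsafeCell<MaybeUninit<T>>,
}

struct Queue<T, const SIZE: usize> {
    slots: Vec<Slot<T>>,
    head: AtomicUsize, // next write position (free-running)
    tail: AtomicUsize, // next read position (free-running)
}

unsafe impl<T: Send, const SIZE: usize> Sync for Queue<T, SIZE> {}

impl<T, const SIZE: usize> Queue<T, SIZE> {
    fn new() -> Self {
        let slots = (0..SIZE)
            .map(|i| Slot {
                seq: AtomicUsize::new(i),
                value: UnsafeCell::new(MaybeUninit::uninit()),
            })
            .collect();
        Self { slots, head: AtomicUsize::new(0), tail: AtomicUsize::new(0) }
    }

    fn enqueue(&self, value: T) -> Result<(), T> {
        loop {
            let head = self.head.load(Ordering::Relaxed);
            let slot = &self.slots[head % SIZE];
            let seq = slot.seq.load(Ordering::Acquire);
            if seq == head {
                // Claim the write position before touching the slot.
                if self
                    .head
                    .compare_exchange_weak(head, head + 1, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
                {
                    unsafe { (*slot.value.get()).write(value) };
                    // Release-publish: a reader that Acquire-loads seq and
                    // sees head + 1 also sees the payload write above.
                    slot.seq.store(head + 1, Ordering::Release);
                    return Ok(());
                }
            } else if seq < head {
                return Err(value); // slot still holds last lap's value: full
            }
            // Otherwise another producer got ahead of us; retry.
        }
    }

    fn dequeue(&self) -> Option<T> {
        loop {
            let tail = self.tail.load(Ordering::Relaxed);
            let slot = &self.slots[tail % SIZE];
            let seq = slot.seq.load(Ordering::Acquire);
            if seq == tail + 1 {
                if self
                    .tail
                    .compare_exchange_weak(tail, tail + 1, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
                {
                    let value = unsafe { (*slot.value.get()).assume_init_read() };
                    // Hand the slot to the writers of the next lap.
                    slot.seq.store(tail + SIZE, Ordering::Release);
                    return Some(value);
                }
            } else if seq < tail + 1 {
                return None; // slot not yet written: empty
            }
        }
    }
}

fn main() {
    let q: Queue<u32, 2> = Queue::new();
    assert!(q.enqueue(1).is_ok());
    assert!(q.enqueue(2).is_ok());
    assert!(q.enqueue(3).is_err()); // full: the job is handed back
    assert_eq!(q.dequeue(), Some(1));
    assert_eq!(q.dequeue(), Some(2));
    assert_eq!(q.dequeue(), None); // empty again
}
```

If I understand correctly, this also answers my pointer-swap idea from point 3: the payload can stay inline in the slot for cache locality, because the per-slot sequence number, not the payload itself, is what gets atomically published.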
I would be very grateful for any insights or feedback you might be willing to share. If it’s more convenient, I’d also be happy to continue this discussion on Discord or another platform.

Best regards,
Falk
