Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"github.com/wildcatdb/wildcat/v2/queue"
"sync/atomic"
"unsafe"
)

// entry is a buffer entry
Expand All @@ -29,9 +28,9 @@ type entry struct {

// Buffer is a concurrent lock-free buffer with ID/Slot-based access
type Buffer struct {
buffer []unsafe.Pointer // Slice of pointers to entries
capacity int64 // Maximum capacity of the buffer
availableSlots *queue.Queue // Queue of available slots for new entries
buffer []atomic.Pointer[entry] // Slice of pointers to entries
capacity int64 // Maximum capacity of the buffer
availableSlots *queue.Queue // Queue of available slots for new entries
}

// New creates a new atomic buffer with the specified capacity
Expand All @@ -41,7 +40,7 @@ func New(capacity int) (*Buffer, error) {
}

buff := &Buffer{
buffer: make([]unsafe.Pointer, capacity),
buffer: make([]atomic.Pointer[entry], capacity),
capacity: int64(capacity),
availableSlots: queue.New(),
}
Expand Down Expand Up @@ -80,7 +79,7 @@ func (buff *Buffer) Add(item interface{}) (int64, error) {
value: item,
}

atomic.StorePointer(&buff.buffer[slot], unsafe.Pointer(e))
buff.buffer[slot].Store(e)

return slot, nil
}
Expand All @@ -91,12 +90,11 @@ func (buff *Buffer) Get(slot int64) (interface{}, error) {
return nil, errors.New("invalid slot ID")
}

ptr := atomic.LoadPointer(&buff.buffer[slot])
if ptr == nil {
e := buff.buffer[slot].Load()
if e == nil {
return nil, errors.New("item not found")
}

e := (*entry)(ptr)
return e.value, nil
}

Expand All @@ -106,13 +104,13 @@ func (buff *Buffer) Remove(slot int64) error {
return errors.New("invalid slot ID")
}

ptr := atomic.LoadPointer(&buff.buffer[slot])
if ptr == nil {
e := buff.buffer[slot].Load()
if e == nil {
return errors.New("item not found")
}

// Atomically clear the slot using CompareAndSwap to prevent race conditions
if !atomic.CompareAndSwapPointer(&buff.buffer[slot], ptr, nil) {
if !buff.buffer[slot].CompareAndSwap(e, nil) {
// Another thread removed the item between our load and CAS
return errors.New("item was already removed")
}
Expand All @@ -130,16 +128,15 @@ func (buff *Buffer) Update(slot int64, newValue interface{}) error {
}

newEntry := &entry{value: newValue}
newPtr := unsafe.Pointer(newEntry)

// Keep trying until we successfully update or determine slot is empty
for {
oldPtr := atomic.LoadPointer(&buff.buffer[slot])
if oldPtr == nil {
oldEntry := buff.buffer[slot].Load()
if oldEntry == nil {
return errors.New("item not found")
}

if atomic.CompareAndSwapPointer(&buff.buffer[slot], oldPtr, newPtr) {
if buff.buffer[slot].CompareAndSwap(oldEntry, newEntry) {
return nil
}
// If CAS failed, retry - another thread might have updated the slot
Expand Down Expand Up @@ -172,9 +169,8 @@ func (buff *Buffer) List() []interface{} {
var result []interface{}

for i := int64(0); i < buff.capacity; i++ {
ptr := atomic.LoadPointer(&buff.buffer[i])
if ptr != nil {
e := (*entry)(ptr)
e := buff.buffer[i].Load()
if e != nil {
result = append(result, e.value)
}
}
Expand All @@ -187,9 +183,8 @@ func (buff *Buffer) List() []interface{} {
// *This is not atomic across all slots
func (buff *Buffer) ForEach(f func(slot int64, item interface{}) bool) {
for i := int64(0); i < buff.capacity; i++ {
ptr := atomic.LoadPointer(&buff.buffer[i])
if ptr != nil {
e := (*entry)(ptr)
e := buff.buffer[i].Load()
if e != nil {
if !f(i, e.value) {
return
}
Expand Down
106 changes: 57 additions & 49 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,69 @@ package queue

import (
"sync/atomic"
"unsafe"
)

// Node represents a node in the queue
type Node struct {
value interface{}
next unsafe.Pointer // *Node
next atomic.Pointer[Node]
}

// Queue implements a concurrent non-blocking queue
type Queue struct {
head unsafe.Pointer // *Node
tail unsafe.Pointer // *Node
size int64 // Atomic counter
head atomic.Pointer[Node]
tail atomic.Pointer[Node]
size int64 // Atomic counter
}

// New creates a new concurrent queue
func New() *Queue {
node := &Node{}
nodePtr := unsafe.Pointer(node)
return &Queue{
head: nodePtr,
tail: nodePtr,
}
q := &Queue{}
q.head.Store(node)
q.tail.Store(node)
return q
}

// List returns a slice of all values in the queue
func (q *Queue) List() []interface{} {
var result []interface{}
headPtr := atomic.LoadPointer(&q.head)
head := (*Node)(headPtr)
nextPtr := atomic.LoadPointer(&head.next)
for nextPtr != nil {
next := (*Node)(nextPtr)
head := q.head.Load()
if head == nil {
return result
}
next := head.next.Load()
for next != nil {
result = append(result, next.value)
nextPtr = atomic.LoadPointer(&next.next)
next = next.next.Load()
}
return result
}

// Enqueue adds a value to the queue
func (q *Queue) Enqueue(value interface{}) {
node := &Node{value: value}
nodePtr := unsafe.Pointer(node)

for {
tailPtr := atomic.LoadPointer(&q.tail)
tail := (*Node)(tailPtr)
nextPtr := atomic.LoadPointer(&tail.next)
tail := q.tail.Load()
if tail == nil {
continue
}
next := tail.next.Load()

// Check if tail is consistent
if tailPtr == atomic.LoadPointer(&q.tail) {
if nextPtr == nil {
if tail == q.tail.Load() {
if next == nil {
// Try to link node at the end of the list
if atomic.CompareAndSwapPointer(&tail.next, nil, nodePtr) {
if tail.next.CompareAndSwap(nil, node) {
// Enqueue is done, try to swing tail to the inserted node
atomic.CompareAndSwapPointer(&q.tail, tailPtr, nodePtr)
q.tail.CompareAndSwap(tail, node)
atomic.AddInt64(&q.size, 1)
return
}
} else {
// Tail was not pointing to the last node, try to advance tail
atomic.CompareAndSwapPointer(&q.tail, tailPtr, nextPtr)
q.tail.CompareAndSwap(tail, next)
}
}
}
Expand All @@ -89,28 +89,32 @@ func (q *Queue) Enqueue(value interface{}) {
// Returns nil if the queue is empty
func (q *Queue) Dequeue() interface{} {
for {
headPtr := atomic.LoadPointer(&q.head)
tailPtr := atomic.LoadPointer(&q.tail)
head := (*Node)(headPtr)
nextPtr := atomic.LoadPointer(&head.next)
head := q.head.Load()
tail := q.tail.Load()
if head == nil {
continue
}
next := head.next.Load()

// Check if head, tail, and next are consistent
if headPtr == atomic.LoadPointer(&q.head) {
if head == q.head.Load() {
// Is queue empty or tail falling behind?
if headPtr == tailPtr {
if head == tail {
// Is queue empty?
if nextPtr == nil {
if next == nil {
return nil // Queue is empty
}
// Tail is falling behind. Try to advance it
atomic.CompareAndSwapPointer(&q.tail, tailPtr, nextPtr)
q.tail.CompareAndSwap(tail, next)
} else {
// Queue is not empty, read value before CAS
next := (*Node)(nextPtr)
if next == nil {
continue
}
value := next.value

// Try to swing Head to the next node
if atomic.CompareAndSwapPointer(&q.head, headPtr, nextPtr) {
if q.head.CompareAndSwap(head, next) {
atomic.AddInt64(&q.size, -1) // Decrement counter
return value // Dequeue is done
}
Expand All @@ -121,35 +125,39 @@ func (q *Queue) Dequeue() interface{} {

// IsEmpty returns true if the queue is empty
func (q *Queue) IsEmpty() bool {
headPtr := atomic.LoadPointer(&q.head)
head := (*Node)(headPtr)
return atomic.LoadPointer(&head.next) == nil
head := q.head.Load()
if head == nil {
return true
}
return head.next.Load() == nil
}

// Peek returns the value at the front of the queue without removing it
// Returns nil if the queue is empty
func (q *Queue) Peek() interface{} {
headPtr := atomic.LoadPointer(&q.head)
head := (*Node)(headPtr)
nextPtr := atomic.LoadPointer(&head.next)
if nextPtr == nil {
head := q.head.Load()
if head == nil {
return nil
}
next := head.next.Load()
if next == nil {
return nil // Queue is empty
}
next := (*Node)(nextPtr)
return next.value
}

// ForEach iterates over the queue and applies the function f to each item
func (q *Queue) ForEach(f func(item interface{}) bool) {
headPtr := atomic.LoadPointer(&q.head)
head := (*Node)(headPtr)
nextPtr := atomic.LoadPointer(&head.next)
for nextPtr != nil {
next := (*Node)(nextPtr)
head := q.head.Load()
if head == nil {
return
}
next := head.next.Load()
for next != nil {
if !f(next.value) {
return
}
nextPtr = atomic.LoadPointer(&next.next)
next = next.next.Load()
}
}

Expand Down
Loading