diff --git a/buffer/buffer.go b/buffer/buffer.go index f420e8b..2957356 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -19,7 +19,6 @@ import ( "errors" "github.com/wildcatdb/wildcat/v2/queue" "sync/atomic" - "unsafe" ) // entry is a buffer entry @@ -29,9 +28,9 @@ type entry struct { // Buffer is a concurrent lock-free buffer with ID/Slot-based access type Buffer struct { - buffer []unsafe.Pointer // Slice of pointers to entries - capacity int64 // Maximum capacity of the buffer - availableSlots *queue.Queue // Queue of available slots for new entries + buffer []atomic.Pointer[entry] // Slice of pointers to entries + capacity int64 // Maximum capacity of the buffer + availableSlots *queue.Queue // Queue of available slots for new entries } // New creates a new atomic buffer with the specified capacity @@ -41,7 +40,7 @@ func New(capacity int) (*Buffer, error) { } buff := &Buffer{ - buffer: make([]unsafe.Pointer, capacity), + buffer: make([]atomic.Pointer[entry], capacity), capacity: int64(capacity), availableSlots: queue.New(), } @@ -80,7 +79,7 @@ func (buff *Buffer) Add(item interface{}) (int64, error) { value: item, } - atomic.StorePointer(&buff.buffer[slot], unsafe.Pointer(e)) + buff.buffer[slot].Store(e) return slot, nil } @@ -91,12 +90,11 @@ func (buff *Buffer) Get(slot int64) (interface{}, error) { return nil, errors.New("invalid slot ID") } - ptr := atomic.LoadPointer(&buff.buffer[slot]) - if ptr == nil { + e := buff.buffer[slot].Load() + if e == nil { return nil, errors.New("item not found") } - e := (*entry)(ptr) return e.value, nil } @@ -106,13 +104,13 @@ func (buff *Buffer) Remove(slot int64) error { return errors.New("invalid slot ID") } - ptr := atomic.LoadPointer(&buff.buffer[slot]) - if ptr == nil { + e := buff.buffer[slot].Load() + if e == nil { return errors.New("item not found") } // Atomically clear the slot using CompareAndSwap to prevent race conditions - if !atomic.CompareAndSwapPointer(&buff.buffer[slot], ptr, nil) { + if !buff.buffer[slot].CompareAndSwap(e, nil) { // Another thread removed the item between our load and CAS return errors.New("item was already removed") } @@ -130,16 +128,15 @@ func (buff *Buffer) Update(slot int64, newValue interface{}) error { } newEntry := &entry{value: newValue} - newPtr := unsafe.Pointer(newEntry) // Keep trying until we successfully update or determine slot is empty for { - oldPtr := atomic.LoadPointer(&buff.buffer[slot]) - if oldPtr == nil { + oldEntry := buff.buffer[slot].Load() + if oldEntry == nil { return errors.New("item not found") } - if atomic.CompareAndSwapPointer(&buff.buffer[slot], oldPtr, newPtr) { + if buff.buffer[slot].CompareAndSwap(oldEntry, newEntry) { return nil } // If CAS failed, retry - another thread might have updated the slot @@ -172,9 +169,8 @@ func (buff *Buffer) List() []interface{} { var result []interface{} for i := int64(0); i < buff.capacity; i++ { - ptr := atomic.LoadPointer(&buff.buffer[i]) - if ptr != nil { - e := (*entry)(ptr) + e := buff.buffer[i].Load() + if e != nil { result = append(result, e.value) } } @@ -187,9 +183,8 @@ func (buff *Buffer) List() []interface{} { // *This is not atomic across all slots func (buff *Buffer) ForEach(f func(slot int64, item interface{}) bool) { for i := int64(0); i < buff.capacity; i++ { - ptr := atomic.LoadPointer(&buff.buffer[i]) - if ptr != nil { - e := (*entry)(ptr) + e := buff.buffer[i].Load() + if e != nil { if !f(i, e.value) { return } diff --git a/queue/queue.go b/queue/queue.go index 7248a8d..3f1c307 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -17,42 +17,41 @@ package queue import ( "sync/atomic" - "unsafe" ) // Node represents a node in the queue type Node struct { value interface{} - next unsafe.Pointer // *Node + next atomic.Pointer[Node] } // Queue implements a concurrent non-blocking queue type Queue struct { - head unsafe.Pointer // *Node - tail unsafe.Pointer // *Node - size int64 // Atomic counter + head atomic.Pointer[Node] + tail atomic.Pointer[Node] + size int64 // Atomic counter } // New creates a new concurrent queue func New() *Queue { node := &Node{} - nodePtr := unsafe.Pointer(node) - return &Queue{ - head: nodePtr, - tail: nodePtr, - } + q := &Queue{} + q.head.Store(node) + q.tail.Store(node) + return q } // List returns a slice of all values in the queue func (q *Queue) List() []interface{} { var result []interface{} - headPtr := atomic.LoadPointer(&q.head) - head := (*Node)(headPtr) - nextPtr := atomic.LoadPointer(&head.next) - for nextPtr != nil { - next := (*Node)(nextPtr) + head := q.head.Load() + if head == nil { + return result + } + next := head.next.Load() + for next != nil { result = append(result, next.value) - nextPtr = atomic.LoadPointer(&next.next) + next = next.next.Load() } return result } @@ -60,26 +59,27 @@ func (q *Queue) List() []interface{} { // Enqueue adds a value to the queue func (q *Queue) Enqueue(value interface{}) { node := &Node{value: value} - nodePtr := unsafe.Pointer(node) for { - tailPtr := atomic.LoadPointer(&q.tail) - tail := (*Node)(tailPtr) - nextPtr := atomic.LoadPointer(&tail.next) + tail := q.tail.Load() + if tail == nil { + continue + } + next := tail.next.Load() // Check if tail is consistent - if tailPtr == atomic.LoadPointer(&q.tail) { - if nextPtr == nil { + if tail == q.tail.Load() { + if next == nil { // Try to link node at the end of the list - if atomic.CompareAndSwapPointer(&tail.next, nil, nodePtr) { + if tail.next.CompareAndSwap(nil, node) { // Enqueue is done, try to swing tail to the inserted node - atomic.CompareAndSwapPointer(&q.tail, tailPtr, nodePtr) + q.tail.CompareAndSwap(tail, node) atomic.AddInt64(&q.size, 1) return } } else { // Tail was not pointing to the last node, try to advance tail - atomic.CompareAndSwapPointer(&q.tail, tailPtr, nextPtr) + q.tail.CompareAndSwap(tail, next) } } } @@ -89,28 +89,32 @@ func (q *Queue) Enqueue(value interface{}) { // Returns nil if the queue is empty func (q *Queue) Dequeue() interface{} { for { - headPtr := atomic.LoadPointer(&q.head) - tailPtr := atomic.LoadPointer(&q.tail) - head := (*Node)(headPtr) - nextPtr := atomic.LoadPointer(&head.next) + head := q.head.Load() + tail := q.tail.Load() + if head == nil { + continue + } + next := head.next.Load() // Check if head, tail, and next are consistent - if headPtr == atomic.LoadPointer(&q.head) { + if head == q.head.Load() { // Is queue empty or tail falling behind? - if headPtr == tailPtr { + if head == tail { // Is queue empty? - if nextPtr == nil { + if next == nil { return nil // Queue is empty } // Tail is falling behind. Try to advance it - atomic.CompareAndSwapPointer(&q.tail, tailPtr, nextPtr) + q.tail.CompareAndSwap(tail, next) } else { // Queue is not empty, read value before CAS - next := (*Node)(nextPtr) + if next == nil { + continue + } value := next.value // Try to swing Head to the next node - if atomic.CompareAndSwapPointer(&q.head, headPtr, nextPtr) { + if q.head.CompareAndSwap(head, next) { atomic.AddInt64(&q.size, -1) // Decrement counter return value // Dequeue is done } @@ -121,35 +125,39 @@ func (q *Queue) Dequeue() interface{} { // IsEmpty returns true if the queue is empty func (q *Queue) IsEmpty() bool { - headPtr := atomic.LoadPointer(&q.head) - head := (*Node)(headPtr) - return atomic.LoadPointer(&head.next) == nil + head := q.head.Load() + if head == nil { + return true + } + return head.next.Load() == nil } // Peek returns the value at the front of the queue without removing it // Returns nil if the queue is empty func (q *Queue) Peek() interface{} { - headPtr := atomic.LoadPointer(&q.head) - head := (*Node)(headPtr) - nextPtr := atomic.LoadPointer(&head.next) - if nextPtr == nil { + head := q.head.Load() + if head == nil { + return nil + } + next := head.next.Load() + if next == nil { return nil // Queue is empty } - next := (*Node)(nextPtr) return next.value } // ForEach iterates over the queue and applies the function f to each item func (q *Queue) ForEach(f func(item interface{}) bool) { - headPtr := atomic.LoadPointer(&q.head) - head := (*Node)(headPtr) - nextPtr := atomic.LoadPointer(&head.next) - for nextPtr != nil { - next := (*Node)(nextPtr) + head := q.head.Load() + if head == nil { + return + } + next := head.next.Load() + for next != nil { if !f(next.value) { return } - nextPtr = atomic.LoadPointer(&next.next) + next = next.next.Load() } } diff --git a/skiplist/skiplist.go b/skiplist/skiplist.go index 34d9219..4b6087d 100644 --- a/skiplist/skiplist.go +++ b/skiplist/skiplist.go @@ -20,7 +20,6 @@ import ( "errors" "math/rand" "sync/atomic" - "unsafe" ) const MaxLevel = 16 @@ -40,18 +39,18 @@ const ( // ValueVersion represents a single version of a value in MVCC type ValueVersion struct { - Data []byte // Value data - Timestamp int64 // Version timestamp - Type ValueVersionType // Type of version (write or delete) - Next unsafe.Pointer // Atomic pointer to the previous version + Data []byte // Value data + Timestamp int64 // Version timestamp + Type ValueVersionType // Type of version (write or delete) + Next atomic.Pointer[ValueVersion] // Atomic pointer to the previous version } // Node represents a node in the skip list type Node struct { - forward [MaxLevel]unsafe.Pointer // Array of atomic pointers to Node - backward unsafe.Pointer // Atomic pointer to the previous node - key []byte // Key used for searches - versions unsafe.Pointer // Atomic pointer to the head of version chain + forward [MaxLevel]atomic.Pointer[Node] // Array of atomic pointers to Node + backward atomic.Pointer[Node] // Atomic pointer to the previous node + key []byte // Key used for searches + versions atomic.Pointer[ValueVersion] // Atomic pointer to the head of version chain } // SkipList represents a concurrent skip list data structure with MVCC @@ -124,10 +123,7 @@ func NewWithComparator(cmp KeyComparator) *SkipList { key: []byte{}, } - // Initialize all forward pointers to nil - for i := 0; i < MaxLevel; i++ { - header.forward[i] = nil - } + // Initialize all forward pointers to nil (zero value of atomic.Pointer is nil) sl := &SkipList{ header: header, @@ -159,8 +155,7 @@ func (n *Node) getLatestVersion() *ValueVersion { if n == nil { return nil } - ptr := atomic.LoadPointer(&n.versions) - return (*ValueVersion)(ptr) + return n.versions.Load() } // findVisibleVersion finds the latest version visible at the given timestamp @@ -184,7 +179,7 @@ func (n *Node) findVisibleVersion(readTimestamp int64) *ValueVersion { return version } // Try older version - atomic load - version = atomicLoadVersion(&version.Next) + version = version.Next.Load() } // No visible version found @@ -202,10 +197,10 @@ func (n *Node) addVersion(data []byte, timestamp int64, versionType ValueVersion for { currentHead := n.getLatestVersion() - atomicStoreVersion(&newVersion.Next, currentHead) + newVersion.Next.Store(currentHead) // Try to atomically update the head of the version chain - if atomic.CompareAndSwapPointer(&n.versions, unsafe.Pointer(currentHead), unsafe.Pointer(newVersion)) { + if n.versions.CompareAndSwap(currentHead, newVersion) { // Success - the new version is now the head break } @@ -217,7 +212,6 @@ func (n *Node) addVersion(data []byte, timestamp int64, versionType ValueVersion func (sl *SkipList) Get(searchKey []byte, readTimestamp int64) ([]byte, int64, bool) { var prev *Node var curr *Node - var currPtr unsafe.Pointer prev = sl.header @@ -226,8 +220,7 @@ func (sl *SkipList) Get(searchKey []byte, readTimestamp int64) ([]byte, int64, b // For each level, starting at the highest level in the list for i := currentLevel - 1; i >= 0; i-- { // Get the next node (atomic load) - currPtr = atomic.LoadPointer(&prev.forward[i]) - curr = (*Node)(currPtr) + curr = prev.forward[i].Load() // Traverse the current level for curr != nil { @@ -239,14 +232,12 @@ func (sl *SkipList) Get(searchKey []byte, readTimestamp int64) ([]byte, int64, b // Move forward prev = curr - currPtr = atomic.LoadPointer(&curr.forward[i]) - curr = (*Node)(currPtr) + curr = curr.forward[i].Load() } } // Check bottom level for exact match - currPtr = atomic.LoadPointer(&prev.forward[0]) - curr = (*Node)(currPtr) + curr = prev.forward[0].Load() // Search for the node at level 0 for curr != nil { @@ -267,8 +258,7 @@ func (sl *SkipList) Get(searchKey []byte, readTimestamp int64) ([]byte, int64, b } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } return nil, 0, false @@ -284,30 +274,26 @@ func (sl *SkipList) Put(searchKey []byte, newValue []byte, writeTimestamp int64) var update [MaxLevel]*Node var prev *Node var curr *Node - var currPtr unsafe.Pointer prev = sl.header currentLevel := sl.getLevel() // Navigate to insertion point for i := currentLevel - 1; i >= 0; i-- { - currPtr = atomic.LoadPointer(&prev.forward[i]) - curr = (*Node)(currPtr) + curr = prev.forward[i].Load() for curr != nil { if sl.comparator(curr.key, searchKey) >= 0 { break } prev = curr - currPtr = atomic.LoadPointer(&curr.forward[i]) - curr = (*Node)(currPtr) + curr = curr.forward[i].Load() } update[i] = prev } // Check for existing key at level 0 - currPtr = atomic.LoadPointer(&prev.forward[0]) - curr = (*Node)(currPtr) + curr = prev.forward[0].Load() for curr != nil { cmp := sl.comparator(curr.key, searchKey) @@ -319,8 +305,7 @@ func (sl *SkipList) Put(searchKey []byte, newValue []byte, writeTimestamp int64) break } prev = curr - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() update[0] = prev } @@ -364,20 +349,19 @@ func (sl *SkipList) Put(searchKey []byte, newValue []byte, writeTimestamp int64) break } - nextPtr := atomic.LoadPointer(&next.forward[i]) - nextNode := (*Node)(nextPtr) + nextNode := next.forward[i].Load() // Set new node's forward pointer - atomic.StorePointer(&newNode.forward[i], unsafe.Pointer(nextNode)) + newNode.forward[i].Store(nextNode) // Try to insert atomically - if atomic.CompareAndSwapPointer(&next.forward[i], nextPtr, unsafe.Pointer(newNode)) { + if next.forward[i].CompareAndSwap(nextNode, newNode) { if i == 0 { // Update backward pointer for level 0 atomically if nextNode != nil { - atomic.StorePointer(&nextNode.backward, unsafe.Pointer(newNode)) + nextNode.backward.Store(newNode) } - atomic.StorePointer(&newNode.backward, unsafe.Pointer(next)) + newNode.backward.Store(next) } break } @@ -402,7 +386,6 @@ func (sl *SkipList) Put(searchKey []byte, newValue []byte, writeTimestamp int64) func (sl *SkipList) Delete(searchKey []byte, deleteTimestamp int64) bool { var prev *Node var curr *Node - var currPtr unsafe.Pointer prev = sl.header @@ -412,8 +395,7 @@ func (sl *SkipList) Delete(searchKey []byte, deleteTimestamp int64) bool { // For each level, starting at the highest level in the list for i := currentLevel - 1; i >= 0; i-- { // Get the next node (atomic load) - currPtr = atomic.LoadPointer(&prev.forward[i]) - curr = (*Node)(currPtr) + curr = prev.forward[i].Load() // Traverse the current level for curr != nil { @@ -425,14 +407,12 @@ func (sl *SkipList) Delete(searchKey []byte, deleteTimestamp int64) bool { // Move forward prev = curr - currPtr = atomic.LoadPointer(&curr.forward[i]) - curr = (*Node)(currPtr) + curr = curr.forward[i].Load() } } // Check bottom level for exact match - currPtr = atomic.LoadPointer(&prev.forward[0]) - curr = (*Node)(currPtr) + curr = prev.forward[0].Load() // Search for the node at level 0 for curr != nil { @@ -450,8 +430,7 @@ func (sl *SkipList) Delete(searchKey []byte, deleteTimestamp int64) bool { } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } return false @@ -467,8 +446,7 @@ func (sl *SkipList) NewIterator(startKey []byte, readTimestamp int64) (*Iterator // Traverse to find the node right before the start key for i := sl.getLevel() - 1; i >= 0; i-- { for { - currPtr := atomic.LoadPointer(&curr.forward[i]) - next := (*Node)(currPtr) + next := curr.forward[i].Load() if next == nil || sl.comparator(next.key, startKey) >= 0 { break } @@ -490,8 +468,7 @@ func (sl *SkipList) NewIterator(startKey []byte, readTimestamp int64) (*Iterator func (it *Iterator) ToLast() { // Move to the last node for { - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() if next == nil { break } @@ -535,8 +512,7 @@ func (it *Iterator) Next() ([]byte, []byte, int64, bool) { } // Move to the next node - currPtr := atomic.LoadPointer(&it.current.forward[0]) - it.current = (*Node)(currPtr) + it.current = it.current.forward[0].Load() // Return the key and visible version at the read timestamp if it.current != nil { @@ -560,8 +536,7 @@ func (it *Iterator) Prev() ([]byte, []byte, int64, bool) { } // Move to the previous node - currPtr := atomic.LoadPointer(&it.current.backward) - it.current = (*Node)(currPtr) + it.current = it.current.backward.Load() // Return the key and visible version at the read timestamp if it.current != nil && it.current != it.SkipList.header { @@ -585,16 +560,14 @@ func (it *Iterator) Peek() ([]byte, []byte, int64, bool) { // If we're at the header, advance to first valid node if it.current == it.SkipList.header { - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() for next != nil { version := next.findVisibleVersion(it.readTimestamp) if version != nil && version.Type != Delete { return next.key, version.Data, version.Timestamp, true } - currPtr = atomic.LoadPointer(&next.forward[0]) - next = (*Node)(currPtr) + next = next.forward[0].Load() } return nil, nil, 0, false } @@ -614,8 +587,7 @@ func (sl *SkipList) GetMin(readTimestamp int64) ([]byte, []byte, bool) { curr := sl.header // Get the first node at level 0 - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() // If the list is empty, return false if curr == nil { @@ -630,8 +602,7 @@ func (sl *SkipList) GetMin(readTimestamp int64) ([]byte, []byte, bool) { } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } // No visible nodes found @@ -647,8 +618,7 @@ func (sl *SkipList) GetMax(readTimestamp int64) ([]byte, []byte, bool) { // Traverse the list to find the last node with a visible version for { - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() if curr == nil { break @@ -679,8 +649,7 @@ func (sl *SkipList) Count(readTimestamp int64) int { count := int64(0) // Use int64 for atomic operations // Traverse all nodes at level 0 - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() for curr != nil { // Check if this node has a visible version at the given timestamp @@ -690,8 +659,7 @@ func (sl *SkipList) Count(readTimestamp int64) int { } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } return int(atomic.LoadInt64(&count)) @@ -703,8 +671,7 @@ func (sl *SkipList) DeleteCount(readTimestamp int64) int { count := int64(0) // Traverse all nodes at level 0 - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() for curr != nil { // Check if this node has a visible version at the given timestamp @@ -714,8 +681,7 @@ func (sl *SkipList) DeleteCount(readTimestamp int64) int { } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } return int(atomic.LoadInt64(&count)) @@ -753,8 +719,7 @@ func (sl *SkipList) NewPrefixIterator(prefix []byte, readTimestamp int64) (*Pref // Traverse to find the first node with the prefix or the position where it would be for i := sl.getLevel() - 1; i >= 0; i-- { for { - currPtr := atomic.LoadPointer(&curr.forward[i]) - next := (*Node)(currPtr) + next := curr.forward[i].Load() if next == nil || sl.comparator(next.key, prefix) >= 0 { break } @@ -790,8 +755,7 @@ func (sl *SkipList) NewRangeIterator(startKey, endKey []byte, readTimestamp int6 // Traverse to find the node right before the start key for i := sl.getLevel() - 1; i >= 0; i-- { for { - currPtr := atomic.LoadPointer(&curr.forward[i]) - next := (*Node)(currPtr) + next := curr.forward[i].Load() if next == nil || sl.comparator(next.key, startKey) >= 0 { break } @@ -821,8 +785,7 @@ func (it *PrefixIterator) Key() []byte { func (it *PrefixIterator) ToLast() { // Move to the last node with the prefix for { - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() if next == nil || !hasPrefix(next.key, it.prefix) { break } @@ -859,8 +822,7 @@ func (it *PrefixIterator) Next() ([]byte, []byte, int64, bool) { } // Move to the next node - currPtr := atomic.LoadPointer(&it.current.forward[0]) - it.current = (*Node)(currPtr) + it.current = it.current.forward[0].Load() // Check if we have a valid node and it matches the prefix if it.current != nil { @@ -893,8 +855,7 @@ func (it *PrefixIterator) Prev() ([]byte, []byte, int64, bool) { var lastValidNode *Node // Traverse to find the last node with the prefix - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() for curr != nil { if hasPrefix(curr.key, it.prefix) { @@ -907,8 +868,7 @@ func (it *PrefixIterator) Prev() ([]byte, []byte, int64, bool) { break } - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } if lastValidNode != nil { @@ -920,8 +880,7 @@ func (it *PrefixIterator) Prev() ([]byte, []byte, int64, bool) { } // Move to the previous node - currPtr := atomic.LoadPointer(&it.current.backward) - it.current = (*Node)(currPtr) + it.current = it.current.backward.Load() // Check if we have a valid node and it's not the header if it.current != nil && it.current != it.SkipList.header { @@ -954,8 +913,7 @@ func (it *PrefixIterator) Peek() ([]byte, []byte, int64, bool) { // If we're at the header, find first valid node with prefix if it.current == it.SkipList.header { - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() for next != nil { if hasPrefix(next.key, it.prefix) { @@ -967,8 +925,7 @@ func (it *PrefixIterator) Peek() ([]byte, []byte, int64, bool) { // Gone past prefix range break } - currPtr = atomic.LoadPointer(&next.forward[0]) - next = (*Node)(currPtr) + next = next.forward[0].Load() } return nil, nil, 0, false } @@ -1012,8 +969,7 @@ func (it *RangeIterator) ToLast() { } // Move to next node - currPtr := atomic.LoadPointer(&curr.forward[0]) - next := (*Node)(currPtr) + next := curr.forward[0].Load() // Stop if next node is out of range if next == nil || !it.SkipList.isInRange(next.key, it.startKey, it.endKey) { @@ -1060,8 +1016,7 @@ func (it *RangeIterator) Next() ([]byte, []byte, int64, bool) { return nil, nil, 0, false } - currPtr := atomic.LoadPointer(&it.current.forward[0]) - it.current = (*Node)(currPtr) + it.current = it.current.forward[0].Load() // Check if we have a valid node and it's within the range if it.current != nil { @@ -1095,8 +1050,7 @@ func (it *RangeIterator) Prev() ([]byte, []byte, int64, bool) { var lastValidNode *Node // Traverse to find the last node in the range - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() for curr != nil { if it.SkipList.isInRange(curr.key, it.startKey, it.endKey) { @@ -1109,8 +1063,7 @@ func (it *RangeIterator) Prev() ([]byte, []byte, int64, bool) { break } - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } if lastValidNode != nil { @@ -1122,8 +1075,7 @@ func (it *RangeIterator) Prev() ([]byte, []byte, int64, bool) { } // Move to the previous node - currPtr := atomic.LoadPointer(&it.current.backward) - it.current = (*Node)(currPtr) + it.current = it.current.backward.Load() // Check if we have a valid node and it's not the header if it.current != nil && it.current != it.SkipList.header { @@ -1156,8 +1108,7 @@ func (it *RangeIterator) Peek() ([]byte, []byte, int64, bool) { // If we're at the header, find first valid node in range if it.current == it.SkipList.header { - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() for next != nil { if it.SkipList.isInRange(next.key, it.startKey, it.endKey) { @@ -1169,8 +1120,7 @@ func (it *RangeIterator) Peek() ([]byte, []byte, int64, bool) { // Gone past end of range break } - currPtr = atomic.LoadPointer(&next.forward[0]) - next = (*Node)(currPtr) + next = next.forward[0].Load() } return nil, nil, 0, false } @@ -1192,8 +1142,7 @@ func (sl *SkipList) GetLatestTimestamp() int64 { curr := sl.header - currPtr := atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() for curr != nil { // Get the latest version for this node (head of version chain) @@ -1213,8 +1162,7 @@ func (sl *SkipList) GetLatestTimestamp() int64 { } // Move to the next node - currPtr = atomic.LoadPointer(&curr.forward[0]) - curr = (*Node)(currPtr) + curr = curr.forward[0].Load() } return atomic.LoadInt64(&latestTimestamp) @@ -1224,8 +1172,7 @@ func (sl *SkipList) GetLatestTimestamp() int64 { func (it *Iterator) ToFirst() { it.current = it.SkipList.header - currPtr := atomic.LoadPointer(&it.current.forward[0]) - it.current = (*Node)(currPtr) + it.current = it.current.forward[0].Load() if it.current != nil { version := it.current.findVisibleVersion(it.readTimestamp) @@ -1238,8 +1185,7 @@ func (it *Iterator) ToFirst() { // ToFirst moves the prefix iterator to the first node with the matching prefix func (it *PrefixIterator) ToFirst() { it.current = it.SkipList.header - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() for next != nil { if hasPrefix(next.key, it.prefix) { @@ -1253,8 +1199,7 @@ func (it *PrefixIterator) ToFirst() { return } - currPtr = atomic.LoadPointer(&next.forward[0]) - next = (*Node)(currPtr) + next = next.forward[0].Load() } it.current = nil } @@ -1263,8 +1208,7 @@ func (it *PrefixIterator) ToFirst() { func (it *RangeIterator) ToFirst() { it.current = it.SkipList.header - currPtr := atomic.LoadPointer(&it.current.forward[0]) - next := (*Node)(currPtr) + next := it.current.forward[0].Load() for next != nil { if it.SkipList.isInRange(next.key, it.startKey, it.endKey) { @@ -1278,19 +1222,9 @@ func (it *RangeIterator) ToFirst() { return } - currPtr = atomic.LoadPointer(&next.forward[0]) - next = (*Node)(currPtr) + next = next.forward[0].Load() } it.current = nil } -// atomicLoadVersion safely loads a ValueVersion pointer -func atomicLoadVersion(ptr *unsafe.Pointer) *ValueVersion { - return (*ValueVersion)(atomic.LoadPointer(ptr)) -} - -// atomicStoreVersion safely stores a ValueVersion pointer -func atomicStoreVersion(ptr *unsafe.Pointer, version *ValueVersion) { - atomic.StorePointer(ptr, unsafe.Pointer(version)) -} diff --git a/tree/tree.go b/tree/tree.go index 1b31c9e..3a21375 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -19,8 +19,9 @@ import ( "bytes" "errors" "fmt" - "go.mongodb.org/mongo-driver/bson" "sort" + + "go.mongodb.org/mongo-driver/bson" ) const ReservedMetadataBlockID = 2