Skip to content
Merged

RC4 #24

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ GPQ is a concurrency safe, embeddable priority queue that can be used in a varie
## Benchmarks
Due to the fact that most operations are done in constant time `O(1)` or logarithmic time `O(log n)`, with the exception of the prioritize function which happens in linear time `O(n)`, all GPQ operations are extremely fast. A single GPQ can handle a few million transactions a second and can be tuned depending on your work load. I have included some basic benchmarks using C++, Rust, Zig, Python, and Go to measure GPQ's performance against the standard implementations of other languages that can be found here at: [pq-bench](https://github.com/JustinTimperio/pq-bench)

| | |
|-------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------|
| ![Time-Spent](https://github.com/JustinTimperio/pq-bench/blob/master/docs/Time-Spent-vs-Implementation.png) | ![Queue-Speed-WITHOUT-Reprioritize](./docs/Queue-Speed-Without-Prioritize.png) |
| ![Queue-Speed-WITH-Reprioritize](./docs/Queue-Speed-With-Prioritize.png) | ![Average Time to Send and Receive VS Bucket Count](./docs/Time-to-Send-and-Receive-VS-Bucket-Count.png) |
| | |
|--------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------|
| ![Time-Spent](./docs/Real-Time-vs-Linear-Time.png) | ![Queue-Speed-WITHOUT-Reprioritize](./docs/Queue-Speed-Without-Prioritize.png) |
| ![Queue-Speed-WITH-Reprioritize](./docs/Queue-Speed-With-Prioritize.png) | ![Average Time to Send and Receive VS Bucket Count](./docs/Time-to-Send-and-Receive-VS-Bucket-Count.png) |

## Usage
GPQ at the core is a embeddable priority queue meant to be used at the core of critical workloads that require complex queueing and delivery order guarantees. The best way to use it is just to import it.
Expand All @@ -77,7 +77,7 @@ For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github
### API Reference
- `NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error)` - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred.
- `ItemsInQueue() uint` - Returns the number of items in the queue.
- `ItemsInDB() uint` - Returns the number of items in the database.
- `ItemsInDB() (uint, error)` - Returns the number of items in the database.
- `ActiveBuckets() uint` - Returns the number of active buckets.
- `Enqueue(item schema.Item[d]) error` - Enqueues an item into the queue.
- `EnqueueBatch(items []schema.Item[d]) error` - Enqueues a batch of items into the queue.
Expand Down
33 changes: 22 additions & 11 deletions disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package disk

import (
"errors"
"fmt"

"github.com/JustinTimperio/gpq/schema"

Expand All @@ -16,7 +17,7 @@ type Disk[T any] struct {
func NewDiskCache[T any](bLogger badger.Logger, options schema.GPQOptions) (*Disk[T], error) {

if options.DiskCachePath == "" {
return nil, errors.New("Error creating disk cache: path is empty")
return nil, errors.New("disk cache path cannot be empty")
}

opts := badger.DefaultOptions(options.DiskCachePath)
Expand All @@ -29,7 +30,7 @@ func NewDiskCache[T any](bLogger badger.Logger, options schema.GPQOptions) (*Dis
}
db, err := badger.Open(opts)
if err != nil {
return nil, errors.New("Error opening disk cache: " + err.Error())
return nil, fmt.Errorf("failed to open disk cache at %s: %w", options.DiskCachePath, err)
}

return &Disk[T]{diskCache: db}, nil
Expand All @@ -40,10 +41,11 @@ func (d *Disk[T]) Close() error {
return d.diskCache.Close()
}

func (d *Disk[T]) ItemsInDB() uint {
func (d *Disk[T]) ItemsInDB() (uint, error) {
var count uint
_ = d.diskCache.View(func(txn *badger.Txn) error {
err := d.diskCache.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()

Expand All @@ -53,7 +55,10 @@ func (d *Disk[T]) ItemsInDB() uint {

return nil
})
return count
if err != nil {
return 0, fmt.Errorf("failed to count items in disk cache: %w", err)
}
return count, nil
}

func (d *Disk[T]) ProcessBatch(batch []*schema.Item[T]) error {
Expand All @@ -72,7 +77,9 @@ func (d *Disk[T]) ProcessBatch(batch []*schema.Item[T]) error {
return err
}
txn = d.diskCache.NewTransaction(true)
txn.Set(entry.DiskUUID, b)
if err := txn.Set(entry.DiskUUID, b); err != nil {
return fmt.Errorf("failed to write item after transaction split: %w", err)
}

} else if err != nil {
return err
Expand All @@ -98,7 +105,9 @@ func (d *Disk[T]) DeleteBatch(batch []*schema.DeleteMessage) error {
return err
}
txn = d.diskCache.NewTransaction(true)
txn.Delete(entry.DiskUUID)
if err := txn.Delete(entry.DiskUUID); err != nil {
return fmt.Errorf("failed to delete item after transaction split: %w", err)
}
} else if err != nil {
return err
}
Expand Down Expand Up @@ -156,11 +165,13 @@ func (d *Disk[T]) RestoreFromDisk() ([]*schema.Item[T], error) {
})

if len(value) == 0 {
return errors.New("Error reading from disk cache: value is empty")
return fmt.Errorf("empty value found in disk cache for key=%x", key)
}

obj := new(schema.Item[T])
obj.FromBytes(value)
if err := obj.FromBytes(value); err != nil {
return fmt.Errorf("corrupted data found in disk cache (key=%x): %w", key, err)
}
obj.WasRestored = true
items = append(items, obj)

Expand All @@ -169,8 +180,8 @@ func (d *Disk[T]) RestoreFromDisk() ([]*schema.Item[T], error) {
return nil
})
if err != nil {
return nil, errors.New("Error reading from disk cache: " + err.Error())
return nil, fmt.Errorf("failed to restore items from disk cache: %w", err)
}

return items, err
return items, nil
}
Binary file modified docs/Queue-Speed-With-Prioritize.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/Queue-Speed-Without-Prioritize.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/Real-Time-vs-Linear-Time.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/Time-to-Send-and-Receive-VS-Bucket-Count.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading