Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Build Test

on:
push:
branches:
- "master"
tags:
- "v*"
pull_request:
branches:
- "master"

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Go 1.22
uses: actions/setup-go@v5
with:
go-version: "1.22"

- name: Test
run: |
sudo apt update -y
sudo apt install -y golang git
go test -v
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
</h4>

<p align="center">
<img alt="Go version" src="https://img.shields.io/github/go-mod/go-version/JustinTimperio/gpq">
<a href="https://pkg.go.dev/github.com/JustinTimperio/gpq"><img src="https://pkg.go.dev/badge/github.com/JustinTimperio/gpq.svg" alt="Go Reference"></a>
<img alt="GitHub License" src="https://img.shields.io/github/license/JustinTimperio/gpq">
<img alt="GitHub Release" src="https://img.shields.io/github/v/release/JustinTimperio/gpq">
<img alt="GitHub Issues or Pull Requests" src="https://img.shields.io/github/issues/JustinTimperio/gpq">
<a href="https://go.dev/dl/"><img alt="Go version" src="https://img.shields.io/github/go-mod/go-version/JustinTimperio/gpq"></a>
<a href="https://pkg.go.dev/github.com/JustinTimperio/gpq"><img alt="Go Reference" src="https://pkg.go.dev/badge/github.com/JustinTimperio/gpq.svg"></a>
<a href="https://github.com/JustinTimperio/gpq/blob/master/LICENSE"><img alt="GitHub License" src="https://img.shields.io/github/license/JustinTimperio/gpq"></a>
<a href="https://github.com/JustinTimperio/gpq/releases"><img alt="GitHub Release" src="https://img.shields.io/github/v/release/JustinTimperio/gpq"></a>
<a href="https://github.com/JustinTimperio/gpq/issues"><img alt="GitHub Issues" src="https://img.shields.io/github/issues/JustinTimperio/gpq"></a>
<a href="https://github.com/JustinTimperio/gpq/actions"><img alt="GitHub Branch Status" src="https://img.shields.io/github/checks-status/JustinTimperio/gpq/master"></a>
</p>

## Notice
Expand Down Expand Up @@ -71,7 +72,7 @@ import "github.com/JustinTimperio/gpq"
```

### Prerequisites
For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap) and [BadgerDB](https://github.com/dgraph-io/badger).
For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap), [btree](https://github.com/tidwall/btree) and [BadgerDB](https://github.com/dgraph-io/badger).

### API Reference
- `NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error)` - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred.
Expand Down Expand Up @@ -102,7 +103,7 @@ func main() {
ShouldEscalate: true,
EscalationRate: time.Duration(time.Second),
CanTimeout: true,
Timeout: time.Duration(time.Second * 1),
Timeout: time.Duration(time.Second * 5),
}

opts := schema.GPQOptions{
Expand Down
12 changes: 6 additions & 6 deletions gpq.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ type GPQ[d any] struct {
// lazyDiskDeleteChan is a channel used to send messages to the lazy disk cache
lazyDiskDeleteChan chan schema.DeleteMessage
// batchHandler allows for synchronization of disk cache batches
batchHandler *BatchHandler[d]
batchHandler *batchHandler[d]
// batchCounter is used to keep track the current batch number
batchCounter *BatchCounter
batchCounter *batchCounter
}

// NewGPQ creates a new GPQ with the given number of buckets
Expand Down Expand Up @@ -72,8 +72,8 @@ func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) {

lazyDiskSendChan: sender,
lazyDiskDeleteChan: receiver,
batchHandler: NewBatchHandler(diskCache),
batchCounter: NewBatchCounter(Options.LazyDiskBatchSize),
batchHandler: newBatchHandler(diskCache),
batchCounter: newBatchCounter(Options.LazyDiskBatchSize),
}

var restored uint
Expand Down Expand Up @@ -130,7 +130,7 @@ func (g *GPQ[d]) Enqueue(item schema.Item[d]) error {
item.DiskUUID = key

if g.options.LazyDiskCacheEnabled {
item.BatchNumber = g.batchCounter.Increment()
item.BatchNumber = g.batchCounter.increment()
g.lazyDiskSendChan <- item
} else {
err = g.diskCache.WriteSingle(key, item)
Expand Down Expand Up @@ -169,7 +169,7 @@ func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error {
items[i].DiskUUID = key

if g.options.LazyDiskCacheEnabled {
items[i].BatchNumber = g.batchCounter.Increment()
items[i].BatchNumber = g.batchCounter.increment()
g.lazyDiskSendChan <- items[i]
} else {
err = g.diskCache.WriteSingle(items[i].DiskUUID, items[i])
Expand Down
18 changes: 10 additions & 8 deletions gpq_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gpq_test
import (
"log"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -113,9 +114,9 @@ func TestPrioritize(t *testing.T) {
}

var (
escalated uint
removed uint
received uint
escalated uint64
removed uint64
received uint64
)

var wg sync.WaitGroup
Expand All @@ -135,9 +136,10 @@ func TestPrioritize(t *testing.T) {
if err != nil {
log.Fatalln(err)
}
removed += r
escalated += e
t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated)

atomic.AddUint64(&removed, uint64(r))
atomic.AddUint64(&escalated, uint64(e))
t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated))

case <-shutdown:
break forloop
Expand All @@ -164,15 +166,15 @@ func TestPrioritize(t *testing.T) {
go func() {
defer wg.Done()
for {
if received+removed >= tm {
if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= uint64(tm) {
break
}
time.Sleep(time.Millisecond * 10)
_, err := queue.Dequeue()
if err != nil {
continue
}
received++
atomic.AddUint64(&received, 1)
}
t.Log("Dequeued all items")
shutdown <- struct{}{}
Expand Down
13 changes: 7 additions & 6 deletions gpq_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestE2E(t *testing.T) {

var (
received uint64
removed uint
escalated uint
removed uint64
escalated uint64
)

var wg sync.WaitGroup
Expand All @@ -72,9 +72,10 @@ func TestE2E(t *testing.T) {
if err != nil {
log.Fatalln(err)
}
removed += r
escalated += e
t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated)

atomic.AddUint64(&received, uint64(r))
atomic.AddUint64(&escalated, uint64(e))
t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated))

case <-shutdown:
break breaker
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestE2E(t *testing.T) {
go func() {
defer wg.Done()
for {
if atomic.LoadUint64(&received)+uint64(removed) >= total {
if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= total {
break
}
items, err := queue.DequeueBatch(batchSize)
Expand Down
4 changes: 2 additions & 2 deletions gpq_parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// Tests pushing and pulling single messages in parallel
func TestSingleParallel(t *testing.T) {
var (
total uint = 10_000_000
total uint = 1_000_000
syncToDisk bool = false
lazySync bool = false
maxBuckets uint = 10
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSingleParallel(t *testing.T) {
// Tests pushing and pulling batches of messages in parallel
func TestBatchParallel(t *testing.T) {
var (
total uint = 10_000_000
total uint = 1_000_000
syncToDisk bool = false
lazySync bool = false
maxBuckets uint = 10
Expand Down
18 changes: 9 additions & 9 deletions helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ import (
"github.com/JustinTimperio/gpq/schema"
)

type BatchHandler[T any] struct {
type batchHandler[T any] struct {
mux *sync.Mutex
syncedBatches map[uint]bool
deletedBatches map[uint]bool
diskCache *disk.Disk[T]
}

func NewBatchHandler[T any](diskCache *disk.Disk[T]) *BatchHandler[T] {
return &BatchHandler[T]{
func newBatchHandler[T any](diskCache *disk.Disk[T]) *batchHandler[T] {
return &batchHandler[T]{
mux: &sync.Mutex{},
syncedBatches: make(map[uint]bool),
deletedBatches: make(map[uint]bool),
diskCache: diskCache,
}
}

func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) {
func (bh *batchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) {
bh.mux.Lock()
defer bh.mux.Unlock()

Expand All @@ -36,7 +36,7 @@ func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uin
bh.deletedBatches[batchNumber] = false
}

func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) {
func (bh *batchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) {
bh.mux.Lock()
defer bh.mux.Unlock()

Expand All @@ -55,23 +55,23 @@ func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumbe

}

type BatchCounter struct {
type batchCounter struct {
mux *sync.Mutex
batchNumber uint
batchCounter uint
batchSize uint
}

func NewBatchCounter(batchSize uint) *BatchCounter {
return &BatchCounter{
func newBatchCounter(batchSize uint) *batchCounter {
return &batchCounter{
mux: &sync.Mutex{},
batchNumber: 0,
batchCounter: 0,
batchSize: batchSize,
}
}

func (bc *BatchCounter) Increment() (batchNumber uint) {
func (bc *batchCounter) increment() (batchNumber uint) {
bc.mux.Lock()
defer bc.mux.Unlock()

Expand Down
Loading