Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ const (
)

type QueuePool interface {
Submit(key string, job func())
Submit(key string, job func()) bool
Drain()
Kill()
}

type QueueWorker interface {
Submit(job func())
Submit(job func()) bool
Drain()
Kill()
}

type QueueWorkerParams struct {
QueueSize int
DropWhenFull bool
OnDropped func()
}

type queuePool struct {
Expand Down Expand Up @@ -61,11 +60,11 @@ func (p *queuePool) hash(key string) int {
return int(h.Sum32()) % p.capacity
}

func (p *queuePool) Submit(key string, job func()) {
func (p *queuePool) Submit(key string, job func()) bool {
p.Lock()
if p.kill.IsBroken() {
p.Unlock()
return
return false
}

idx := p.hash(key)
Expand All @@ -76,7 +75,7 @@ func (p *queuePool) Submit(key string, job func()) {
}
p.Unlock()

w.Submit(job)
return w.Submit(job)
}

func (p *queuePool) Drain() {
Expand Down Expand Up @@ -164,13 +163,12 @@ func (w *worker) run() {
}
}

func (w *worker) Submit(job func()) {
func (w *worker) Submit(job func()) bool {
submitted := true
w.Lock()
if w.active {
if w.DropWhenFull && w.deque.Len() == w.QueueSize {
if w.OnDropped != nil {
w.OnDropped()
}
submitted = false
} else {
w.deque.PushBack(job)
}
Expand All @@ -179,6 +177,7 @@ func (w *worker) Submit(job func()) {
w.next <- job
}
w.Unlock()
return submitted
}

func (w *worker) Drain() {
Expand Down
33 changes: 28 additions & 5 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ func TestPool(t *testing.T) {
return atomic.LoadUint32(&val2)
}

p.Submit(key1, func() {
submitted := p.Submit(key1, func() {
time.Sleep(time.Millisecond * 500)
add1(1)
})
require.True(t, submitted)

p.Submit(key2, func() {
submitted = p.Submit(key2, func() {
add2(2)
})
require.True(t, submitted)

p.Submit(key1, func() {
submitted = p.Submit(key1, func() {
add1(3)
})
require.True(t, submitted)

time.Sleep(time.Millisecond * 100)
require.Equal(t, val(), get1())
Expand All @@ -58,17 +61,37 @@ func TestPool(t *testing.T) {
time.Sleep(time.Millisecond * 500)
require.Equal(t, val(1, 3), get1())

p.Submit(key1, func() {
submitted = p.Submit(key1, func() {
time.Sleep(time.Millisecond * 500)
add1(4)
})
require.True(t, submitted)

p.Submit(key2, func() {
submitted = p.Submit(key2, func() {
time.Sleep(time.Millisecond * 500)
add2(5)
})
require.True(t, submitted)

p.Drain()
require.Equal(t, val(1, 3, 4), get1())
require.Equal(t, val(2, 5), get2())
}

func TestPoolOverflow(t *testing.T) {
queueSize := 2
p := NewQueuePool(2, QueueWorkerParams{QueueSize: queueSize, DropWhenFull: true})

for i := 0; i < queueSize+2; i++ {
submitted := p.Submit("key", func() {
time.Sleep(time.Millisecond * 500)
})
if i < queueSize+1 {
require.True(t, submitted)
} else {
require.False(t, submitted)
}
}

p.Drain()
}
Loading