diff --git a/pool.go b/pool.go index a1cdded..17f2862 100644 --- a/pool.go +++ b/pool.go @@ -13,13 +13,13 @@ const ( ) type QueuePool interface { - Submit(key string, job func()) + Submit(key string, job func()) bool Drain() Kill() } type QueueWorker interface { - Submit(job func()) + Submit(job func()) bool Drain() Kill() } @@ -27,7 +27,6 @@ type QueueWorker interface { type QueueWorkerParams struct { QueueSize int DropWhenFull bool - OnDropped func() } type queuePool struct { @@ -61,11 +60,11 @@ func (p *queuePool) hash(key string) int { return int(h.Sum32()) % p.capacity } -func (p *queuePool) Submit(key string, job func()) { +func (p *queuePool) Submit(key string, job func()) bool { p.Lock() if p.kill.IsBroken() { p.Unlock() - return + return false } idx := p.hash(key) @@ -76,7 +75,7 @@ func (p *queuePool) Submit(key string, job func()) { } p.Unlock() - w.Submit(job) + return w.Submit(job) } func (p *queuePool) Drain() { @@ -164,13 +163,12 @@ func (w *worker) run() { } } -func (w *worker) Submit(job func()) { +func (w *worker) Submit(job func()) bool { + submitted := true w.Lock() if w.active { if w.DropWhenFull && w.deque.Len() == w.QueueSize { - if w.OnDropped != nil { - w.OnDropped() - } + submitted = false } else { w.deque.PushBack(job) } @@ -179,6 +177,7 @@ func (w *worker) Submit(job func()) { w.next <- job } w.Unlock() + return submitted } func (w *worker) Drain() { diff --git a/pool_test.go b/pool_test.go index 30bc52a..ba3c84f 100644 --- a/pool_test.go +++ b/pool_test.go @@ -38,18 +38,21 @@ func TestPool(t *testing.T) { return atomic.LoadUint32(&val2) } - p.Submit(key1, func() { + submitted := p.Submit(key1, func() { time.Sleep(time.Millisecond * 500) add1(1) }) + require.True(t, submitted) - p.Submit(key2, func() { + submitted = p.Submit(key2, func() { add2(2) }) + require.True(t, submitted) - p.Submit(key1, func() { + submitted = p.Submit(key1, func() { add1(3) }) + require.True(t, submitted) time.Sleep(time.Millisecond * 100) require.Equal(t, val(), get1()) @@ -58,17 +61,37 @@ func TestPool(t *testing.T) { time.Sleep(time.Millisecond * 500) require.Equal(t, val(1, 3), get1()) - p.Submit(key1, func() { + submitted = p.Submit(key1, func() { time.Sleep(time.Millisecond * 500) add1(4) }) + require.True(t, submitted) - p.Submit(key2, func() { + submitted = p.Submit(key2, func() { time.Sleep(time.Millisecond * 500) add2(5) }) + require.True(t, submitted) p.Drain() require.Equal(t, val(1, 3, 4), get1()) require.Equal(t, val(2, 5), get2()) } + +func TestPoolOverflow(t *testing.T) { + queueSize := 2 + p := NewQueuePool(2, QueueWorkerParams{QueueSize: queueSize, DropWhenFull: true}) + + for i := 0; i < queueSize+2; i++ { + submitted := p.Submit("key", func() { + time.Sleep(time.Millisecond * 500) + }) + if i < queueSize+1 { + require.True(t, submitted) + } else { + require.False(t, submitted) + } + } + + p.Drain() +}