From 26ef53fe7a280087c4cc8ecd5889e0364a8f317d Mon Sep 17 00:00:00 2001 From: boks1971 Date: Tue, 11 Feb 2025 22:32:32 +0530 Subject: [PATCH 1/3] Return submit success from Submit. Useful to get a synchronous submit result so that there is context to log/record drops. For example, if an event is being queued, when it is dropped due to queue full, it is not possible to get that event's details in `onDropped` callback. With synchronous result, the dropped event details are available. --- pool.go | 15 +++++++++------ pool_test.go | 33 ++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/pool.go b/pool.go index a1cdded..9a7bfc3 100644 --- a/pool.go +++ b/pool.go @@ -13,13 +13,13 @@ const ( ) type QueuePool interface { - Submit(key string, job func()) + Submit(key string, job func()) bool Drain() Kill() } type QueueWorker interface { - Submit(job func()) + Submit(job func()) bool Drain() Kill() } @@ -61,11 +61,11 @@ func (p *queuePool) hash(key string) int { return int(h.Sum32()) % p.capacity } -func (p *queuePool) Submit(key string, job func()) { +func (p *queuePool) Submit(key string, job func()) bool { p.Lock() if p.kill.IsBroken() { p.Unlock() - return + return false } idx := p.hash(key) @@ -76,7 +76,7 @@ func (p *queuePool) Submit(key string, job func()) { } p.Unlock() - w.Submit(job) + return w.Submit(job) } func (p *queuePool) Drain() { @@ -164,13 +164,15 @@ func (w *worker) run() { } } -func (w *worker) Submit(job func()) { +func (w *worker) Submit(job func()) bool { + submitted := true w.Lock() if w.active { if w.DropWhenFull && w.deque.Len() == w.QueueSize { if w.OnDropped != nil { w.OnDropped() } + submitted = false } else { w.deque.PushBack(job) } @@ -179,6 +181,7 @@ func (w *worker) Submit(job func()) { w.next <- job } w.Unlock() + return submitted } func (w *worker) Drain() { diff --git a/pool_test.go b/pool_test.go index 30bc52a..563b736 100644 --- a/pool_test.go +++ b/pool_test.go @@ -38,18 +38,21 @@ func TestPool(t *testing.T) { return atomic.LoadUint32(&val2) } - p.Submit(key1, func() { + submitted := p.Submit(key1, func() { time.Sleep(time.Millisecond * 500) add1(1) }) + require.True(t, submitted) - p.Submit(key2, func() { + submitted = p.Submit(key2, func() { add2(2) }) + require.True(t, submitted) - p.Submit(key1, func() { + submitted = p.Submit(key1, func() { add1(3) }) + require.True(t, submitted) time.Sleep(time.Millisecond * 100) require.Equal(t, val(), get1()) @@ -58,17 +61,37 @@ func TestPool(t *testing.T) { time.Sleep(time.Millisecond * 500) require.Equal(t, val(1, 3), get1()) - p.Submit(key1, func() { + submitted = p.Submit(key1, func() { time.Sleep(time.Millisecond * 500) add1(4) }) + require.True(t, submitted) - p.Submit(key2, func() { + submitted = p.Submit(key2, func() { time.Sleep(time.Millisecond * 500) add2(5) }) + require.True(t, submitted) p.Drain() require.Equal(t, val(1, 3, 4), get1()) require.Equal(t, val(2, 5), get2()) } + +func TestPoolOverflow(t *testing.T) { + queueSize := 2 + p := NewQueuePool(2, QueueWorkerParams{QueueSize: queueSize, DropWhenFull: true}) + + for i := 0; i < queueSize+2; i++ { + submitted := p.Submit("key", func() { + time.Sleep(time.Millisecond * 500) + }) + if i < 3 { + require.True(t, submitted) + } else { + require.False(t, submitted) + } + } + + p.Drain() +} From 62d7d439c9ece19d57006077adecdf7169c92af7 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Tue, 11 Feb 2025 22:44:39 +0530 Subject: [PATCH 2/3] use variable --- pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pool_test.go b/pool_test.go index 563b736..ba3c84f 100644 --- a/pool_test.go +++ b/pool_test.go @@ -86,7 +86,7 @@ func TestPoolOverflow(t *testing.T) { submitted := p.Submit("key", func() { time.Sleep(time.Millisecond * 500) }) - if i < 3 { + if i < queueSize+1 { require.True(t, submitted) } else { require.False(t, submitted) From f0e98c4d14ef60afc506ac8d7dac59f668f3d8f8 Mon Sep 17 00:00:00 2001 From: boks1971 Date: Wed, 12 Feb 2025 14:14:15 +0530 Subject: [PATCH 3/3] Remove OnDropped in favor of synchrounous return --- pool.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pool.go b/pool.go index 9a7bfc3..17f2862 100644 --- a/pool.go +++ b/pool.go @@ -27,7 +27,6 @@ type QueueWorker interface { type QueueWorkerParams struct { QueueSize int DropWhenFull bool - OnDropped func() } type queuePool struct { @@ -169,9 +168,6 @@ func (w *worker) Submit(job func()) bool { w.Lock() if w.active { if w.DropWhenFull && w.deque.Len() == w.QueueSize { - if w.OnDropped != nil { - w.OnDropped() - } submitted = false } else { w.deque.PushBack(job)