From 3172c30a5eb56f6ee8080d6e688253ee5d1d5a9a Mon Sep 17 00:00:00 2001 From: yanyunlong Date: Fri, 22 Jan 2021 17:47:30 +0800 Subject: [PATCH 1/2] Fix bucket.Flush --- bucket.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bucket.go b/bucket.go index d5b5d86..3d74032 100644 --- a/bucket.go +++ b/bucket.go @@ -124,10 +124,9 @@ func (b *bucket) Flush(reinsert func(*Timer)) { e = next } + b.SetExpiration(-1) b.mu.Unlock() - b.SetExpiration(-1) // TODO: Improve the coordination with b.Add() - for _, t := range ts { reinsert(t) } From 2b2e27a0fe6308f5705f2c315cb033cad302c50d Mon Sep 17 00:00:00 2001 From: yanyunlong Date: Tue, 23 Feb 2021 11:04:36 +0800 Subject: [PATCH 2/2] Fix the not thread-safe issue between tw.add and tw.advanceClock --- timingwheel.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/timingwheel.go b/timingwheel.go index e2c64e0..0676450 100644 --- a/timingwheel.go +++ b/timingwheel.go @@ -2,6 +2,7 @@ package timingwheel import ( "errors" + "sync" "sync/atomic" "time" "unsafe" @@ -9,6 +10,8 @@ import ( "github.com/RussellLuo/timingwheel/delayqueue" ) +var rwlock *sync.RWMutex + // TimingWheel is an implementation of Hierarchical Timing Wheels. type TimingWheel struct { tick int64 // in milliseconds @@ -109,7 +112,10 @@ func (tw *TimingWheel) add(t *Timer) bool { // addOrRun inserts the timer t into the current timing wheel, or run the // timer's task if it has already expired. func (tw *TimingWheel) addOrRun(t *Timer) { - if !tw.add(t) { + rwlock.RLock() + notExpired := tw.add(t) + rwlock.RUnlock() + if !notExpired { // Already expired // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc), @@ -134,6 +140,7 @@ func (tw *TimingWheel) advanceClock(expiration int64) { // Start starts the current timing wheel. func (tw *TimingWheel) Start() { + rwlock = new(sync.RWMutex) tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) @@ -145,7 +152,9 @@ func (tw *TimingWheel) Start() { select { case elem := <-tw.queue.C: b := elem.(*bucket) + rwlock.Lock() tw.advanceClock(b.Expiration()) + rwlock.Unlock() b.Flush(tw.addOrRun) case <-tw.exitC: return