diff --git a/bucket.go b/bucket.go index d5b5d86..3d74032 100644 --- a/bucket.go +++ b/bucket.go @@ -124,10 +124,9 @@ func (b *bucket) Flush(reinsert func(*Timer)) { e = next } + b.SetExpiration(-1) b.mu.Unlock() - b.SetExpiration(-1) // TODO: Improve the coordination with b.Add() - for _, t := range ts { reinsert(t) } diff --git a/timingwheel.go b/timingwheel.go index e2c64e0..0676450 100644 --- a/timingwheel.go +++ b/timingwheel.go @@ -2,6 +2,7 @@ package timingwheel import ( "errors" + "sync" "sync/atomic" "time" "unsafe" @@ -9,6 +10,8 @@ import ( "github.com/RussellLuo/timingwheel/delayqueue" ) +var rwlock *sync.RWMutex + // TimingWheel is an implementation of Hierarchical Timing Wheels. type TimingWheel struct { tick int64 // in milliseconds @@ -109,7 +112,10 @@ func (tw *TimingWheel) add(t *Timer) bool { // addOrRun inserts the timer t into the current timing wheel, or run the // timer's task if it has already expired. func (tw *TimingWheel) addOrRun(t *Timer) { - if !tw.add(t) { + rwlock.RLock() + notExpired := tw.add(t) + rwlock.RUnlock() + if !notExpired { // Already expired // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc), @@ -134,6 +140,7 @@ func (tw *TimingWheel) advanceClock(expiration int64) { // Start starts the current timing wheel. func (tw *TimingWheel) Start() { + rwlock = new(sync.RWMutex) tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) @@ -145,7 +152,9 @@ func (tw *TimingWheel) Start() { select { case elem := <-tw.queue.C: b := elem.(*bucket) + rwlock.Lock() tw.advanceClock(b.Expiration()) + rwlock.Unlock() b.Flush(tw.addOrRun) case <-tw.exitC: return