From 7f2a0cf153015cab17345fec72504a6d519b9582 Mon Sep 17 00:00:00 2001 From: cpu1 Date: Wed, 3 Jan 2024 16:52:49 +0530 Subject: [PATCH 1/4] Fix incorrect usage of Timer.Reset There are a few places in the codebase where `Timer.Reset` is being called right before attempting to receive from the `Timer.C` channel. If the timer has already fired, this can lead to a receive from `Timer.C` returning earlier than the duration supplied to `Timer.Reset` as `Reset` does not drain the `Timer.C` channel. Here's an example to demonstrate this: ```golang timer := time.NewTimer(100 * time.Millisecond) time.Sleep(200 * time.Millisecond) timer.Reset(24 * time.Hour) select { case <-time.After(1 * time.Millisecond): case <-timer.C: panic("fired before 24 hours") } ``` Additionally, it's not advisable to call `Timer.Reset` while it's still active on a `Timer` obtained from a call to `time.NewTimer`, as mentioned in the [docs](https://pkg.go.dev/time#Timer.Reset). The correct usage is to call `Reset` only on timers that are stopped or have had their channels drained. This changelist fixes the issue by creating a new `Timer` before waiting on `Timer.C` and `Stop`ping it before the function returns. --- lib/executor/constant_arrival_rate.go | 7 ++++--- lib/executor/ramping_arrival_rate.go | 4 ++-- lib/executor/ramping_vus.go | 4 ++-- output/cloud/expv2/output.go | 4 +--- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/executor/constant_arrival_rate.go b/lib/executor/constant_arrival_rate.go index 7b576e3a96..8af0d37b0b 100644 --- a/lib/executor/constant_arrival_rate.go +++ b/lib/executor/constant_arrival_rate.go @@ -313,8 +313,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics } start, offsets, _ := car.et.GetStripedOffsets() - timer := time.NewTimer(time.Hour * 24) - // here the we need the not scaled one + // Here we need the not scaled one. notScaledTickerPeriod := getTickerPeriod( big.NewRat( car.config.Rate.Int64, @@ -326,7 +325,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics metricTags := car.getMetricTags(nil) for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] { t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime) - timer.Reset(t) + + timer := time.NewTimer(t) select { case <-timer.C: if vusPool.TryRunIteration() { @@ -362,6 +362,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics } case <-regDurationCtx.Done(): + timer.Stop() return nil } } diff --git a/lib/executor/ramping_arrival_rate.go b/lib/executor/ramping_arrival_rate.go index 2846517e69..a9cce7a223 100644 --- a/lib/executor/ramping_arrival_rate.go +++ b/lib/executor/ramping_arrival_rate.go @@ -436,7 +436,6 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics } regDurationDone := regDurationCtx.Done() - timer := time.NewTimer(time.Hour) start := time.Now() ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead var prevTime time.Duration @@ -453,10 +452,11 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics prevTime = nextTime b := time.Until(start.Add(nextTime)) if b > 0 { // TODO: have a minimal ? - timer.Reset(b) + timer := time.NewTimer(b) select { case <-timer.C: case <-regDurationDone: + timer.Stop() return nil } } diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 2b12bc63c8..de39a374d4 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -692,11 +692,11 @@ func (rs *rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionSt // TODO set start here? // TODO move it to a struct type or something and benchmark if that makes a difference func waiter(ctx context.Context, start time.Time) func(offset time.Duration) bool { - timer := time.NewTimer(time.Hour * 24) return func(offset time.Duration) bool { diff := offset - time.Since(start) if diff > 0 { // wait until time of event arrives // TODO have a mininum - timer.Reset(diff) + timer := time.NewTimer(diff) + defer timer.Stop() select { case <-ctx.Done(): return true // exit if context is cancelled diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index 55a9332b04..e8d27142ae 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -181,11 +181,9 @@ func (o *Output) StopWithTestError(_ error) error { } func (o *Output) runPeriodicFlush() { - t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) - o.wg.Add(1) - go func() { + t := time.NewTicker(o.config.MetricPushInterval.TimeDuration()) defer func() { t.Stop() o.wg.Done() From e6aaf855c65e3acc8d97f1f14dd79ccc12507f76 Mon Sep 17 00:00:00 2001 From: cpu1 Date: Wed, 3 Jan 2024 18:05:16 +0530 Subject: [PATCH 2/4] Honour Context deadlines when waiting for timers --- cloudapi/logs.go | 37 ++++++++++++++++++++++++--------- cloudapi/logs_test.go | 48 +++++++++++++++++++++++++++++++++---------- cmd/cloud.go | 38 +++++++++++++++++++--------------- 3 files changed, 86 insertions(+), 37 deletions(-) diff --git a/cloudapi/logs.go b/cloudapi/logs.go index dea95e6001..fe30a9334d 100644 --- a/cloudapi/logs.go +++ b/cloudapi/logs.go @@ -104,7 +104,12 @@ func (c *Config) logtailConn(ctx context.Context, referenceID string, since time headers.Add("X-K6TestRun-Id", referenceID) var conn *websocket.Conn - err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) { + makeTimer := func(d time.Duration) cancellableTimer { + return &wrappedTimer{ + Timer: time.NewTimer(d), + } + } + err = retry(ctx, makeTimer, 3, 5*time.Second, 2*time.Minute, func() (err error) { // We don't need to close the http body or use it for anything until we want to actually log // what the server returned as body when it errors out conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose @@ -207,24 +212,29 @@ func (c *Config) StreamLogsToLogger( } } -// sleeper represents an abstraction for waiting an amount of time. -type sleeper interface { - Sleep(d time.Duration) +// cancellableTimer represents an abstraction for waiting an amount of time. +type cancellableTimer interface { + C() <-chan time.Time + Stop() bool } -// sleeperFunc uses the underhood function for implementing the wait operation. -type sleeperFunc func(time.Duration) +type wrappedTimer struct { + *time.Timer +} -func (sfn sleeperFunc) Sleep(d time.Duration) { - sfn(d) +func (w *wrappedTimer) C() <-chan time.Time { + return w.Timer.C } +// timerFunc is a function that returns a cancellableTimer for a duration value. +type timerFunc func(time.Duration) cancellableTimer + // retry retries to execute a provided function until it isn't successful // or the maximum number of attempts is hit. It waits the specified interval // between the latest iteration and the next retry. // Interval is used as the base to compute an exponential backoff, // if the computed interval overtakes the max interval then max will be used. -func retry(s sleeper, attempts uint, interval, max time.Duration, do func() error) (err error) { +func retry(ctx context.Context, makeTimer timerFunc, attempts uint, interval, max time.Duration, do func() error) (err error) { baseInterval := math.Abs(interval.Truncate(time.Second).Seconds()) r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec @@ -237,7 +247,14 @@ func retry(s sleeper, attempts uint, interval, max time.Duration, do func() erro if wait > max { wait = max } - s.Sleep(wait) + + t := makeTimer(wait) + select { + case <-t.C(): + case <-ctx.Done(): + t.Stop() + return ctx.Err() + } } err = do() if err == nil { diff --git a/cloudapi/logs_test.go b/cloudapi/logs_test.go index d861dba036..96a9eed467 100644 --- a/cloudapi/logs_test.go +++ b/cloudapi/logs_test.go @@ -115,6 +115,30 @@ func TestMSGLog(t *testing.T) { } } +// fakeTimer is a timer for testing that satisfies the cancellableTimer interface. +type fakeTimer struct { + c chan time.Time +} + +// newFakeTimer creates a new fakeTimer that fires immediately. +func newFakeTimer() *fakeTimer { + c := make(chan time.Time, 1) + c <- time.Now() + return &fakeTimer{ + c: c, + } +} + +// C returns the timer channel. +func (t *fakeTimer) C() <-chan time.Time { + return t.c +} + +// Stop does nothing. +func (t *fakeTimer) Stop() bool { + return false +} + func TestRetry(t *testing.T) { t.Parallel() @@ -149,14 +173,15 @@ func TestRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var sleepRequests []time.Duration - // sleepCollector tracks the request duration value for sleep requests. - sleepCollector := sleeperFunc(func(d time.Duration) { - sleepRequests = append(sleepRequests, d) - }) + var startTimerRequests []time.Duration + // startTimerCollector tracks the request duration value for sleep requests. + startTimerCollector := func(d time.Duration) cancellableTimer { + startTimerRequests = append(startTimerRequests, d) + return newFakeTimer() + } var iterations int - err := retry(sleepCollector, 5, 5*time.Second, 2*time.Minute, func() error { + err := retry(context.Background(), startTimerCollector, 5, 5*time.Second, 2*time.Minute, func() error { iterations++ if iterations < tt.attempts { return fmt.Errorf("unexpected error") @@ -165,13 +190,13 @@ func TestRetry(t *testing.T) { }) require.NoError(t, err) require.Equal(t, tt.attempts, iterations) - require.Equal(t, len(tt.expWaits), len(sleepRequests)) + require.Equal(t, len(tt.expWaits), len(startTimerRequests)) // the added random milliseconds makes difficult to know the exact value // so it asserts that expwait <= actual <= expwait + 1s for i, expwait := range tt.expWaits { - assert.GreaterOrEqual(t, sleepRequests[i], expwait) - assert.LessOrEqual(t, sleepRequests[i], expwait+(1*time.Second)) + assert.GreaterOrEqual(t, startTimerRequests[i], expwait) + assert.LessOrEqual(t, startTimerRequests[i], expwait+(1*time.Second)) } }) } @@ -179,8 +204,9 @@ func TestRetry(t *testing.T) { t.Run("Fail", func(t *testing.T) { t.Parallel() - mock := sleeperFunc(func(time.Duration) { /* noop - nowait */ }) - err := retry(mock, 5, 5*time.Second, 30*time.Second, func() error { + err := retry(context.Background(), func(_ time.Duration) cancellableTimer { + return newFakeTimer() + }, 5, 5*time.Second, 30*time.Second, func() error { return fmt.Errorf("unexpected error") }) diff --git a/cmd/cloud.go b/cmd/cloud.go index edc124e69f..94b8ef51a7 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -226,10 +226,10 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { ) progressCtx, progressCancel := context.WithCancel(globalCtx) - progressBarWG := &sync.WaitGroup{} + defer progressCancel() + var progressBarWG sync.WaitGroup progressBarWG.Add(1) defer progressBarWG.Wait() - defer progressCancel() go func() { showProgress(progressCtx, c.gs, []*pb.ProgressBar{progressBar}, logger) progressBarWG.Done() @@ -272,7 +272,6 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { }), ) - ticker := time.NewTicker(time.Millisecond * 2000) if c.showCloudLogs { go func() { logger.Debug("Connecting to cloud logs server...") @@ -282,21 +281,28 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { }() } - for range ticker.C { - newTestProgress, progressErr := client.GetTestProgress(refID) - if progressErr != nil { - logger.WithError(progressErr).Error("Test progress error") - continue - } + ticker := time.NewTicker(2 * time.Second) +loop: + for { + select { + case <-ticker.C: + newTestProgress, progressErr := client.GetTestProgress(refID) + if progressErr != nil { + logger.WithError(progressErr).Error("Test progress error") + continue + } - testProgressLock.Lock() - testProgress = newTestProgress - testProgressLock.Unlock() + testProgressLock.Lock() + testProgress = newTestProgress + testProgressLock.Unlock() - if (newTestProgress.RunStatus > cloudapi.RunStatusRunning) || - (c.exitOnRunning && newTestProgress.RunStatus == cloudapi.RunStatusRunning) { - globalCancel() - break + if (newTestProgress.RunStatus > cloudapi.RunStatusRunning) || + (c.exitOnRunning && newTestProgress.RunStatus == cloudapi.RunStatusRunning) { + globalCancel() + break loop + } + case <-progressCtx.Done(): + break loop } } From 8b719fa96dfcb95a2091e8ab2eb9d96b349e484c Mon Sep 17 00:00:00 2001 From: cpu1 Date: Wed, 3 Jan 2024 18:29:56 +0530 Subject: [PATCH 3/4] Avoid leaking timers Some parts of the codebase do not call `Timer.Stop`, leaking the underlying `Timer` if it has not already fired. --- cmd/root.go | 4 +++- cmd/ui.go | 4 ++-- execution/scheduler.go | 10 ++++++---- js/modules/k6/ws/ws.go | 6 ++++-- js/runner.go | 4 +++- lib/execution.go | 4 +++- lib/netext/httpext/request_test.go | 2 +- lib/testutils/grpcservice/service.go | 11 +++++++++-- 8 files changed, 31 insertions(+), 14 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index b50a7de56a..ee79152d6a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -144,9 +144,11 @@ func (c *rootCommand) stopLoggers() { close(done) }() close(c.stopLoggersCh) + timer := time.NewTimer(waitLoggerCloseTimeout) + defer timer.Stop() select { case <-done: - case <-time.After(waitLoggerCloseTimeout): + case <-timer.C: c.globalState.FallbackLogger.Errorf("The logger didn't stop in %s", waitLoggerCloseTimeout) } } diff --git a/cmd/ui.go b/cmd/ui.go index d857233999..81048f7736 100644 --- a/cmd/ui.go +++ b/cmd/ui.go @@ -336,10 +336,10 @@ func showProgress(ctx context.Context, gs *state.GlobalState, pbs []*pb.Progress } ticker := time.NewTicker(updateFreq) - ctxDone := ctx.Done() + defer ticker.Stop() for { select { - case <-ctxDone: + case <-ctx.Done(): renderProgressBars(false) gs.OutMutex.Lock() printProgressBars() diff --git a/execution/scheduler.go b/execution/scheduler.go index 5469e452d6..887f2cedbe 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -196,8 +196,6 @@ func (e *Scheduler) initVUsConcurrently( func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) func() { e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...") tags := e.state.Test.RunTags - wg := &sync.WaitGroup{} - wg.Add(1) emitMetrics := func() { t := time.Now() @@ -225,8 +223,10 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam metrics.PushIfNotDone(ctx, out, samples) } - ticker := time.NewTicker(1 * time.Second) + var wg sync.WaitGroup + wg.Add(1) go func() { + ticker := time.NewTicker(1 * time.Second) defer func() { ticker.Stop() e.state.Test.Logger.Debug("Metrics emission of VUs and VUsMax metrics stopped") @@ -349,11 +349,13 @@ func (e *Scheduler) runExecutor( ) executorLogger.Debugf("Waiting for executor start time...") + timer := time.NewTimer(executorStartTime) + defer timer.Stop() select { case <-runCtx.Done(): runResults <- nil // no error since executor hasn't started yet return - case <-time.After(executorStartTime): + case <-timer.C: // continue } } diff --git a/js/modules/k6/ws/ws.go b/js/modules/k6/ws/ws.go index 1686352c6c..cbe5674466 100644 --- a/js/modules/k6/ws/ws.go +++ b/js/modules/k6/ws/ws.go @@ -425,8 +425,10 @@ func (s *Socket) SetTimeout(fn goja.Callable, timeoutMs float64) error { return fmt.Errorf("setTimeout requires a >0 timeout parameter, received %.2f", timeoutMs) } go func() { + timer := time.NewTimer(d) + defer timer.Stop() select { - case <-time.After(d): + case <-timer.C: select { case s.scheduled <- fn: case <-s.done: @@ -475,7 +477,7 @@ func (s *Socket) SetInterval(fn goja.Callable, intervalMs float64) error { return nil } -// Close closes the webscocket. If providede the first argument will be used as the code for the close message. +// Close closes the webscocket. If provided, the first argument will be used as the code for the close message. func (s *Socket) Close(args ...goja.Value) { code := websocket.CloseGoingAway if len(args) > 0 { diff --git a/js/runner.go b/js/runner.go index 858477831c..65338ee886 100644 --- a/js/runner.go +++ b/js/runner.go @@ -814,8 +814,10 @@ func (u *ActiveVU) RunOnce() error { if isFullIteration && u.Runner.Bundle.Options.MinIterationDuration.Valid { durationDiff := u.Runner.Bundle.Options.MinIterationDuration.TimeDuration() - totalTime if durationDiff > 0 { + timer := time.NewTimer(durationDiff) + defer timer.Stop() select { - case <-time.After(durationDiff): + case <-timer.C: case <-u.RunContext.Done(): } } diff --git a/lib/execution.go b/lib/execution.go index 239041e660..2b8ff2d68f 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -470,14 +470,16 @@ func (es *ExecutionState) ResumeNotify() <-chan struct{} { // configured maxVUs number is greater than the configured starting VUs. func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (InitializedVU, error) { for i := 1; i <= MaxRetriesGetPlannedVU; i++ { + timer := time.NewTimer(MaxTimeToWaitForPlannedVU) select { case vu := <-es.vus: if modifyActiveVUCount { es.ModCurrentlyActiveVUsCount(+1) } // TODO: set environment and exec + timer.Stop() return vu, nil - case <-time.After(MaxTimeToWaitForPlannedVU): + case <-timer.C: logger.Warnf("Could not get a VU from the buffer for %s", time.Duration(i)*MaxTimeToWaitForPlannedVU) } } diff --git a/lib/netext/httpext/request_test.go b/lib/netext/httpext/request_test.go index 7bb6441acb..8dc44cce45 100644 --- a/lib/netext/httpext/request_test.go +++ b/lib/netext/httpext/request_test.go @@ -533,10 +533,10 @@ func TestMakeRequestRPSLimit(t *testing.T) { } timer := time.NewTimer(3 * time.Second) + defer timer.Stop() for { select { case <-timer.C: - timer.Stop() val := atomic.LoadInt64(&requests) assert.NotEmpty(t, val) assert.InDelta(t, val, 3, 3) diff --git a/lib/testutils/grpcservice/service.go b/lib/testutils/grpcservice/service.go index 267c02f2c2..c252cf3b17 100644 --- a/lib/testutils/grpcservice/service.go +++ b/lib/testutils/grpcservice/service.go @@ -38,11 +38,18 @@ func NewFeatureExplorerServer(features ...*Feature) *FeatureExplorerImplementati } // GetFeature returns the feature at the given point. -func (s *FeatureExplorerImplementation) GetFeature(_ context.Context, point *Point) (*Feature, error) { +func (s *FeatureExplorerImplementation) GetFeature(ctx context.Context, point *Point) (*Feature, error) { s.Logf("GetFeature called with: %+v\n", point) n := rand.Intn(1000) //nolint:gosec - time.Sleep(time.Duration(n) * time.Millisecond) + + timer := time.NewTimer(time.Duration(n) * time.Millisecond) + defer timer.Stop() + select { + case <-timer.C: + case <-ctx.Done(): + return nil, ctx.Err() + } for _, feature := range s.savedFeatures { if proto.Equal(feature.Location, point) { From 5ca910c6e4d1c8a85de1c44ae7bb78ab2fa868cb Mon Sep 17 00:00:00 2001 From: cpu1 Date: Wed, 3 Jan 2024 18:42:47 +0530 Subject: [PATCH 4/4] Honour Request.Context when retrying requests --- cloudapi/client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cloudapi/client.go b/cloudapi/client.go index b5cd3807fd..eb688c4dca 100644 --- a/cloudapi/client.go +++ b/cloudapi/client.go @@ -101,7 +101,13 @@ func (c *Client) Do(req *http.Request, v interface{}) error { retry, err := c.do(req, v, i) if retry { - time.Sleep(c.retryInterval) + timer := time.NewTimer(c.retryInterval) + select { + case <-timer.C: + case <-req.Context().Done(): + timer.Stop() + return err + } if req.GetBody != nil { req.Body, _ = req.GetBody() }