Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cloudapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ func (c *Client) Do(req *http.Request, v interface{}) error {
retry, err := c.do(req, v, i)

if retry {
time.Sleep(c.retryInterval)
timer := time.NewTimer(c.retryInterval)
select {
case <-timer.C:
case <-req.Context().Done():
timer.Stop()
return err
}
if req.GetBody != nil {
req.Body, _ = req.GetBody()
}
Expand Down
37 changes: 27 additions & 10 deletions cloudapi/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ func (c *Config) logtailConn(ctx context.Context, referenceID string, since time
headers.Add("X-K6TestRun-Id", referenceID)

var conn *websocket.Conn
err = retry(sleeperFunc(time.Sleep), 3, 5*time.Second, 2*time.Minute, func() (err error) {
makeTimer := func(d time.Duration) cancellableTimer {
return &wrappedTimer{
Timer: time.NewTimer(d),
}
}
err = retry(ctx, makeTimer, 3, 5*time.Second, 2*time.Minute, func() (err error) {
// We don't need to close the http body or use it for anything until we want to actually log
// what the server returned as body when it errors out
conn, _, err = websocket.DefaultDialer.DialContext(ctx, u.String(), headers) //nolint:bodyclose
Expand Down Expand Up @@ -207,24 +212,29 @@ func (c *Config) StreamLogsToLogger(
}
}

// sleeper represents an abstraction for waiting an amount of time.
type sleeper interface {
Sleep(d time.Duration)
// cancellableTimer represents an abstraction for waiting an amount of time.
type cancellableTimer interface {
C() <-chan time.Time
Stop() bool
}

// sleeperFunc uses the underhood function for implementing the wait operation.
type sleeperFunc func(time.Duration)
type wrappedTimer struct {
*time.Timer
}

func (sfn sleeperFunc) Sleep(d time.Duration) {
sfn(d)
func (w *wrappedTimer) C() <-chan time.Time {
return w.Timer.C
}

// timerFunc is a function that returns a cancellableTimer for a duration value.
type timerFunc func(time.Duration) cancellableTimer

// retry executes a provided function, retrying on failure until it succeeds
// or the maximum number of attempts is hit. It waits the specified interval
// between the latest iteration and the next retry.
// Interval is used as the base to compute an exponential backoff,
// if the computed interval overtakes the max interval then max will be used.
func retry(s sleeper, attempts uint, interval, max time.Duration, do func() error) (err error) {
func retry(ctx context.Context, makeTimer timerFunc, attempts uint, interval, max time.Duration, do func() error) (err error) {
baseInterval := math.Abs(interval.Truncate(time.Second).Seconds())
r := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec

Expand All @@ -237,7 +247,14 @@ func retry(s sleeper, attempts uint, interval, max time.Duration, do func() erro
if wait > max {
wait = max
}
s.Sleep(wait)

t := makeTimer(wait)
select {
case <-t.C():
case <-ctx.Done():
t.Stop()
return ctx.Err()
}
}
err = do()
if err == nil {
Expand Down
48 changes: 37 additions & 11 deletions cloudapi/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,30 @@ func TestMSGLog(t *testing.T) {
}
}

// fakeTimer is a test double that satisfies the cancellableTimer interface.
type fakeTimer struct {
	c chan time.Time
}

// newFakeTimer creates a fakeTimer whose channel is pre-filled with the
// current time, so the timer fires as soon as it is read.
func newFakeTimer() *fakeTimer {
	ft := &fakeTimer{c: make(chan time.Time, 1)}
	ft.c <- time.Now()
	return ft
}

// C returns the channel the timer fires on.
func (t *fakeTimer) C() <-chan time.Time {
	return t.c
}

// Stop is a no-op and always reports false.
func (t *fakeTimer) Stop() bool {
	return false
}

func TestRetry(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -149,14 +173,15 @@ func TestRetry(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var sleepRequests []time.Duration
// sleepCollector tracks the request duration value for sleep requests.
sleepCollector := sleeperFunc(func(d time.Duration) {
sleepRequests = append(sleepRequests, d)
})
var startTimerRequests []time.Duration
// startTimerCollector tracks the requested duration value for each timer start.
startTimerCollector := func(d time.Duration) cancellableTimer {
startTimerRequests = append(startTimerRequests, d)
return newFakeTimer()
}

var iterations int
err := retry(sleepCollector, 5, 5*time.Second, 2*time.Minute, func() error {
err := retry(context.Background(), startTimerCollector, 5, 5*time.Second, 2*time.Minute, func() error {
iterations++
if iterations < tt.attempts {
return fmt.Errorf("unexpected error")
Expand All @@ -165,22 +190,23 @@ func TestRetry(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, tt.attempts, iterations)
require.Equal(t, len(tt.expWaits), len(sleepRequests))
require.Equal(t, len(tt.expWaits), len(startTimerRequests))

// the added random milliseconds makes difficult to know the exact value
// so it asserts that expwait <= actual <= expwait + 1s
for i, expwait := range tt.expWaits {
assert.GreaterOrEqual(t, sleepRequests[i], expwait)
assert.LessOrEqual(t, sleepRequests[i], expwait+(1*time.Second))
assert.GreaterOrEqual(t, startTimerRequests[i], expwait)
assert.LessOrEqual(t, startTimerRequests[i], expwait+(1*time.Second))
}
})
}
})
t.Run("Fail", func(t *testing.T) {
t.Parallel()

mock := sleeperFunc(func(time.Duration) { /* noop - nowait */ })
err := retry(mock, 5, 5*time.Second, 30*time.Second, func() error {
err := retry(context.Background(), func(_ time.Duration) cancellableTimer {
return newFakeTimer()
}, 5, 5*time.Second, 30*time.Second, func() error {
return fmt.Errorf("unexpected error")
})

Expand Down
38 changes: 22 additions & 16 deletions cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
)

progressCtx, progressCancel := context.WithCancel(globalCtx)
progressBarWG := &sync.WaitGroup{}
defer progressCancel()
var progressBarWG sync.WaitGroup
progressBarWG.Add(1)
defer progressBarWG.Wait()
defer progressCancel()
go func() {
showProgress(progressCtx, c.gs, []*pb.ProgressBar{progressBar}, logger)
progressBarWG.Done()
Expand Down Expand Up @@ -272,7 +272,6 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
}),
)

ticker := time.NewTicker(time.Millisecond * 2000)
if c.showCloudLogs {
go func() {
logger.Debug("Connecting to cloud logs server...")
Expand All @@ -282,21 +281,28 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
}()
}

for range ticker.C {
newTestProgress, progressErr := client.GetTestProgress(refID)
if progressErr != nil {
logger.WithError(progressErr).Error("Test progress error")
continue
}
ticker := time.NewTicker(2 * time.Second)
loop:
for {
select {
case <-ticker.C:
newTestProgress, progressErr := client.GetTestProgress(refID)
if progressErr != nil {
logger.WithError(progressErr).Error("Test progress error")
continue
}

testProgressLock.Lock()
testProgress = newTestProgress
testProgressLock.Unlock()
testProgressLock.Lock()
testProgress = newTestProgress
testProgressLock.Unlock()

if (newTestProgress.RunStatus > cloudapi.RunStatusRunning) ||
(c.exitOnRunning && newTestProgress.RunStatus == cloudapi.RunStatusRunning) {
globalCancel()
break
if (newTestProgress.RunStatus > cloudapi.RunStatusRunning) ||
(c.exitOnRunning && newTestProgress.RunStatus == cloudapi.RunStatusRunning) {
globalCancel()
break loop
}
case <-progressCtx.Done():
break loop
}
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ func (c *rootCommand) stopLoggers() {
close(done)
}()
close(c.stopLoggersCh)
timer := time.NewTimer(waitLoggerCloseTimeout)
defer timer.Stop()
select {
case <-done:
case <-time.After(waitLoggerCloseTimeout):
case <-timer.C:
c.globalState.FallbackLogger.Errorf("The logger didn't stop in %s", waitLoggerCloseTimeout)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ func showProgress(ctx context.Context, gs *state.GlobalState, pbs []*pb.Progress
}

ticker := time.NewTicker(updateFreq)
ctxDone := ctx.Done()
defer ticker.Stop()
for {
select {
case <-ctxDone:
case <-ctx.Done():
renderProgressBars(false)
gs.OutMutex.Lock()
printProgressBars()
Expand Down
10 changes: 6 additions & 4 deletions execution/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ func (e *Scheduler) initVUsConcurrently(
func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) func() {
e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...")
tags := e.state.Test.RunTags
wg := &sync.WaitGroup{}
wg.Add(1)

emitMetrics := func() {
t := time.Now()
Expand Down Expand Up @@ -225,8 +223,10 @@ func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.Sam
metrics.PushIfNotDone(ctx, out, samples)
}

ticker := time.NewTicker(1 * time.Second)
var wg sync.WaitGroup
wg.Add(1)
go func() {
ticker := time.NewTicker(1 * time.Second)
defer func() {
ticker.Stop()
e.state.Test.Logger.Debug("Metrics emission of VUs and VUsMax metrics stopped")
Expand Down Expand Up @@ -349,11 +349,13 @@ func (e *Scheduler) runExecutor(
)

executorLogger.Debugf("Waiting for executor start time...")
timer := time.NewTimer(executorStartTime)
defer timer.Stop()
select {
case <-runCtx.Done():
runResults <- nil // no error since executor hasn't started yet
return
case <-time.After(executorStartTime):
case <-timer.C:
// continue
}
}
Expand Down
6 changes: 4 additions & 2 deletions js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ func (s *Socket) SetTimeout(fn goja.Callable, timeoutMs float64) error {
return fmt.Errorf("setTimeout requires a >0 timeout parameter, received %.2f", timeoutMs)
}
go func() {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-time.After(d):
case <-timer.C:
select {
case s.scheduled <- fn:
case <-s.done:
Expand Down Expand Up @@ -475,7 +477,7 @@ func (s *Socket) SetInterval(fn goja.Callable, intervalMs float64) error {
return nil
}

// Close closes the webscocket. If providede the first argument will be used as the code for the close message.
// Close closes the websocket. If provided, the first argument will be used as the code for the close message.
func (s *Socket) Close(args ...goja.Value) {
code := websocket.CloseGoingAway
if len(args) > 0 {
Expand Down
4 changes: 3 additions & 1 deletion js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,10 @@ func (u *ActiveVU) RunOnce() error {
if isFullIteration && u.Runner.Bundle.Options.MinIterationDuration.Valid {
durationDiff := u.Runner.Bundle.Options.MinIterationDuration.TimeDuration() - totalTime
if durationDiff > 0 {
timer := time.NewTimer(durationDiff)
defer timer.Stop()
select {
case <-time.After(durationDiff):
case <-timer.C:
case <-u.RunContext.Done():
}
}
Expand Down
4 changes: 3 additions & 1 deletion lib/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,14 +470,16 @@ func (es *ExecutionState) ResumeNotify() <-chan struct{} {
// configured maxVUs number is greater than the configured starting VUs.
func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (InitializedVU, error) {
for i := 1; i <= MaxRetriesGetPlannedVU; i++ {
timer := time.NewTimer(MaxTimeToWaitForPlannedVU)
select {
case vu := <-es.vus:
if modifyActiveVUCount {
es.ModCurrentlyActiveVUsCount(+1)
}
// TODO: set environment and exec
timer.Stop()
return vu, nil
case <-time.After(MaxTimeToWaitForPlannedVU):
case <-timer.C:
logger.Warnf("Could not get a VU from the buffer for %s", time.Duration(i)*MaxTimeToWaitForPlannedVU)
}
}
Expand Down
7 changes: 4 additions & 3 deletions lib/executor/constant_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
}

start, offsets, _ := car.et.GetStripedOffsets()
timer := time.NewTimer(time.Hour * 24)
// here the we need the not scaled one
// Here we need the non-scaled one.
notScaledTickerPeriod := getTickerPeriod(
big.NewRat(
car.config.Rate.Int64,
Expand All @@ -326,7 +325,8 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
metricTags := car.getMetricTags(nil)
for li, gi := 0, start; ; li, gi = li+1, gi+offsets[li%len(offsets)] {
t := notScaledTickerPeriod*time.Duration(gi) - time.Since(startTime)
timer.Reset(t)

timer := time.NewTimer(t)
select {
case <-timer.C:
if vusPool.TryRunIteration() {
Expand Down Expand Up @@ -362,6 +362,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
}

case <-regDurationCtx.Done():
timer.Stop()
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/executor/ramping_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
}

regDurationDone := regDurationCtx.Done()
timer := time.NewTimer(time.Hour)
start := time.Now()
ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
var prevTime time.Duration
Expand All @@ -453,10 +452,11 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
prevTime = nextTime
b := time.Until(start.Add(nextTime))
if b > 0 { // TODO: have a minimal ?
timer.Reset(b)
timer := time.NewTimer(b)
select {
case <-timer.C:
case <-regDurationDone:
timer.Stop()
return nil
}
}
Expand Down
Loading