diff --git a/discovery/manager.go b/discovery/manager.go
index 8b304a0faf..dd70202684 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -341,31 +341,45 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
 }
 
 func (m *Manager) sender() {
-	ticker := time.NewTicker(m.updatert)
-	defer ticker.Stop()
-
-	for {
+	updateWhenTriggered := func() {
 		select {
 		case <-m.ctx.Done():
 			return
-		case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
-			select {
-			case <-m.triggerSend:
+		case <-m.triggerSend:
+			for {
 				sentUpdates.WithLabelValues(m.name).Inc()
 				select {
-				case m.syncCh <- m.allGroups():
-				default:
+				case <-m.ctx.Done():
+					return
+				case <-m.triggerSend:
+					// We waited for someone to receive, but a new update arrived in the meantime, so retry until the send succeeds.
 					delayedUpdates.WithLabelValues(m.name).Inc()
-					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
-					select {
-					case m.triggerSend <- struct{}{}:
-					default:
-					}
+					level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full, retrying the send")
+					break
+				// Attempt to send the update.
+				case m.syncCh <- m.allGroups():
+					return
 				}
-			default:
 			}
 		}
 	}
+
+	// Update as soon as we get a trigger.
+	updateWhenTriggered()
+
+	// Rate-limit further triggers.
+	// Some discoverers send updates too often.
+	ticker := time.NewTicker(m.updatert)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-m.ctx.Done():
+			return
+		case <-ticker.C:
+			updateWhenTriggered()
+			ticker.Reset(m.updatert)
+		}
+	}
 }
 
 func (m *Manager) cancelDiscoverers() {
diff --git a/discovery/manager_test.go b/discovery/manager_test.go
index 5371608112..bf14bf566a 100644
--- a/discovery/manager_test.go
+++ b/discovery/manager_test.go
@@ -775,6 +775,31 @@ func pk(provider, setName string, n int) poolKey {
 	}
 }
 
+func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	discoveryManager := NewManager(ctx, log.NewNopLogger())
+
+	// Set updatert to a very long interval, so we can verify that the initial update does not wait for the ticker.
+	discoveryManager.updatert = 100 * time.Hour
+	go discoveryManager.Run()
+
+	c := map[string]Configs{
+		"prometheus": {
+			staticConfig("foo:9090"),
+		},
+	}
+	discoveryManager.ApplyConfig(c)
+
+	syncedTargets := <-discoveryManager.SyncCh()
+	require.Equal(t, 1, len(syncedTargets))
+	verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
+	require.Equal(t, 1, len(syncedTargets["prometheus"]))
+	p := pk("static", "prometheus", 0)
+	verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
+	require.Equal(t, 1, len(discoveryManager.targets))
+}
+
 func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/scrape/manager.go b/scrape/manager.go
index c2b434308e..aedf9bdaf7 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -137,16 +137,21 @@ type Options struct {
 	EnableProtobufNegotiation bool
 	// Option to increase the interval used by scrape manager to throttle target groups updates.
 	DiscoveryReloadInterval model.Duration
+	// Option to enable discovering targets immediately on startup as opposed
+	// to waiting for the interval defined in DiscoveryReloadInterval. Disabled by default.
+	//
+	// Useful for serverless flavors of OpenTelemetry contrib's prometheusreceiver.
+	DiscoveryReloadOnStartup bool
 
 	// Optional HTTP client options to use when scraping.
 	HTTPClientOptions []config_util.HTTPClientOption
 
 	// InitialScrapeOffset controls how long after startup we should scrape all
-	// targets. By default, all targets have an offset so we spread the
+	// targets. By default, all targets have an offset, so we spread the
 	// scraping load evenly within the Prometheus server. Configuring this will
-	// make it so all targets have the same configured offset, which may be
-	// undesirable as load is no longer evenly spread. This is useful however
-	// in serverless deployments where we're sensitive to the intitial offsets
+	// cause all targets to have the same static offset. This may be undesirable
+	// as load is no longer evenly spread, but is useful in serverless,
+	// sidecar-based deployments where we're sensitive to the initial offsets
 	// and would like them to be small and configurable.
 	//
 	// NOTE(bwplotka): This option is experimental and not used by Prometheus.
@@ -198,8 +203,17 @@ func (m *Manager) reloader() {
 		reloadIntervalDuration = model.Duration(5 * time.Second)
 	}
 
-	ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
+	// Skip the reload-interval wait for the first reload.
+	if m.opts.DiscoveryReloadOnStartup {
+		select {
+		case <-m.triggerReload:
+			m.reload()
+		case <-m.graceShut:
+			return
+		}
+	}
 
+	ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
 	defer ticker.Stop()
 
 	for {
@@ -243,8 +257,9 @@ func (m *Manager) reload() {
 		}(m.scrapePools[setName], groups)
 	}
 
-	m.mtxScrape.Unlock()
 	wg.Wait()
+	// Only unlock after all syncs have finished.
+	m.mtxScrape.Unlock()
 }
 
 // setOffsetSeed calculates a global offsetSeed per server relying on extra label set.
@@ -272,9 +287,9 @@ func (m *Manager) Stop() {
 	close(m.graceShut)
 }
 
-// StopAfterScrapeAttempt stops manager after ensuring all targets' last scrape
-// attempt happened after minScrapeTime. It cancels all running scrape pools and
-// blocks until all have exited.
+// StopAfterScrapeAttempt is like Stop, but it stops the manager only after ensuring
+// that all targets' last scrape attempt happened after minScrapeTime. It cancels all
+// running scrape pools and blocks until all have exited. This operation is not cancellable.
 //
 // It is likely that such shutdown scrape will cause irregular scrape interval and
 // sudden efficiency (memory, CPU) spikes. However, it can be a fair tradeoff for
@@ -289,6 +304,8 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) {
 
 	var wg sync.WaitGroup
 	for _, p := range m.scrapePools {
+		// NOTE(bwplotka): Locking here is not needed, because mtxScrape.Lock()
+		// ensures that nothing changing the scrape pools *should* get through.
 		for _, l := range p.loops {
 			l := l
 			wg.Add(1)
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index e49d5b4833..c853768b91 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -708,11 +708,14 @@ scrape_configs:
 	require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
 }
 
+var (
+	noOffset        = 0 * time.Nanosecond
+	largeOffset     = 99 * time.Hour
+	oneSecondOffset = 1 * time.Second
+	tenSecondOffset = 10 * time.Second
+)
+
 func TestManagerStopAfterScrapeAttempt(t *testing.T) {
-	noOffset := 0 * time.Nanosecond
-	largeOffset := 99 * time.Hour
-	oneSecondOffset := 1 * time.Second
-	tenSecondOffset := 10 * time.Second
 	for _, tcase := range []struct {
 		name string
 		// initialScrapeOffset defines how long to wait before scraping all targets.
@@ -778,7 +781,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
 			expectedSamples:     1,
 		},
 		{
-			name:                "no scrape on stop, with offset of 10s",
+			name:                "no scrape on stop after 5s, with offset of 10s",
 			initialScrapeOffset: &tenSecondOffset,
 			stopDelay:           5 * time.Second,
 			stopFunc:            func(m *Manager) { m.Stop() },
@@ -790,23 +793,12 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
 
 			// Setup scrape manager.
 			scrapeManager := NewManager(&Options{
-				InitialScrapeOffset: tcase.initialScrapeOffset,
-
-				// Extremely high value to turn it off. We don't want to wait minimum 5s, so
-				// we reload manually.
-				// TODO(bwplotka): Make scrape manager more testable.
-				DiscoveryReloadInterval: model.Duration(99 * time.Hour),
+				InitialScrapeOffset:      tcase.initialScrapeOffset,
+				DiscoveryReloadOnStartup: true,
 			}, log.NewLogfmtLogger(os.Stderr), &collectResultAppendable{app})
 
-			require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
-				GlobalConfig: config.GlobalConfig{
-					// Extremely high scrape interval, to ensure the only chance to see the
-					// sample is on start and stopAfterScrapeAttempt.
-					ScrapeInterval: model.Duration(99 * time.Hour),
-					ScrapeTimeout:  model.Duration(10 * time.Second),
-				},
-				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
-			}))
+			ch := make(chan map[string][]*targetgroup.Group, 1)
+			go scrapeManager.Run(ch)
 
 			// Start fake HTTP target to scrape returning a single metric.
 			server := httptest.NewServer(
@@ -820,9 +812,17 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
 			serverURL, err := url.Parse(server.URL)
 			require.NoError(t, err)
 
-			// Add fake target directly into tsets + reload. Normally users would use
-			// Manager.Run and wait for minimum 5s refresh interval.
-			scrapeManager.updateTsets(map[string][]*targetgroup.Group{
+			require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
+				GlobalConfig: config.GlobalConfig{
+					// Extremely high scrape interval, to ensure the only chance to see the
+					// sample is on start and stopAfterScrapeAttempt.
+					ScrapeInterval: model.Duration(99 * time.Hour),
+					ScrapeTimeout:  model.Duration(10 * time.Second),
+				},
+				ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+			}))
+
+			ch <- map[string][]*targetgroup.Group{
 				"test": {
 					{
 						Targets: []model.LabelSet{{
@@ -831,8 +831,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
 						}},
 					},
 				},
-			})
-			scrapeManager.reload()
+			}
 
 			// Wait for the defined stop delay, before stopping.
 			time.Sleep(tcase.stopDelay)
diff --git a/scrape/scrape.go b/scrape/scrape.go
index e97bec4851..14984e645b 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -233,6 +233,7 @@ type scrapePool struct {
 	cancel context.CancelFunc
 
 	// mtx must not be taken after targetMtx.
+	// mtx guards updates to the scrape pool (e.g. the loops and activeTargets maps).
 	mtx    sync.Mutex
 	config *config.ScrapeConfig
 	client *http.Client
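For reviewers, a minimal, hypothetical sketch of how an embedding collector (e.g. a serverless flavor of OpenTelemetry contrib's prometheusreceiver, as mentioned in the Options comment) might wire the pieces introduced in this diff together. Only DiscoveryReloadOnStartup, InitialScrapeOffset and StopAfterScrapeAttempt come from this patch; the run helper, the config-to-discovery mapping and the overall invocation flow are illustrative assumptions, and the constructor signatures follow the test code above rather than any particular upstream release.

```go
// Package collector is a hypothetical embedder of the scrape and discovery managers.
package collector

import (
	"context"
	"os"
	"time"

	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
)

// run is an assumed entry point; cfg and app would be provided by the embedding code.
func run(cfg *config.Config, app storage.Appendable) error {
	logger := log.NewLogfmtLogger(os.Stderr)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The discovery manager publishes target group updates on SyncCh().
	discMgr := discovery.NewManager(ctx, logger)
	go discMgr.Run()

	// Give every target the same small, static offset instead of the default spread.
	offset := 1 * time.Second
	scrapeMgr := scrape.NewManager(&scrape.Options{
		// Reload target groups immediately instead of waiting for DiscoveryReloadInterval.
		DiscoveryReloadOnStartup: true,
		InitialScrapeOffset:      &offset,
	}, logger, app)

	if err := scrapeMgr.ApplyConfig(cfg); err != nil {
		return err
	}
	discCfgs := make(map[string]discovery.Configs)
	for _, sc := range cfg.ScrapeConfigs {
		discCfgs[sc.JobName] = sc.ServiceDiscoveryConfigs
	}
	if err := discMgr.ApplyConfig(discCfgs); err != nil {
		return err
	}

	start := time.Now()
	go func() {
		_ = scrapeMgr.Run(discMgr.SyncCh())
	}()

	// ... do the invocation's work, then make sure every target saw at least one
	// scrape attempt after startup before shutting down.
	scrapeMgr.StopAfterScrapeAttempt(start)
	return nil
}
```

The combination is what makes short-lived deployments workable: discovery and the first reload happen as soon as a trigger arrives rather than after the reload interval, the initial scrape offset is small and identical for all targets, and StopAfterScrapeAttempt(start) ensures one scrape attempt per target before the process exits.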