From 44e7d091ea600e48ed47341be6161c8212fb75eb Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Fri, 17 Nov 2023 20:02:44 +0000 Subject: [PATCH 1/4] scrape: add option to Manager to skip initial discovery reload This config option is quite useful on serverless environments where we are sensitive to the start up latencies of the scraper. The serverless instance might only last for a few seconds and may not be able to afford the minimum 5s reload interval before the scrape pools are created. Signed-off-by: Ridwan Sharif --- scrape/manager.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/scrape/manager.go b/scrape/manager.go index c2b434308e..86874bbd82 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -137,6 +137,12 @@ type Options struct { EnableProtobufNegotiation bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration + // Option to enable discovering targets immediately on start up as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to start up delays. + DiscoveryReloadOnStartup bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption @@ -198,8 +204,17 @@ func (m *Manager) reloader() { reloadIntervalDuration = model.Duration(5 * time.Second) } - ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) + // Skip the initial reload interval wait for the first reload. + if m.opts.DiscoveryReloadOnStartup { + select { + case <-m.triggerReload: + m.reload() + case <-m.graceShut: + return + } + } + ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) defer ticker.Stop() for { From e206020a3ea6951dfe4f3fd97a4d118b66028328 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Sat, 18 Nov 2023 01:27:28 +0000 Subject: [PATCH 2/4] discovery: add optional skip ability for serverless environments This change adds an option that will allow users to skip the initial wait before sending target sets to the scrape manager. This is particularly useful in environments where the startup latency is required to be low just as in serverless deployments. Signed-off-by: Ridwan Sharif --- discovery/manager.go | 42 +++++++++++++++++++++++++++++++++++++++ discovery/manager_test.go | 25 +++++++++++++++++++++++ scrape/manager_test.go | 38 +++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+) diff --git a/discovery/manager.go b/discovery/manager.go index 8b304a0faf..6d10717712 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -134,6 +134,16 @@ func Name(n string) func(*Manager) { } } +// SkipInitialWait sets the name of the manager. This is used in serverless flavours of OTel's prometheusreceiver +// which is sensitive to startup latencies. +func SkipInitialWait() func(*Manager) { + return func(m *Manager) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.skipStartupWait = true + } +} + // HTTPClientOptions sets the list of HTTP client options to expose to // Discoverers. It is up to Discoverers to choose to use the options provided. func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) { @@ -165,6 +175,11 @@ type Manager struct { // should only be modified in unit tests. updatert time.Duration + // skipStartupWait allows the discovery manager to skip the initial wait before sending updates + // to the channel. This is used in serverless flavours of OTel's prometheusreceiver + // which is sensitive to startup latencies. + skipStartupWait bool + // The triggerSend channel signals to the Manager that new updates have been received from providers. triggerSend chan struct{} @@ -344,6 +359,33 @@ func (m *Manager) sender() { ticker := time.NewTicker(m.updatert) defer ticker.Stop() + // Send the targets downstream as soon as you see them if skipStartupWait is + // set. If discovery receiver's channel is too busy, fall back to the + // regular loop. + if m.skipStartupWait { + select { + case <-m.triggerSend: + sentUpdates.WithLabelValues(m.name).Inc() + select { + case m.syncCh <- m.allGroups(): + case <-ticker.C: + delayedUpdates.WithLabelValues(m.name).Inc() + level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") + select { + case m.triggerSend <- struct{}{}: + default: + } + case <-m.ctx.Done(): + return + } + case <-m.ctx.Done(): + return + } + + // We restart the ticker to ensure that no two updates are less than updatert apart. + ticker.Reset(m.updatert) + } + for { select { case <-m.ctx.Done(): diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 5371608112..3c843a9cee 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -775,6 +775,31 @@ func pk(provider, setName string, n int) poolKey { } } +func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger(), SkipInitialWait()) + + // Set the updatert to a super long time so we can verify that the skip worked correctly. + discoveryManager.updatert = 100 * time.Hour + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) +} + func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/scrape/manager_test.go b/scrape/manager_test.go index e49d5b4833..46c127f337 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -601,6 +601,44 @@ func TestManagerTargetsUpdates(t *testing.T) { } } +func TestManagerSkipInitialWait(t *testing.T) { + opts := Options{DiscoveryReloadOnStartup: true} + m := NewManager(&opts, nil, nil) + + ts := make(chan map[string][]*targetgroup.Group, 1) + go m.Run(ts) + defer m.Stop() + + tgSent := make(map[string][]*targetgroup.Group) + tgSent["test"] = []*targetgroup.Group{ + { + Source: "test_source", + }, + } + + select { + case ts <- tgSent: + case <-time.After(10 * time.Millisecond): + t.Error("Scrape manager's channel remained blocked after the set threshold.") + } + + // Give some time for the reloader to have picked this up. + time.Sleep(2 * time.Second) + + m.mtxScrape.Lock() + tsetActual := m.targetSets + m.mtxScrape.Unlock() + + // Make sure all updates have been received. + require.Equal(t, tgSent, tsetActual) + + select { + case <-m.triggerReload: + t.Error("Reload should've already happened") + default: + } +} + func TestSetOffsetSeed(t *testing.T) { getConfig := func(prometheus string) *config.Config { cfgText := ` From 4cddaf0517799da1f93694bff239629b04363338 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Tue, 21 Nov 2023 20:34:19 +0000 Subject: [PATCH 3/4] scrape: use locks when shutting down the loops --- scrape/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scrape/manager.go b/scrape/manager.go index 86874bbd82..9b22ed039c 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -304,6 +304,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { var wg sync.WaitGroup for _, p := range m.scrapePools { + p.mtx.Lock() for _, l := range p.loops { l := l wg.Add(1) @@ -312,6 +313,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { wg.Done() }() } + p.mtx.Unlock() } wg.Wait() From dbc5b25bfbd86ebd1b60257ce52ea032e2c409c0 Mon Sep 17 00:00:00 2001 From: bwplotka Date: Wed, 22 Nov 2023 12:42:42 +0000 Subject: [PATCH 4/4] Updated discovery and test. * Simplified discovery, I think we can always skip wait and make it more robust. * Adjusted TestManagerStopAfterScrapeAttempt to simulate bit more how external users might use it. * Removed one test, we test it TestManagerStopAfterScrapeAttempt already * Cleaned up comments. * Fixed lock bug that was here already. More details on locking: Checked -race on TestManagerStopAfterScrapeAttempt and indeed without your locks. It produces RACE e.g. 0c00011c868 by goroutine 47: github.com/prometheus/prometheus/scrape.(*Manager).StopAfterScrapeAttempt() /Users/bwplotka/Repos/prometheus/scrape/manager.go:311 +0x238 github.com/prometheus/prometheus/scrape.TestManagerStopAfterScrapeAttempt.func8() /Users/bwplotka/Repos/prometheus/scrape/manager_test.go:781 +0x38 github.com/prometheus/prometheus/scrape.TestManagerStopAfterScrapeAttempt.func10() /Users/bwplotka/Repos/prometheus/scrape/manager_test.go:840 +0xa08 testing.tRunner() /Users/bwplotka/.gvm/gos/go1.20.3/src/testing/testing.go:1576 +0x188 testing.(*T).Run.func1() /Users/bwplotka/.gvm/gos/go1.20.3/src/testing/testing.go:1629 +0x40 Previous write at 0x00c00011c868 by goroutine 53: github.com/prometheus/prometheus/scrape.(*scrapePool).sync() /Users/bwplotka/Repos/prometheus/scrape/scrape.go:585 +0x834 github.com/prometheus/prometheus/scrape.(*scrapePool).Sync() /Users/bwplotka/Repos/prometheus/scrape/scrape.go:522 +0x4ac github.com/prometheus/prometheus/scrape.(*Manager).reload.func1() /Users/bwplotka/Repos/prometheus/scrape/manager.go:255 +0x48 github.com/prometheus/prometheus/scrape.(*Manager).reload.func2() /Users/bwplotka/Repos/prometheus/scrape/manager.go:257 +0x6c Goroutine 47 (running) created at: testing.(*T).Run() /Users/bwplotka/.gvm/gos/go1.20.3/src/testing/testing.go:1629 +0x5e4 github.com/prometheus/prometheus/scrape.TestManagerStopAfterScrapeAttempt() /Users/bwplotka/Repos/prometheus/scrape/manager_test.go:792 +0x130 testing.tRunner() /Users/bwplotka/.gvm/gos/go1.20.3/src/testing/testing.go:1576 +0x188 testing.(*T).Run.func1() /Users/bwplotka/.gvm/gos/go1.20.3/src/testing/testing.go:1629 +0x40 Goroutine 53 (finished) created at: github.com/prometheus/prometheus/scrape.(*Manager).reload() /Users/bwplotka/Repos/prometheus/scrape/manager.go:254 +0x774 github.com/prometheus/prometheus/scrape.(*Manager).reloader() /Users/bwplotka/Repos/prometheus/scrape/manager.go:210 +0x128 github.com/prometheus/prometheus/scrape.(*Manager).Run.func1() /Users/bwplotka/Repos/prometheus/scrape/manager.go:183 +0x34 ================== I would argue it's not possible as everything is guarded by mtxScrape. // reload -> ApplyConfig mtxScrape // stop -> Stop/StopAfterScrapeAttempt/ApplyConfig mtxScrape // sync -> Sync -> manager reload mtxScrape Indeed--there was a bug in previous code, those locks are now not needed. Signed-off-by: bwplotka --- discovery/manager.go | 78 +++++++++++------------------------ discovery/manager_test.go | 4 +- scrape/manager.go | 28 ++++++------- scrape/manager_test.go | 87 +++++++++++---------------------------- scrape/scrape.go | 1 + 5 files changed, 66 insertions(+), 132 deletions(-) diff --git a/discovery/manager.go b/discovery/manager.go index 6d10717712..dd70202684 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -134,16 +134,6 @@ func Name(n string) func(*Manager) { } } -// SkipInitialWait sets the name of the manager. This is used in serverless flavours of OTel's prometheusreceiver -// which is sensitive to startup latencies. -func SkipInitialWait() func(*Manager) { - return func(m *Manager) { - m.mtx.Lock() - defer m.mtx.Unlock() - m.skipStartupWait = true - } -} - // HTTPClientOptions sets the list of HTTP client options to expose to // Discoverers. It is up to Discoverers to choose to use the options provided. func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) { @@ -175,11 +165,6 @@ type Manager struct { // should only be modified in unit tests. updatert time.Duration - // skipStartupWait allows the discovery manager to skip the initial wait before sending updates - // to the channel. This is used in serverless flavours of OTel's prometheusreceiver - // which is sensitive to startup latencies. - skipStartupWait bool - // The triggerSend channel signals to the Manager that new updates have been received from providers. triggerSend chan struct{} @@ -356,56 +341,43 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ } func (m *Manager) sender() { - ticker := time.NewTicker(m.updatert) - defer ticker.Stop() - - // Send the targets downstream as soon as you see them if skipStartupWait is - // set. If discovery receiver's channel is too busy, fall back to the - // regular loop. - if m.skipStartupWait { + updateWhenTriggered := func() { select { + case <-m.ctx.Done(): + return case <-m.triggerSend: - sentUpdates.WithLabelValues(m.name).Inc() - select { - case m.syncCh <- m.allGroups(): - case <-ticker.C: - delayedUpdates.WithLabelValues(m.name).Inc() - level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") + for { + sentUpdates.WithLabelValues(m.name).Inc() select { - case m.triggerSend <- struct{}{}: - default: + case <-m.ctx.Done(): + return + case <-m.triggerSend: + // We waited for someone to receive, but we got new update in the meantime, so retry until success. + delayedUpdates.WithLabelValues(m.name).Inc() + level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full, will retry the next cycle") + break + // Attempt to send the update. + case m.syncCh <- m.allGroups(): + return } - case <-m.ctx.Done(): - return } - case <-m.ctx.Done(): - return } - - // We restart the ticker to ensure that no two updates are less than updatert apart. - ticker.Reset(m.updatert) } + // Update as soon as we get trigger. + updateWhenTriggered() + + // Rate-limit further triggers. + // Some discoverers send updates too often. + ticker := time.NewTicker(m.updatert) + defer ticker.Stop() for { select { case <-m.ctx.Done(): return - case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker. - select { - case <-m.triggerSend: - sentUpdates.WithLabelValues(m.name).Inc() - select { - case m.syncCh <- m.allGroups(): - default: - delayedUpdates.WithLabelValues(m.name).Inc() - level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") - select { - case m.triggerSend <- struct{}{}: - default: - } - } - default: - } + case <-ticker.C: + updateWhenTriggered() + ticker.Reset(m.updatert) } } } diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 3c843a9cee..bf14bf566a 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -778,9 +778,9 @@ func pk(provider, setName string, n int) poolKey { func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discoveryManager := NewManager(ctx, log.NewNopLogger(), SkipInitialWait()) + discoveryManager := NewManager(ctx, log.NewNopLogger()) - // Set the updatert to a super long time so we can verify that the skip worked correctly. + // Set the updatert to a super long time, so we can verify that the skip worked correctly. discoveryManager.updatert = 100 * time.Hour go discoveryManager.Run() diff --git a/scrape/manager.go b/scrape/manager.go index 9b22ed039c..aedf9bdaf7 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -138,21 +138,20 @@ type Options struct { // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration // Option to enable discovering targets immediately on start up as opposed - // to waiting for the interval defined in DiscoveryReloadInterval before - // initializing the scrape pools. Disabled by default. Useful for serverless - // flavors of OpenTelemetry contrib's prometheusreceiver where we're - // sensitive to start up delays. + // to waiting for the interval defined in DiscoveryReloadInterval. Disabled by default. + // + // Useful for serverless flavors of OpenTelemetry contrib's prometheusreceiver. DiscoveryReloadOnStartup bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption // InitialScrapeOffset controls how long after startup we should scrape all - // targets. By default, all targets have an offset so we spread the + // targets. By default, all targets have an offset, so we spread the // scraping load evenly within the Prometheus server. Configuring this will - // make it so all targets have the same configured offset, which may be - // undesirable as load is no longer evenly spread. This is useful however - // in serverless deployments where we're sensitive to the intitial offsets + // cause all targets to have the same static offset. This may be undesirable + // as load is no longer evenly spread, but is useful in serverless, sidecar + // based deployments where we're sensitive to the initial offsets // and would like them to be small and configurable. // // NOTE(bwplotka): This option is experimental and not used by Prometheus. @@ -258,8 +257,9 @@ func (m *Manager) reload() { }(m.scrapePools[setName], groups) } - m.mtxScrape.Unlock() wg.Wait() + // Only unlock after all syncs finished. + m.mtxScrape.Unlock() } // setOffsetSeed calculates a global offsetSeed per server relying on extra label set. @@ -287,9 +287,9 @@ func (m *Manager) Stop() { close(m.graceShut) } -// StopAfterScrapeAttempt stops manager after ensuring all targets' last scrape -// attempt happened after minScrapeTime. It cancels all running scrape pools and -// blocks until all have exited. +// StopAfterScrapeAttempt is like Stop, but it stops manager only after ensuring +// all targets' last scrape attempt happened after minScrapeTime. It cancels +// all running scrape pools and blocks until all have exited. Not a cancellable operation. // // It is likely that such shutdown scrape will cause irregular scrape interval and // sudden efficiency (memory, CPU) spikes. However, it can be a fair tradeoff for @@ -304,7 +304,8 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { var wg sync.WaitGroup for _, p := range m.scrapePools { - p.mtx.Lock() + // NOTE(bwplotka): Locking here is not needed, because mtxScrape.Lock() + // ensures that nothing changing scrape pools *should* get through. for _, l := range p.loops { l := l wg.Add(1) @@ -313,7 +314,6 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { wg.Done() }() } - p.mtx.Unlock() } wg.Wait() diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 46c127f337..c853768b91 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -601,44 +601,6 @@ func TestManagerTargetsUpdates(t *testing.T) { } } -func TestManagerSkipInitialWait(t *testing.T) { - opts := Options{DiscoveryReloadOnStartup: true} - m := NewManager(&opts, nil, nil) - - ts := make(chan map[string][]*targetgroup.Group, 1) - go m.Run(ts) - defer m.Stop() - - tgSent := make(map[string][]*targetgroup.Group) - tgSent["test"] = []*targetgroup.Group{ - { - Source: "test_source", - }, - } - - select { - case ts <- tgSent: - case <-time.After(10 * time.Millisecond): - t.Error("Scrape manager's channel remained blocked after the set threshold.") - } - - // Give some time for the reloader to have picked this up. - time.Sleep(2 * time.Second) - - m.mtxScrape.Lock() - tsetActual := m.targetSets - m.mtxScrape.Unlock() - - // Make sure all updates have been received. - require.Equal(t, tgSent, tsetActual) - - select { - case <-m.triggerReload: - t.Error("Reload should've already happened") - default: - } -} - func TestSetOffsetSeed(t *testing.T) { getConfig := func(prometheus string) *config.Config { cfgText := ` @@ -746,11 +708,14 @@ scrape_configs: require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools()) } +var ( + noOffset = 0 * time.Nanosecond + largeOffset = 99 * time.Hour + oneSecondOffset = 1 * time.Second + tenSecondOffset = 10 * time.Second +) + func TestManagerStopAfterScrapeAttempt(t *testing.T) { - noOffset := 0 * time.Nanosecond - largeOffset := 99 * time.Hour - oneSecondOffset := 1 * time.Second - tenSecondOffset := 10 * time.Second for _, tcase := range []struct { name string // initialScrapeOffset defines how long to wait before scraping all targets. @@ -816,7 +781,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { expectedSamples: 1, }, { - name: "no scrape on stop, with offset of 10s", + name: "no scrape on stop after 5s, with offset of 10s", initialScrapeOffset: &tenSecondOffset, stopDelay: 5 * time.Second, stopFunc: func(m *Manager) { m.Stop() }, @@ -828,23 +793,12 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { // Setup scrape manager. scrapeManager := NewManager(&Options{ - InitialScrapeOffset: tcase.initialScrapeOffset, - - // Extremely high value to turn it off. We don't want to wait minimum 5s, so - // we reload manually. - // TODO(bwplotka): Make scrape manager more testable. - DiscoveryReloadInterval: model.Duration(99 * time.Hour), + InitialScrapeOffset: tcase.initialScrapeOffset, + DiscoveryReloadOnStartup: true, }, log.NewLogfmtLogger(os.Stderr), &collectResultAppendable{app}) - require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ - GlobalConfig: config.GlobalConfig{ - // Extremely high scrape interval, to ensure the only chance to see the - // sample is on start and stopAfterScrapeAttempt. - ScrapeInterval: model.Duration(99 * time.Hour), - ScrapeTimeout: model.Duration(10 * time.Second), - }, - ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, - })) + ch := make(chan map[string][]*targetgroup.Group, 1) + go scrapeManager.Run(ch) // Start fake HTTP target to scrape returning a single metric. server := httptest.NewServer( @@ -858,9 +812,17 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { serverURL, err := url.Parse(server.URL) require.NoError(t, err) - // Add fake target directly into tsets + reload. Normally users would use - // Manager.Run and wait for minimum 5s refresh interval. - scrapeManager.updateTsets(map[string][]*targetgroup.Group{ + require.NoError(t, scrapeManager.ApplyConfig(&config.Config{ + GlobalConfig: config.GlobalConfig{ + // Extremely high scrape interval, to ensure the only chance to see the + // sample is on start and stopAfterScrapeAttempt. + ScrapeInterval: model.Duration(99 * time.Hour), + ScrapeTimeout: model.Duration(10 * time.Second), + }, + ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}}, + })) + + ch <- map[string][]*targetgroup.Group{ "test": { { Targets: []model.LabelSet{{ @@ -869,8 +831,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) { }}, }, }, - }) - scrapeManager.reload() + } // Wait for the defined stop delay, before stopping. time.Sleep(tcase.stopDelay) diff --git a/scrape/scrape.go b/scrape/scrape.go index e97bec4851..14984e645b 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -233,6 +233,7 @@ type scrapePool struct { cancel context.CancelFunc // mtx must not be taken after targetMtx. + // mtx guards updates to scrape pool (e.g. loops and activeTarget maps). mtx sync.Mutex config *config.ScrapeConfig client *http.Client