44 changes: 29 additions & 15 deletions discovery/manager.go
@@ -341,31 +341,45 @@ func (m *Manager) updater(ctx context.Context, p *Provider, updates chan []*targ
 }
 
 func (m *Manager) sender() {
-    ticker := time.NewTicker(m.updatert)
-    defer ticker.Stop()
-
-    for {
+    updateWhenTriggered := func() {
        select {
        case <-m.ctx.Done():
            return
-        case <-ticker.C: // Some discoverers send updates too often, so we throttle these with the ticker.
-            select {
-            case <-m.triggerSend:
+        case <-m.triggerSend:
+            for {
                sentUpdates.WithLabelValues(m.name).Inc()
                select {
-                case m.syncCh <- m.allGroups():
-                default:
+                case <-m.ctx.Done():
+                    return
+                case <-m.triggerSend:
+                    // We waited for someone to receive, but we got new update in the meantime, so retry until success.
                    delayedUpdates.WithLabelValues(m.name).Inc()
-                    level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
-                    select {
-                    case m.triggerSend <- struct{}{}:
-                    default:
-                    }
+                    level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full, will retry the next cycle")
+                    break
+                // Attempt to send the update.
+                case m.syncCh <- m.allGroups():
+                    return
                }
-            default:
            }
        }
    }
+
+    // Update as soon as we get trigger.
+    updateWhenTriggered()
+
+    // Rate-limit further triggers.
+    // Some discoverers send updates too often.
+    ticker := time.NewTicker(m.updatert)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-m.ctx.Done():
+            return
+        case <-ticker.C:
+            updateWhenTriggered()
+            ticker.Reset(m.updatert)
+        }
+    }
 }
 
 func (m *Manager) cancelDiscoverers() {
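The refactor splits sender into two phases: handle the first pending trigger immediately, then throttle later triggers with the ticker. A minimal, runnable sketch of that pattern under illustrative names (sendLoop, trigger, out and interval are not the actual Manager fields):

package main

import (
    "context"
    "fmt"
    "time"
)

// sendLoop forwards one value per trigger: the first pending trigger is
// served immediately, later ones are throttled by the ticker.
func sendLoop(ctx context.Context, trigger <-chan struct{}, out chan<- string, interval time.Duration) {
    forward := func() {
        select {
        case <-ctx.Done():
        case <-trigger:
            // The real sender retries here until the receiver picks the update up.
            out <- "update"
        }
    }

    // Send as soon as the first trigger arrives, without waiting a full interval.
    forward()

    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            forward()
            // forward may have blocked for a while; start the next interval from now.
            ticker.Reset(interval)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    trigger := make(chan struct{}, 1)
    out := make(chan string)
    go sendLoop(ctx, trigger, out, 200*time.Millisecond)

    trigger <- struct{}{}
    fmt.Println(<-out) // Printed immediately, not after the first tick.
}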
25 changes: 25 additions & 0 deletions discovery/manager_test.go
@@ -775,6 +775,31 @@ func pk(provider, setName string, n int) poolKey {
    }
 }
 
+func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    discoveryManager := NewManager(ctx, log.NewNopLogger())
+
+    // Set the updatert to a super long time, so we can verify that the skip worked correctly.
+    discoveryManager.updatert = 100 * time.Hour
+    go discoveryManager.Run()
+
+    c := map[string]Configs{
+        "prometheus": {
+            staticConfig("foo:9090"),
+        },
+    }
+    discoveryManager.ApplyConfig(c)
+
+    syncedTargets := <-discoveryManager.SyncCh()
+    require.Equal(t, 1, len(syncedTargets))
+    verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
+    require.Equal(t, 1, len(syncedTargets["prometheus"]))
+    p := pk("static", "prometheus", 0)
+    verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
+    require.Equal(t, 1, len(discoveryManager.targets))
+}
+
 func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
35 changes: 26 additions & 9 deletions scrape/manager.go
@@ -137,16 +137,21 @@ type Options struct {
    EnableProtobufNegotiation bool
    // Option to increase the interval used by scrape manager to throttle target groups updates.
    DiscoveryReloadInterval model.Duration
+   // Option to enable discovering targets immediately on start up as opposed
+   // to waiting for the interval defined in DiscoveryReloadInterval. Disabled by default.
+   //
+   // Useful for serverless flavors of OpenTelemetry contrib's prometheusreceiver.
+   DiscoveryReloadOnStartup bool
 
    // Optional HTTP client options to use when scraping.
    HTTPClientOptions []config_util.HTTPClientOption
 
    // InitialScrapeOffset controls how long after startup we should scrape all
-   // targets. By default, all targets have an offset so we spread the
+   // targets. By default, all targets have an offset, so we spread the
    // scraping load evenly within the Prometheus server. Configuring this will
-   // make it so all targets have the same configured offset, which may be
-   // undesirable as load is no longer evenly spread. This is useful however
-   // in serverless deployments where we're sensitive to the intitial offsets
+   // cause all targets to have the same static offset. This may be undesirable
+   // as load is no longer evenly spread, but is useful in serverless, sidecar
+   // based deployments where we're sensitive to the initial offsets
    // and would like them to be small and configurable.
    //
    // NOTE(bwplotka): This option is experimental and not used by Prometheus.
Expand Down Expand Up @@ -198,8 +203,17 @@ func (m *Manager) reloader() {
reloadIntervalDuration = model.Duration(5 * time.Second)
}

ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
// Skip the initial reload interval wait for the first reload.
if m.opts.DiscoveryReloadOnStartup {
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}

ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
defer ticker.Stop()

for {
Expand Down Expand Up @@ -243,8 +257,9 @@ func (m *Manager) reload() {
}(m.scrapePools[setName], groups)

}
m.mtxScrape.Unlock()
wg.Wait()
// Only unlock after all syncs finished.
m.mtxScrape.Unlock()
}

// setOffsetSeed calculates a global offsetSeed per server relying on extra label set.
Expand Down Expand Up @@ -272,9 +287,9 @@ func (m *Manager) Stop() {
close(m.graceShut)
}

// StopAfterScrapeAttempt stops manager after ensuring all targets' last scrape
// attempt happened after minScrapeTime. It cancels all running scrape pools and
// blocks until all have exited.
// StopAfterScrapeAttempt is like Stop, but it stops manager only after ensuring
// all targets' last scrape attempt happened after minScrapeTime. It cancels
// all running scrape pools and blocks until all have exited. Not a cancellable operation.
//
// It is likely that such shutdown scrape will cause irregular scrape interval and
// sudden efficiency (memory, CPU) spikes. However, it can be a fair tradeoff for
@@ -289,6 +304,8 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) {
 
    var wg sync.WaitGroup
    for _, p := range m.scrapePools {
+       // NOTE(bwplotka): Locking here is not needed, because mtxScrape.Lock()
+       // ensures that nothing changing scrape pools *should* get through.
        for _, l := range p.loops {
            l := l
            wg.Add(1)
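Taken together, DiscoveryReloadOnStartup and InitialScrapeOffset target short-lived embedders of the scrape Manager. A hedged wiring sketch against this branch's API; the constructor shape is taken from the test below, and InitialScrapeOffset is assumed to be a *time.Duration:

package example

import (
    "os"
    "time"

    "github.com/go-kit/log"
    "github.com/prometheus/common/model"

    "github.com/prometheus/prometheus/scrape"
    "github.com/prometheus/prometheus/storage"
)

// newServerlessScrapeManager wires a Manager for a short-lived process:
// targets are discovered and scraped right after startup instead of being
// spread over the usual discovery and offset intervals.
func newServerlessScrapeManager(app storage.Appendable) *scrape.Manager {
    offset := 1 * time.Second
    return scrape.NewManager(&scrape.Options{
        // Reload discovered targets on the first trigger instead of waiting
        // for the first DiscoveryReloadInterval tick.
        DiscoveryReloadOnStartup: true,
        // Periodic reloads are irrelevant for a process that lives seconds.
        DiscoveryReloadInterval: model.Duration(time.Hour),
        // Same small offset for every target; uneven load is acceptable here.
        InitialScrapeOffset: &offset,
    }, log.NewLogfmtLogger(os.Stderr), app)
}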
49 changes: 24 additions & 25 deletions scrape/manager_test.go
@@ -708,11 +708,14 @@ scrape_configs:
    require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
 }
 
+var (
+   noOffset        = 0 * time.Nanosecond
+   largeOffset     = 99 * time.Hour
+   oneSecondOffset = 1 * time.Second
+   tenSecondOffset = 10 * time.Second
+)
+
 func TestManagerStopAfterScrapeAttempt(t *testing.T) {
-   noOffset := 0 * time.Nanosecond
-   largeOffset := 99 * time.Hour
-   oneSecondOffset := 1 * time.Second
-   tenSecondOffset := 10 * time.Second
    for _, tcase := range []struct {
        name string
        // initialScrapeOffset defines how long to wait before scraping all targets.
Expand Down Expand Up @@ -778,7 +781,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
expectedSamples: 1,
},
{
name: "no scrape on stop, with offset of 10s",
name: "no scrape on stop after 5s, with offset of 10s",
initialScrapeOffset: &tenSecondOffset,
stopDelay: 5 * time.Second,
stopFunc: func(m *Manager) { m.Stop() },
@@ -790,23 +793,12 @@
 
            // Setup scrape manager.
            scrapeManager := NewManager(&Options{
-               InitialScrapeOffset: tcase.initialScrapeOffset,
-
-               // Extremely high value to turn it off. We don't want to wait minimum 5s, so
-               // we reload manually.
-               // TODO(bwplotka): Make scrape manager more testable.
-               DiscoveryReloadInterval: model.Duration(99 * time.Hour),
+               InitialScrapeOffset:      tcase.initialScrapeOffset,
+               DiscoveryReloadOnStartup: true,
            }, log.NewLogfmtLogger(os.Stderr), &collectResultAppendable{app})
 
-           require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
-               GlobalConfig: config.GlobalConfig{
-                   // Extremely high scrape interval, to ensure the only chance to see the
-                   // sample is on start and stopAfterScrapeAttempt.
-                   ScrapeInterval: model.Duration(99 * time.Hour),
-                   ScrapeTimeout:  model.Duration(10 * time.Second),
-               },
-               ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
-           }))
+           ch := make(chan map[string][]*targetgroup.Group, 1)
+           go scrapeManager.Run(ch)
 
            // Start fake HTTP target to scrape returning a single metric.
            server := httptest.NewServer(
@@ -820,9 +812,17 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
            serverURL, err := url.Parse(server.URL)
            require.NoError(t, err)
 
-           // Add fake target directly into tsets + reload. Normally users would use
-           // Manager.Run and wait for minimum 5s refresh interval.
-           scrapeManager.updateTsets(map[string][]*targetgroup.Group{
+           require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
+               GlobalConfig: config.GlobalConfig{
+                   // Extremely high scrape interval, to ensure the only chance to see the
+                   // sample is on start and stopAfterScrapeAttempt.
+                   ScrapeInterval: model.Duration(99 * time.Hour),
+                   ScrapeTimeout:  model.Duration(10 * time.Second),
+               },
+               ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
+           }))
+
+           ch <- map[string][]*targetgroup.Group{
                "test": {
                    {
                        Targets: []model.LabelSet{{
@@ -831,8 +831,7 @@ func TestManagerStopAfterScrapeAttempt(t *testing.T) {
                        }},
                    },
                },
-           })
-           scrapeManager.reload()
+           }
 
            // Wait for the defined stop delay, before stopping.
            time.Sleep(tcase.stopDelay)
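The test drives stopFunc directly; a serverless embedder would presumably pair Run with StopAfterScrapeAttempt roughly as in this sketch (runOnce and its channels are illustrative, only the Manager methods come from this branch):

package example

import (
    "time"

    "github.com/prometheus/prometheus/discovery/targetgroup"
    "github.com/prometheus/prometheus/scrape"
)

// runOnce feeds target groups to the manager until done is closed, then stops
// it only after every target has had a scrape attempt newer than start, so a
// short-lived process still records at least one sample per target.
func runOnce(mgr *scrape.Manager, tsets <-chan map[string][]*targetgroup.Group, done <-chan struct{}) {
    start := time.Now()
    go mgr.Run(tsets)

    <-done
    // May trigger one extra, off-schedule scrape per target and blocks until
    // all scrape pools have exited; it is not cancellable.
    mgr.StopAfterScrapeAttempt(start)
}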
1 change: 1 addition & 0 deletions scrape/scrape.go
@@ -233,6 +233,7 @@ type scrapePool struct {
    cancel context.CancelFunc
 
    // mtx must not be taken after targetMtx.
+   // mtx guards updates to scrape pool (e.g. loops and activeTarget maps).
    mtx sync.Mutex
    config *config.ScrapeConfig
    client *http.Client