Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ func main() {
})
log.Info("Configured concurrency")

groupUpdater := updater.GCS(ctx, client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip)

mets := updater.CreateMetrics(prometheus.NewFactory())

groupUpdater := updater.GCS(ctx, client, mets, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip)

pubsubClient, err := gpubsub.NewClient(ctx, "", option.WithCredentialsFile(opt.creds))
if err != nil {
logrus.WithError(err).Fatal("Failed to create pubsub client")
Expand Down
17 changes: 10 additions & 7 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ const componentName = "updater"

// Metrics holds metrics relevant to the Updater.
type Metrics struct {
UpdateState metrics.Cyclic
DelaySeconds metrics.Duration
UpdateState metrics.Cyclic
DelaySeconds metrics.Duration
IncompleteUpdates metrics.Counter
}

// CreateMetrics creates metrics for this controller
func CreateMetrics(factory metrics.Factory) *Metrics {
return &Metrics{
UpdateState: factory.NewCyclic(componentName),
DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"),
UpdateState: factory.NewCyclic(componentName),
DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"),
IncompleteUpdates: factory.NewCounter("incomplete-updates", "number of update attempts that don't complete"),
}
}

Expand Down Expand Up @@ -88,7 +90,7 @@ func (mets *Metrics) start() *metrics.CycleReporter {
type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error)

// GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS.
func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater {
func GCS(poolCtx context.Context, colClient gcs.Client, mets *Metrics, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater {
var readResult *resultReader
if poolCtx == nil {
// TODO(fejta): remove check soon
Expand All @@ -105,7 +107,7 @@ func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeo
defer cancel()
gcsColReader := gcsColumnReader(colClient, buildTimeout, readResult, enableIgnoreSkip)
reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts
return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess)
return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess, mets)
}
}

Expand Down Expand Up @@ -567,7 +569,7 @@ func SortStarted(cols []InflatedColumn) {
const byteCeiling = 2e6 // 2 megabytes

// InflateDropAppend updates groups by downloading the existing grid, dropping old rows and appending new ones.
func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration) (bool, error) {
func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration, mets *Metrics) (bool, error) {
log := alog.(logrus.Ext1FieldLogger) // Add trace method
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -736,6 +738,7 @@ func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.
}
if unreadColumns {
log = log.WithField("more", true)
mets.IncompleteUpdates.Add(1)
}
log.WithFields(logrus.Fields{
"cols": len(grid.Columns),
Expand Down
5 changes: 3 additions & 2 deletions pkg/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestGCS(t *testing.T) {
}
}
}()
updater := GCS(tc.ctx, nil, 0, 0, 0, false, false)
updater := GCS(tc.ctx, nil, nil, 0, 0, 0, false, false)
_, err := updater(ctx, logrus.WithField("case", tc.name), nil, tc.group, gcs.Path{})
switch {
case err != nil:
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestUpdate(t *testing.T) {
if tc.groupUpdater == nil {
poolCtx, poolCancel := context.WithCancel(context.Background())
defer poolCancel()
tc.groupUpdater = GCS(poolCtx, client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, false)
tc.groupUpdater = GCS(poolCtx, client, nil, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, false, !tc.skipConfirm)
}
opts := &UpdateOptions{
ConfigPath: configPath,
Expand Down Expand Up @@ -2059,6 +2059,7 @@ func TestInflateDropAppend(t *testing.T) {
!tc.skipWrite,
colReader,
tc.reprocess,
nil, //metric
)
switch {
case err != nil:
Expand Down