diff --git a/cmd/updater/main.go b/cmd/updater/main.go
index d07c72922..07d625628 100644
--- a/cmd/updater/main.go
+++ b/cmd/updater/main.go
@@ -163,10 +163,10 @@ func main() {
 	})
 	log.Info("Configured concurrency")
 
-	groupUpdater := updater.GCS(ctx, client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip)
-
 	mets := updater.CreateMetrics(prometheus.NewFactory())
 
+	groupUpdater := updater.GCS(ctx, client, mets, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip)
+
 	pubsubClient, err := gpubsub.NewClient(ctx, "", option.WithCredentialsFile(opt.creds))
 	if err != nil {
 		logrus.WithError(err).Fatal("Failed to create pubsub client")
diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go
index b3bbae8bc..d29326df2 100644
--- a/pkg/updater/updater.go
+++ b/pkg/updater/updater.go
@@ -49,15 +49,17 @@ const componentName = "updater"
 
 // Metrics holds metrics relevant to the Updater.
 type Metrics struct {
-	UpdateState  metrics.Cyclic
-	DelaySeconds metrics.Duration
+	UpdateState       metrics.Cyclic
+	DelaySeconds      metrics.Duration
+	IncompleteUpdates metrics.Counter // incremented when an update finishes with unread columns remaining
 }
 
 // CreateMetrics creates metrics for this controller
 func CreateMetrics(factory metrics.Factory) *Metrics {
 	return &Metrics{
-		UpdateState:  factory.NewCyclic(componentName),
-		DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"),
+		UpdateState:       factory.NewCyclic(componentName),
+		DelaySeconds:      factory.NewDuration("delay", "Seconds updater is behind schedule", "component"),
+		IncompleteUpdates: factory.NewCounter("incomplete-updates", "number of update attempts that don't complete"),
 	}
 }
 
@@ -88,7 +90,7 @@ func (mets *Metrics) start() *metrics.CycleReporter {
 type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error)
 
 // GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS.
-func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater {
+func GCS(poolCtx context.Context, colClient gcs.Client, mets *Metrics, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater {
 	var readResult *resultReader
 	if poolCtx == nil {
 		// TODO(fejta): remove check soon
@@ -105,7 +107,7 @@ func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeo
 		defer cancel()
 		gcsColReader := gcsColumnReader(colClient, buildTimeout, readResult, enableIgnoreSkip)
 		reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts
-		return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess)
+		return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess, mets)
 	}
 }
 
@@ -567,7 +569,7 @@ func SortStarted(cols []InflatedColumn) {
 const byteCeiling = 2e6 // 2 megabytes
 
 // InflateDropAppend updates groups by downloading the existing grid, dropping old rows and appending new ones.
-func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration) (bool, error) {
+func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration, mets *Metrics) (bool, error) {
 	log := alog.(logrus.Ext1FieldLogger) // Add trace method
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -736,6 +738,9 @@ func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.
 	}
 	if unreadColumns {
 		log = log.WithField("more", true)
+		if mets != nil { // callers (notably the tests in this change) pass nil metrics
+			mets.IncompleteUpdates.Add(1)
+		}
 	}
 	log.WithFields(logrus.Fields{
 		"cols": len(grid.Columns),
diff --git a/pkg/updater/updater_test.go b/pkg/updater/updater_test.go
index 651705d46..36c52bd44 100644
--- a/pkg/updater/updater_test.go
+++ b/pkg/updater/updater_test.go
@@ -95,7 +95,7 @@ func TestGCS(t *testing.T) {
 					}
 				}
 			}()
-			updater := GCS(tc.ctx, nil, 0, 0, 0, false, false)
+			updater := GCS(tc.ctx, nil, nil, 0, 0, 0, false, false)
 			_, err := updater(ctx, logrus.WithField("case", tc.name), nil, tc.group, gcs.Path{})
 			switch {
 			case err != nil:
@@ -425,7 +425,7 @@ func TestUpdate(t *testing.T) {
 			if tc.groupUpdater == nil {
 				poolCtx, poolCancel := context.WithCancel(context.Background())
 				defer poolCancel()
-				tc.groupUpdater = GCS(poolCtx, client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, false)
+				tc.groupUpdater = GCS(poolCtx, client, nil, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, false)
 			}
 			opts := &UpdateOptions{
 				ConfigPath: configPath,
@@ -2059,6 +2059,7 @@ func TestInflateDropAppend(t *testing.T) {
 				!tc.skipWrite,
 				colReader,
 				tc.reprocess,
+				nil, // mets
 			)
 			switch {
 			case err != nil: