From 1f9eec86525391bc89a0ab2d4bc19bf38f4a9fee Mon Sep 17 00:00:00 2001 From: Shashank Date: Thu, 23 Feb 2023 19:45:56 +0530 Subject: [PATCH 1/2] added a more metrics Signed-off-by: Shashank --- pkg/updater/updater.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go index b3bbae8b..459f5b7c 100644 --- a/pkg/updater/updater.go +++ b/pkg/updater/updater.go @@ -51,6 +51,7 @@ const componentName = "updater" type Metrics struct { UpdateState metrics.Cyclic DelaySeconds metrics.Duration + MoreCounter metrics.Counter } // CreateMetrics creates metrics for this controller @@ -58,6 +59,7 @@ func CreateMetrics(factory metrics.Factory) *Metrics { return &Metrics{ UpdateState: factory.NewCyclic(componentName), DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"), + MoreCounter: factory.NewCounter("counter", "number of unread columns"), } } @@ -567,7 +569,7 @@ func SortStarted(cols []InflatedColumn) { const byteCeiling = 2e6 // 2 megabytes // InflateDropAppend updates groups by downloading the existing grid, dropping old rows and appending new ones. -func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration) (bool, error) { +func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path, write bool, readCols ColumnReader, reprocess time.Duration, mets *Metrics) (bool, error) { log := alog.(logrus.Ext1FieldLogger) // Add trace method ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -736,6 +738,7 @@ func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs. } if unreadColumns { log = log.WithField("more", true) + mets.MoreCounter.Add(1) } log.WithFields(logrus.Fields{ "cols": len(grid.Columns), From d335ff6ed4663d9aff63580b8d832ba3d7ee5513 Mon Sep 17 00:00:00 2001 From: Shashank Date: Sun, 26 Feb 2023 23:27:11 +0530 Subject: [PATCH 2/2] few additions Signed-off-by: Shashank --- cmd/updater/main.go | 4 ++-- pkg/updater/updater.go | 18 +++++++++--------- pkg/updater/updater_test.go | 5 +++-- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/cmd/updater/main.go b/cmd/updater/main.go index d07c7292..07d62562 100644 --- a/cmd/updater/main.go +++ b/cmd/updater/main.go @@ -163,10 +163,10 @@ func main() { }) log.Info("Configured concurrency") - groupUpdater := updater.GCS(ctx, client, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip) - mets := updater.CreateMetrics(prometheus.NewFactory()) + groupUpdater := updater.GCS(ctx, client, mets, opt.groupTimeout, opt.buildTimeout, opt.buildConcurrency, opt.confirm, opt.enableIgnoreSkip) + pubsubClient, err := gpubsub.NewClient(ctx, "", option.WithCredentialsFile(opt.creds)) if err != nil { logrus.WithError(err).Fatal("Failed to create pubsub client") diff --git a/pkg/updater/updater.go b/pkg/updater/updater.go index 459f5b7c..d29326df 100644 --- a/pkg/updater/updater.go +++ b/pkg/updater/updater.go @@ -49,17 +49,17 @@ const componentName = "updater" // Metrics holds metrics relevant to the Updater. type Metrics struct { - UpdateState metrics.Cyclic - DelaySeconds metrics.Duration - MoreCounter metrics.Counter + UpdateState metrics.Cyclic + DelaySeconds metrics.Duration + IncompleteUpdates metrics.Counter } // CreateMetrics creates metrics for this controller func CreateMetrics(factory metrics.Factory) *Metrics { return &Metrics{ - UpdateState: factory.NewCyclic(componentName), - DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"), - MoreCounter: factory.NewCounter("counter", "number of unread columns"), + UpdateState: factory.NewCyclic(componentName), + DelaySeconds: factory.NewDuration("delay", "Seconds updater is behind schedule", "component"), + IncompleteUpdates: factory.NewCounter("incomplete-updates", "number of update attempts that don't complete"), } } @@ -90,7 +90,7 @@ func (mets *Metrics) start() *metrics.CycleReporter { type GroupUpdater func(parent context.Context, log logrus.FieldLogger, client gcs.Client, tg *configpb.TestGroup, gridPath gcs.Path) (bool, error) // GCS returns a GCS-based GroupUpdater, which knows how to process result data stored in GCS. -func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater { +func GCS(poolCtx context.Context, colClient gcs.Client, mets *Metrics, groupTimeout, buildTimeout time.Duration, concurrency int, write bool, enableIgnoreSkip bool) GroupUpdater { var readResult *resultReader if poolCtx == nil { // TODO(fejta): remove check soon @@ -107,7 +107,7 @@ func GCS(poolCtx context.Context, colClient gcs.Client, groupTimeout, buildTimeo defer cancel() gcsColReader := gcsColumnReader(colClient, buildTimeout, readResult, enableIgnoreSkip) reprocess := 20 * time.Minute // allow 20m for prow to finish uploading artifacts - return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess) + return InflateDropAppend(ctx, log, client, tg, gridPath, write, gcsColReader, reprocess, mets) } } @@ -738,7 +738,7 @@ func InflateDropAppend(ctx context.Context, alog logrus.FieldLogger, client gcs. } if unreadColumns { log = log.WithField("more", true) - mets.MoreCounter.Add(1) + mets.IncompleteUpdates.Add(1) } log.WithFields(logrus.Fields{ "cols": len(grid.Columns), diff --git a/pkg/updater/updater_test.go b/pkg/updater/updater_test.go index 651705d4..36c52bd4 100644 --- a/pkg/updater/updater_test.go +++ b/pkg/updater/updater_test.go @@ -95,7 +95,7 @@ func TestGCS(t *testing.T) { } } }() - updater := GCS(tc.ctx, nil, 0, 0, 0, false, false) + updater := GCS(tc.ctx, nil, nil, 0, 0, 0, false, false) _, err := updater(ctx, logrus.WithField("case", tc.name), nil, tc.group, gcs.Path{}) switch { case err != nil: @@ -425,7 +425,7 @@ func TestUpdate(t *testing.T) { if tc.groupUpdater == nil { poolCtx, poolCancel := context.WithCancel(context.Background()) defer poolCancel() - tc.groupUpdater = GCS(poolCtx, client, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, !tc.skipConfirm, false) + tc.groupUpdater = GCS(poolCtx, client, nil, *tc.groupTimeout, *tc.buildTimeout, tc.buildConcurrency, false, !tc.skipConfirm) } opts := &UpdateOptions{ ConfigPath: configPath, @@ -2059,6 +2059,7 @@ func TestInflateDropAppend(t *testing.T) { !tc.skipWrite, colReader, tc.reprocess, + nil, //metric ) switch { case err != nil: