From da925f154a189718925133bcac5e6b27d0e00d18 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Wed, 18 Mar 2026 09:54:41 +0000 Subject: [PATCH 1/6] Broadside: support partitioning by submitted time feature This adds a feature toggle to Broadside which range-partitions the Postgres `job` table by the `submitted` timestamp with daily partitions. This is mutually exclusive with the existing `hotColdSplit` toggle. When enabled: - The job table PK becomes (job_id, submitted) - Child tables (job_run, job_spec, job_error) gain a submitted column - Ingestion bypasses `LookoutDb` (used in production), and instead uses direct SQL with submitted in WHERE clauses for partition pruning (this removes the risk of breaking anything running in production, at the cost of duplicating functionality - I think this is a good trade-off) - Historical bulk inserts split per age bucket so each INSERT targets a single partition This also fixes a bug where the generate_series SQL function hardcoded submitted to `NOW() - 24 hours`, ignoring the configured `jobAgeDays`. It now uses a LATERAL subquery to vary the base time per age bucket. 
Signed-off-by: Maurice Yap --- .../configs/examples/test-postgres.yaml | 6 + internal/broadside/actions/actor.go | 16 +- .../broadside/configuration/configuration.go | 3 +- .../broadside/configuration/validation.go | 3 + .../configuration/validation_test.go | 9 + internal/broadside/db/db.go | 101 ++-- internal/broadside/db/doc.go | 14 +- internal/broadside/db/historical.go | 30 +- internal/broadside/db/partition.go | 98 ++++ internal/broadside/db/postgres.go | 160 +++++- .../broadside/db/postgres_historical_test.go | 12 +- internal/broadside/db/postgres_partitioned.go | 479 ++++++++++++++++++ internal/broadside/db/sql/partition_down.sql | 73 +++ internal/broadside/db/sql/partition_up.sql | 115 +++++ internal/broadside/ingester/ingester.go | 83 +-- internal/broadside/jobspec/state.go | 29 +- internal/broadside/orchestrator/runner.go | 13 + 17 files changed, 1112 insertions(+), 132 deletions(-) create mode 100644 internal/broadside/db/partition.go create mode 100644 internal/broadside/db/postgres_partitioned.go create mode 100644 internal/broadside/db/sql/partition_down.sql create mode 100644 internal/broadside/db/sql/partition_up.sql diff --git a/cmd/broadside/configs/examples/test-postgres.yaml b/cmd/broadside/configs/examples/test-postgres.yaml index e78a11b832d..c6839eca086 100644 --- a/cmd/broadside/configs/examples/test-postgres.yaml +++ b/cmd/broadside/configs/examples/test-postgres.yaml @@ -1,6 +1,12 @@ testDuration: 60s warmupDuration: 10s +skipTearDown: false + +featureToggles: + partitionBySubmitted: false + hotColdSplit: false + databaseConfig: postgres: host: localhost diff --git a/internal/broadside/actions/actor.go b/internal/broadside/actions/actor.go index 047648264be..917cb444067 100644 --- a/internal/broadside/actions/actor.go +++ b/internal/broadside/actions/actor.go @@ -170,8 +170,9 @@ func (a *Actor) executeReprioritisation(ctx context.Context, queue string, jobSe queries := make([]db.IngestionQuery, 0, len(jobs)) for _, job := range jobs { 
queries = append(queries, db.UpdateJobPriority{ - JobID: job.JobID, - Priority: jobspec.PriorityValues, + JobID: job.JobID, + Priority: jobspec.PriorityValues, + Submitted: job.Submitted, }) } @@ -230,6 +231,7 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st Time: now, CancelReason: "Bulk cancellation via Broadside test", CancelUser: "broadside-test", + Submitted: job.Submitted, }) } @@ -257,8 +259,9 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st // jobInfo holds minimal information about a job needed for actions. type jobInfo struct { - JobID string - State string + JobID string + State string + Submitted time.Time } // getActiveJobs queries the database for active jobs in the specified queue and job set. @@ -302,8 +305,9 @@ func (a *Actor) getActiveJobs(ctx context.Context, queue string, jobSet string) result := make([]jobInfo, 0, len(jobs)) for _, job := range jobs { result = append(result, jobInfo{ - JobID: job.JobId, - State: job.State, + JobID: job.JobId, + State: job.State, + Submitted: job.Submitted, }) } diff --git a/internal/broadside/configuration/configuration.go b/internal/broadside/configuration/configuration.go index 9f4ac5f3a3b..4361cc7f8e7 100644 --- a/internal/broadside/configuration/configuration.go +++ b/internal/broadside/configuration/configuration.go @@ -15,7 +15,8 @@ type TestConfig struct { } type FeatureToggles struct { - HotColdSplit bool `yaml:"hotColdSplit,omitempty"` + HotColdSplit bool `yaml:"hotColdSplit,omitempty"` + PartitionBySubmitted bool `yaml:"partitionBySubmitted,omitempty"` } type DatabaseConfig struct { diff --git a/internal/broadside/configuration/validation.go b/internal/broadside/configuration/validation.go index fdecdb29485..0dcbfe7d7ec 100644 --- a/internal/broadside/configuration/validation.go +++ b/internal/broadside/configuration/validation.go @@ -11,6 +11,9 @@ const proportionTolerance = 0.001 // Tolerance for floating point comparison // 
Validate validates the Test configuration. func (t *TestConfig) Validate() error { + if t.FeatureToggles.HotColdSplit && t.FeatureToggles.PartitionBySubmitted { + return fmt.Errorf("feature toggles are mutually exclusive: only one of hotColdSplit, partitionBySubmitted may be enabled") + } if t.TestDuration <= 0 { return fmt.Errorf("testDuration must be positive, got %v", t.TestDuration) } diff --git a/internal/broadside/configuration/validation_test.go b/internal/broadside/configuration/validation_test.go index e3086cfe143..05cefd84671 100644 --- a/internal/broadside/configuration/validation_test.go +++ b/internal/broadside/configuration/validation_test.go @@ -55,6 +55,15 @@ func TestTest_Validate(t *testing.T) { modify: func(t *TestConfig) {}, wantErr: false, }, + { + name: "mutually exclusive feature toggles", + modify: func(t *TestConfig) { + t.FeatureToggles.HotColdSplit = true + t.FeatureToggles.PartitionBySubmitted = true + }, + wantErr: true, + errText: "feature toggles are mutually exclusive", + }, { name: "zero test duration", modify: func(t *TestConfig) { diff --git a/internal/broadside/db/db.go b/internal/broadside/db/db.go index 011d5b3520c..deac88288b1 100644 --- a/internal/broadside/db/db.go +++ b/internal/broadside/db/db.go @@ -128,8 +128,9 @@ type NewJob struct { } type UpdateJobPriority struct { - JobID string - Priority int64 + JobID string + Priority int64 + Submitted time.Time } func (UpdateJobPriority) isIngestionQuery() {} @@ -139,41 +140,47 @@ type SetJobCancelled struct { Time time.Time CancelReason string CancelUser string + Submitted time.Time } func (SetJobCancelled) isIngestionQuery() {} type SetJobSucceeded struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobSucceeded) isIngestionQuery() {} type InsertJobError struct { - JobID string - Error []byte + JobID string + Error []byte + Submitted time.Time } func (InsertJobError) isIngestionQuery() {} type SetJobPreempted struct { - 
JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobPreempted) isIngestionQuery() {} type SetJobRejected struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobRejected) isIngestionQuery() {} type SetJobErrored struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobErrored) isIngestionQuery() {} @@ -182,81 +189,91 @@ type SetJobRunning struct { JobID string Time time.Time LatestRunID string + Submitted time.Time } func (SetJobRunning) isIngestionQuery() {} type SetJobRunStarted struct { - JobRunID string - Time time.Time - Node string + JobRunID string + Time time.Time + Node string + Submitted time.Time } func (SetJobRunStarted) isIngestionQuery() {} type SetJobPending struct { - JobID string - Time time.Time - RunID string + JobID string + Time time.Time + RunID string + Submitted time.Time } func (SetJobPending) isIngestionQuery() {} type SetJobRunPending struct { - JobRunID string - Time time.Time + JobRunID string + Time time.Time + Submitted time.Time } func (SetJobRunPending) isIngestionQuery() {} type SetJobRunCancelled struct { - JobRunID string - Time time.Time + JobRunID string + Time time.Time + Submitted time.Time } func (SetJobRunCancelled) isIngestionQuery() {} type SetJobRunFailed struct { - JobRunID string - Time time.Time - Error []byte - Debug []byte - ExitCode int32 + JobRunID string + Time time.Time + Error []byte + Debug []byte + ExitCode int32 + Submitted time.Time } func (SetJobRunFailed) isIngestionQuery() {} type SetJobRunSucceeded struct { - JobRunID string - Time time.Time - ExitCode int32 + JobRunID string + Time time.Time + ExitCode int32 + Submitted time.Time } func (SetJobRunSucceeded) isIngestionQuery() {} type SetJobRunPreempted struct { - JobRunID string - Time time.Time - Error []byte + JobRunID string + Time time.Time + Error []byte + Submitted time.Time } func 
(SetJobRunPreempted) isIngestionQuery() {} type SetJobLeased struct { - JobID string - Time time.Time - RunID string + JobID string + Time time.Time + RunID string + Submitted time.Time } func (SetJobLeased) isIngestionQuery() {} type InsertJobRun struct { - JobRunID string - JobID string - Cluster string - Node string - Pool string - Time time.Time + JobRunID string + JobID string + Cluster string + Node string + Pool string + Time time.Time + Submitted time.Time } func (InsertJobRun) isIngestionQuery() {} diff --git a/internal/broadside/db/doc.go b/internal/broadside/db/doc.go index 97b727f831b..7b0be78893d 100644 --- a/internal/broadside/db/doc.go +++ b/internal/broadside/db/doc.go @@ -96,6 +96,18 @@ Three implementations are provided: in a single statement — ensuring no terminal state is ever written to job. Historical job population writes directly to job_historical when the toggle is enabled. + When the PartitionBySubmitted feature toggle is enabled (mutually exclusive + with HotColdSplit), InitialiseSchema converts the job table into a + range-partitioned table on the submitted column (sql/partition_up.sql), + creates daily partitions covering all configured jobAgeDays plus a forward + buffer, and adds a DEFAULT partition. Child tables (job_run, job_spec, + job_error) gain a NOT NULL submitted column. During ingestion, + ExecuteIngestionQueryBatch bypasses LookoutDb entirely, using direct SQL that + includes submitted in INSERT columns and UPDATE WHERE clauses for partition + pruning. Historical job population splits per age bucket per chunk so that + each INSERT targets a single partition. TearDown drops the partitioned table, + recreates the original unpartitioned schema (sql/partition_down.sql), and + removes the submitted column from child tables. 
- ClickHouseDatabase: ClickHouse adapter (placeholder implementation) - MemoryDatabase: In-memory adapter for smoke-testing Broadside @@ -138,7 +150,7 @@ Create a database instance using the appropriate constructor: "host": "localhost", "port": "5432", "database": "lookout", - }, nil, nil) + }, configuration.FeatureToggles{}, nil, nil, nil) // For ClickHouse db := db.NewClickHouseDatabase(map[string]string{ diff --git a/internal/broadside/db/historical.go b/internal/broadside/db/historical.go index cb3244fc8f4..d917af41849 100644 --- a/internal/broadside/db/historical.go +++ b/internal/broadside/db/historical.go @@ -69,35 +69,35 @@ func buildHistoricalJobQueries(jobNum int, params HistoricalJobsParams) []Ingest queries := []IngestionQuery{ InsertJob{Job: newJob, JobSpec: params.JobSpecBytes}, - SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID}, - InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime}, - SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID}, - SetJobRunPending{JobRunID: runID, Time: pendingTime}, - SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID}, - SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node}, + SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID, Submitted: baseTime}, + InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime, Submitted: baseTime}, + SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID, Submitted: baseTime}, + SetJobRunPending{JobRunID: runID, Time: pendingTime, Submitted: baseTime}, + SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID, Submitted: baseTime}, + SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node, Submitted: baseTime}, } switch historicalState(jobNum, params) { case jobspec.StateSucceeded: queries = append(queries, - SetJobSucceeded{JobID: jobID, Time: terminalTime}, - 
SetJobRunSucceeded{JobRunID: runID, Time: terminalTime}, + SetJobSucceeded{JobID: jobID, Time: terminalTime, Submitted: baseTime}, + SetJobRunSucceeded{JobRunID: runID, Time: terminalTime, Submitted: baseTime}, ) case jobspec.StateErrored: queries = append(queries, - SetJobErrored{JobID: jobID, Time: terminalTime}, - SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes}, - InsertJobError{JobID: jobID, Error: params.ErrorBytes}, + SetJobErrored{JobID: jobID, Time: terminalTime, Submitted: baseTime}, + SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes, Submitted: baseTime}, + InsertJobError{JobID: jobID, Error: params.ErrorBytes, Submitted: baseTime}, ) case jobspec.StateCancelled: queries = append(queries, - SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName}, - SetJobRunCancelled{JobRunID: runID, Time: terminalTime}, + SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName, Submitted: baseTime}, + SetJobRunCancelled{JobRunID: runID, Time: terminalTime, Submitted: baseTime}, ) case jobspec.StatePreempted: queries = append(queries, - SetJobPreempted{JobID: jobID, Time: terminalTime}, - SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes}, + SetJobPreempted{JobID: jobID, Time: terminalTime, Submitted: baseTime}, + SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes, Submitted: baseTime}, ) } diff --git a/internal/broadside/db/partition.go b/internal/broadside/db/partition.go new file mode 100644 index 00000000000..17b8f247148 --- /dev/null +++ b/internal/broadside/db/partition.go @@ -0,0 +1,98 @@ +package db + +import ( + "context" + _ "embed" + "fmt" + "time" + + "github.com/armadaproject/armada/internal/common/logging" +) + +//go:embed sql/partition_up.sql +var partitionMigrationSQL 
string + +//go:embed sql/partition_down.sql +var partitionRevertSQL string + +// applyPartitionMigration applies the partition-by-submitted migration, +// then creates daily partitions covering all configured jobAgeDays plus +// a buffer for live ingestion, plus a DEFAULT partition. +func (p *PostgresDatabase) applyPartitionMigration(ctx context.Context, jobAgeDays []int) error { + // Check whether the job table is already partitioned (relkind 'p'). + // If so, skip the one-time conversion and just ensure partitions exist. + var relkind string + if err := p.pool.QueryRow(ctx, + "SELECT relkind::text FROM pg_class WHERE relname = 'job'").Scan(&relkind); err != nil { + return fmt.Errorf("checking job table type: %w", err) + } + + if relkind != "p" { + if _, err := p.pool.Exec(ctx, partitionMigrationSQL); err != nil { + return fmt.Errorf("applying partition migration: %w", err) + } + + if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil { + return fmt.Errorf("creating daily partitions: %w", err) + } + + // Move any existing rows from the old unpartitioned table into the + // new partitioned one, then drop the old table. This must happen + // after partitions exist so Postgres can route the rows. + if _, err := p.pool.Exec(ctx, "INSERT INTO job SELECT * FROM job_unpartitioned"); err != nil { + return fmt.Errorf("moving rows to partitioned table: %w", err) + } + if _, err := p.pool.Exec(ctx, "DROP TABLE job_unpartitioned"); err != nil { + return fmt.Errorf("dropping old unpartitioned table: %w", err) + } + + logging.Info("Converted job table to range-partitioned") + } else { + // Already partitioned — just ensure any new date partitions exist. 
+ if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil { + return fmt.Errorf("creating daily partitions: %w", err) + } + logging.Info("Job table already partitioned, ensured partitions are up to date") + } + + return nil +} + +// createDailyPartitions creates one partition per day covering all dates +// in jobAgeDays (relative to today) plus a 2-day forward buffer, and a +// DEFAULT partition as a safety net. +func (p *PostgresDatabase) createDailyPartitions(ctx context.Context, jobAgeDays []int) error { + today := time.Now().Truncate(24 * time.Hour) + + dates := make(map[time.Time]struct{}) + for _, days := range jobAgeDays { + date := today.AddDate(0, 0, -days) + dates[date] = struct{}{} + } + // Add today + 2 days buffer for live ingestion + for offset := 0; offset <= 2; offset++ { + dates[today.AddDate(0, 0, offset)] = struct{}{} + } + + for date := range dates { + partitionName := fmt.Sprintf("job_p%s", date.Format("20060102")) + nextDay := date.AddDate(0, 0, 1) + sql := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF job FOR VALUES FROM ('%s') TO ('%s')", + partitionName, + date.Format("2006-01-02"), + nextDay.Format("2006-01-02"), + ) + if _, err := p.pool.Exec(ctx, sql); err != nil { + return fmt.Errorf("creating partition %s: %w", partitionName, err) + } + logging.Infof("Created partition %s", partitionName) + } + + if _, err := p.pool.Exec(ctx, "CREATE TABLE IF NOT EXISTS job_default PARTITION OF job DEFAULT"); err != nil { + return fmt.Errorf("creating default partition: %w", err) + } + logging.Info("Created default partition job_default") + + return nil +} diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index 9571b5703b9..d81a8350ba3 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -35,6 +35,7 @@ type PostgresDatabase struct { features broadsideconfiguration.FeatureToggles tuningSQLStatements []string tuningRevertSQLStatements []string + jobAgeDays []int 
pool *pgxpool.Pool lookoutDb *lookoutdb.LookoutDb jobsRepository *repository.SqlGetJobsRepository @@ -52,12 +53,13 @@ type PostgresDatabase struct { // - password: database password // - dbname: database name (e.g., "broadside_test") // - sslmode: SSL mode (e.g., "disable") -func NewPostgresDatabase(config map[string]string, features broadsideconfiguration.FeatureToggles, tuningSQLStatements []string, tuningRevertSQLStatements []string) *PostgresDatabase { +func NewPostgresDatabase(config map[string]string, features broadsideconfiguration.FeatureToggles, tuningSQLStatements []string, tuningRevertSQLStatements []string, jobAgeDays []int) *PostgresDatabase { return &PostgresDatabase{ config: config, features: features, tuningSQLStatements: tuningSQLStatements, tuningRevertSQLStatements: tuningRevertSQLStatements, + jobAgeDays: jobAgeDays, } } @@ -87,17 +89,30 @@ func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error { return fmt.Errorf("applying migrations: %w", err) } - if err := p.applyTuningSQL(ctx); err != nil { - pool.Close() - return fmt.Errorf("applying tuning SQL: %w", err) - } - if p.features.HotColdSplit { if _, err := p.pool.Exec(ctx, hotColdMigrationSQL); err != nil { pool.Close() return fmt.Errorf("applying hot/cold split migration: %w", err) } logging.Info("Hot/cold split migration applied") + } else if p.features.PartitionBySubmitted { + if err := p.applyPartitionMigration(ctx, p.jobAgeDays); err != nil { + pool.Close() + return fmt.Errorf("applying partition-by-submitted migration: %w", err) + } + logging.Info("Partition-by-submitted migration applied") + } + + if p.features.PartitionBySubmitted { + if err := p.applyTuningSQLToPartitions(ctx); err != nil { + pool.Close() + return fmt.Errorf("applying tuning SQL to partitions: %w", err) + } + } else { + if err := p.applyTuningSQL(ctx); err != nil { + pool.Close() + return fmt.Errorf("applying tuning SQL: %w", err) + } } decompressor := &compress.NoOpDecompressor{} @@ -137,6 +152,79 @@ 
func (p *PostgresDatabase) revertTuningSQL(ctx context.Context) error { return nil } +// applyTuningSQLToPartitions applies tuning SQL to leaf partitions of the +// job table rather than the partitioned parent (which doesn't support storage +// parameters). Statements targeting other tables are applied as-is. +func (p *PostgresDatabase) applyTuningSQLToPartitions(ctx context.Context) error { + return p.execTuningSQLOnPartitions(ctx, p.tuningSQLStatements, "applying") +} + +// revertTuningSQLFromPartitions mirrors applyTuningSQLToPartitions for the +// revert path. +func (p *PostgresDatabase) revertTuningSQLFromPartitions(ctx context.Context) error { + return p.execTuningSQLOnPartitions(ctx, p.tuningRevertSQLStatements, "reverting") +} + +// execTuningSQLOnPartitions executes tuning SQL statements, rewriting any that +// target "ALTER TABLE job " to run against each leaf partition instead of the +// partitioned parent (which doesn't support storage parameters). +func (p *PostgresDatabase) execTuningSQLOnPartitions(ctx context.Context, stmts []string, verb string) error { + partitions, err := p.listJobPartitions(ctx) + if err != nil { + return err + } + + for i, stmt := range stmts { + if targetsJobTable(stmt) { + for _, part := range partitions { + partStmt := strings.Replace(stmt, "job", part, 1) + if _, err := p.pool.Exec(ctx, partStmt); err != nil { + return fmt.Errorf("%s tuning SQL statement %d on partition %s: %w", verb, i+1, part, err) + } + } + logging.Infof("%s tuning SQL statement %d: applied to %d partitions", strings.Title(verb), i+1, len(partitions)) + } else { + if _, err := p.pool.Exec(ctx, stmt); err != nil { + return fmt.Errorf("%s tuning SQL statement %d: %w", verb, i+1, err) + } + logging.Infof("%s tuning SQL statement %d", strings.Title(verb), i+1) + } + } + return nil +} + +// listJobPartitions returns the names of all leaf partitions of the job table. 
+func (p *PostgresDatabase) listJobPartitions(ctx context.Context) ([]string, error) { + rows, err := p.pool.Query(ctx, ` + SELECT c.relname FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + WHERE p.relname = 'job' + ORDER BY c.relname`) + if err != nil { + return nil, fmt.Errorf("querying job partitions: %w", err) + } + defer rows.Close() + + var partitions []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("scanning partition name: %w", err) + } + partitions = append(partitions, name) + } + return partitions, nil +} + +// targetsJobTable returns true if the SQL statement is an ALTER TABLE targeting +// the "job" table specifically (not job_run, job_spec, etc.). +func targetsJobTable(stmt string) bool { + lower := strings.ToLower(stmt) + return strings.Contains(lower, "alter table job ") && + !strings.Contains(lower, "alter table job_") +} + // ExecuteIngestionQueryBatch executes a batch of ingestion queries using the // same sequential phase ordering as the production Lookout ingester: // @@ -155,6 +243,10 @@ func (p *PostgresDatabase) revertTuningSQL(ctx context.Context) error { // wins) before being sent, preventing undefined behaviour when duplicate IDs // appear in the UPDATE … FROM temp-table pattern. func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error { + if p.features.PartitionBySubmitted { + return p.executePartitionedBatch(ctx, queries) + } + set, err := queriesToInstructionSet(queries) if err != nil { return err @@ -262,8 +354,14 @@ func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.F // truncated and the hot/cold migration is reverted so the schema is left // in its original state. 
func (p *PostgresDatabase) TearDown(ctx context.Context) error { - if err := p.revertTuningSQL(ctx); err != nil { - return fmt.Errorf("reverting tuning SQL: %w", err) + if p.features.PartitionBySubmitted { + if err := p.revertTuningSQLFromPartitions(ctx); err != nil { + return fmt.Errorf("reverting tuning SQL from partitions: %w", err) + } + } else { + if err := p.revertTuningSQL(ctx); err != nil { + return fmt.Errorf("reverting tuning SQL: %w", err) + } } tables := []string{ @@ -289,6 +387,11 @@ func (p *PostgresDatabase) TearDown(ctx context.Context) error { return fmt.Errorf("reverting hot/cold split migration: %w", err) } logging.Info("Hot/cold split migration reverted") + } else if p.features.PartitionBySubmitted { + if _, err := p.pool.Exec(ctx, partitionRevertSQL); err != nil { + return fmt.Errorf("reverting partition migration: %w", err) + } + logging.Info("Partition-by-submitted migration reverted") } return nil @@ -420,6 +523,9 @@ func (p *PostgresDatabase) insertHistoricalJobChunkWithRetry(ctx context.Context // insertHistoricalJobChunk inserts a single chunk [startIdx, lastIdx] of // historical jobs in one transaction using server-side generate_series. 
func (p *PostgresDatabase) insertHistoricalJobChunk(ctx context.Context, params HistoricalJobsParams, startIdx, lastIdx int) error { + if p.features.PartitionBySubmitted { + return p.insertHistoricalJobChunkPartitioned(ctx, params, startIdx, lastIdx) + } prefix := fmt.Sprintf("%04d%04d", params.QueueIdx, params.JobSetIdx) succeeded := params.SucceededThreshold errored := params.ErroredThreshold @@ -432,12 +538,18 @@ func (p *PostgresDatabase) insertHistoricalJobChunk(ctx context.Context, params poolArr := stringSliceToSQL(jobspec.PoolOptions) nsArr := stringSliceToSQL(jobspec.NamespaceOptions) pcArr := stringSliceToSQL(jobspec.PriorityClassOptions) + ageArr := intSliceToSQL(params.JobAgeDays) + ageLen := len(params.JobAgeDays) jobTable := "job" if p.features.HotColdSplit { jobTable = "job_historical" } + // The LATERAL subquery computes a per-row base_time from the jobAgeDays + // array, distributing jobs evenly across age buckets (job i → bucket + // i%%len(jobAgeDays)). This replaces the previous hardcoded + // NOW() - INTERVAL '24 hours'. 
jobSQL := fmt.Sprintf(` INSERT INTO %s ( job_id, queue, owner, namespace, jobset, @@ -457,23 +569,24 @@ SELECT (%s)[i%%%d+1], (%s)[i%%%d+1], (i%%2000)+1, - NOW() - INTERVAL '24 hours', + t.base_time, CASE WHEN i%%1000 < %d THEN %d WHEN i%%1000 < %d THEN %d WHEN i%%1000 < %d THEN %d ELSE %d END, - NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds', - EXTRACT(EPOCH FROM NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds')::bigint, + t.base_time + INTERVAL '10 seconds', + EXTRACT(EPOCH FROM t.base_time + INTERVAL '10 seconds')::bigint, (%s)[i%%%d+1], %s, '%s' || lpad(i::text, 10, '0') || '00', CASE WHEN i%%1000 >= %d AND i%%1000 < %d - THEN NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds' END, + THEN t.base_time + INTERVAL '10 seconds' END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN 'user requested' END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN '%s' END -FROM generate_series(%d, %d) AS i`, +FROM generate_series(%d, %d) AS i, +LATERAL (SELECT NOW() - (%s)[i%%%d+1] * INTERVAL '1 day' AS base_time) AS t`, jobTable, prefix, params.QueueName, params.QueueName, @@ -494,6 +607,7 @@ FROM generate_series(%d, %d) AS i`, errored, cancelled, errored, cancelled, params.QueueName, startIdx, lastIdx, + ageArr, ageLen, ) jobSpecSQL := fmt.Sprintf(` @@ -513,10 +627,10 @@ SELECT '%s' || lpad(i::text, 10, '0'), 'broadside-cluster-' || (i%%40+1), 'broadside-cluster-' || (i%%40+1) || '-node-' || (i%%80+1), - NOW() - INTERVAL '24 hours' + INTERVAL '1 second', - NOW() - INTERVAL '24 hours' + INTERVAL '2 seconds', - NOW() - INTERVAL '24 hours' + INTERVAL '3 seconds', - NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds', + t.base_time + INTERVAL '1 second', + t.base_time + INTERVAL '2 seconds', + t.base_time + INTERVAL '3 seconds', + t.base_time + INTERVAL '10 seconds', (%s)[i%%%d+1], CASE WHEN i%%1000 < %d THEN 3 WHEN i%%1000 < %d THEN 4 @@ -527,7 +641,8 @@ SELECT END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $2::bytea END, CASE WHEN i%%1000 < %d THEN 0 END -FROM 
generate_series(%d, %d) AS i`, +FROM generate_series(%d, %d) AS i, +LATERAL (SELECT NOW() - (%s)[i%%%d+1] * INTERVAL '1 day' AS base_time) AS t`, prefix, prefix, poolArr, len(jobspec.PoolOptions), succeeded, errored, cancelled, @@ -536,6 +651,7 @@ FROM generate_series(%d, %d) AS i`, succeeded, errored, errored, startIdx, lastIdx, + ageArr, ageLen, ) jobErrorSQL := fmt.Sprintf(` @@ -574,6 +690,14 @@ WHERE i%%1000 >= %d AND i%%1000 < %d`, return nil } +func intSliceToSQL(vals []int) string { + parts := make([]string, len(vals)) + for i, v := range vals { + parts[i] = fmt.Sprintf("%d", v) + } + return "ARRAY[" + strings.Join(parts, ",") + "]" +} + func int64SliceToSQL(vals []int64) string { parts := make([]string, len(vals)) for i, v := range vals { diff --git a/internal/broadside/db/postgres_historical_test.go b/internal/broadside/db/postgres_historical_test.go index 2c4d1a9c251..e1e77563f36 100644 --- a/internal/broadside/db/postgres_historical_test.go +++ b/internal/broadside/db/postgres_historical_test.go @@ -30,7 +30,7 @@ func postgresConfig(t *testing.T) map[string]string { func TestPostgresDatabase_PopulateHistoricalJobs_JobCount(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -62,7 +62,7 @@ func TestPostgresDatabase_PopulateHistoricalJobs_JobCount(t *testing.T) { func TestPostgresDatabase_PopulateHistoricalJobs_StateDistribution(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -103,7 +103,7 @@ func 
TestPostgresDatabase_PopulateHistoricalJobs_StateDistribution(t *testing.T) func TestPostgresDatabase_PopulateHistoricalJobs_Chunked(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -143,7 +143,7 @@ func TestPostgresDatabase_PopulateHistoricalJobs_Chunked(t *testing.T) { func TestPostgresDatabase_PopulateHistoricalJobs_Resume(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -188,7 +188,7 @@ func TestPostgresDatabase_InitialiseSchema_ExecutesTuningSQLWithoutError(t *test tuningSQLStatements := []string{ "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", } - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, tuningSQLStatements, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, tuningSQLStatements, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -203,7 +203,7 @@ func TestPostgresDatabase_TearDown_ExecutesTuningRevertSQLWithoutError(t *testin revertSQLStatements := []string{ "ALTER TABLE job RESET (autovacuum_vacuum_scale_factor)", } - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, revertSQLStatements) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, revertSQLStatements, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer pg.Close() diff --git a/internal/broadside/db/postgres_partitioned.go 
b/internal/broadside/db/postgres_partitioned.go new file mode 100644 index 00000000000..ab93930efc7 --- /dev/null +++ b/internal/broadside/db/postgres_partitioned.go @@ -0,0 +1,479 @@ +package db + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/armadaproject/armada/internal/broadside/jobspec" + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/database/lookout" + lookoutmodel "github.com/armadaproject/armada/internal/lookoutingester/model" +) + +// executePartitionedBatch executes a batch of ingestion queries using direct +// SQL that includes the submitted column. This bypasses LookoutDb for all +// operations except job_run updates (which don't need submitted in WHERE). +// +// The submitted column is required because: +// - The job table PK is (job_id, submitted), so UPDATEs need it for partition pruning +// - Child tables (job_run, job_spec, job_error) have a NOT NULL submitted column +func (p *PostgresDatabase) executePartitionedBatch(ctx context.Context, queries []IngestionQuery) error { + submittedMap := buildJobSubmittedMap(queries) + set, err := queriesToInstructionSet(queries) + if err != nil { + return err + } + + // Phase 1: Create jobs and job specs (must exist before job_run references). + var wg sync.WaitGroup + var jobErr, specErr error + wg.Go(func() { jobErr = p.createJobsPartitioned(ctx, set.JobsToCreate) }) + wg.Go(func() { specErr = p.createJobSpecsPartitioned(ctx, set.JobsToCreate, submittedMap) }) + wg.Wait() + if jobErr != nil { + return fmt.Errorf("creating jobs (partitioned): %w", jobErr) + } + if specErr != nil { + return fmt.Errorf("creating job specs (partitioned): %w", specErr) + } + + // Phase 2: Update jobs, create job_runs, create job_errors (parallel). 
+ var updateErr, runErr, errErr error + wg.Go(func() { updateErr = p.updateJobsPartitioned(ctx, set.JobsToUpdate, submittedMap) }) + wg.Go(func() { runErr = p.createJobRunsPartitioned(ctx, set.JobRunsToCreate, submittedMap) }) + wg.Go(func() { errErr = p.createJobErrorsPartitioned(ctx, set.JobErrorsToCreate, submittedMap) }) + wg.Wait() + if updateErr != nil { + return fmt.Errorf("updating jobs (partitioned): %w", updateErr) + } + if runErr != nil { + return fmt.Errorf("creating job runs (partitioned): %w", runErr) + } + if errErr != nil { + return fmt.Errorf("creating job errors (partitioned): %w", errErr) + } + + // Phase 3: Update job_runs. The job_run table is not partitioned and the + // UPDATE WHERE clause uses run_id only, so LookoutDb works here. + armadaCtx := armadacontext.FromGrpcCtx(ctx) + p.lookoutDb.UpdateJobRuns(armadaCtx, set.JobRunsToUpdate) + + return nil +} + +// buildJobSubmittedMap extracts the submitted time for each job ID from the +// query batch. Each query type carries a Submitted field; the first seen +// value for each job ID wins (they should all be consistent). 
+func buildJobSubmittedMap(queries []IngestionQuery) map[string]time.Time { + m := make(map[string]time.Time) + for _, q := range queries { + switch v := q.(type) { + case InsertJob: + m[v.Job.JobID] = v.Job.Submitted + case SetJobLeased: + setIfAbsent(m, v.JobID, v.Submitted) + case InsertJobRun: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobPending: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobRunning: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobSucceeded: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobErrored: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobCancelled: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobPreempted: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobRejected: + setIfAbsent(m, v.JobID, v.Submitted) + case UpdateJobPriority: + setIfAbsent(m, v.JobID, v.Submitted) + case InsertJobError: + setIfAbsent(m, v.JobID, v.Submitted) + } + } + return m +} + +func setIfAbsent(m map[string]time.Time, key string, val time.Time) { + if _, ok := m[key]; !ok { + m[key] = val + } +} + +// createJobsPartitioned inserts jobs using CopyFrom directly into the named +// leaf partition (bypassing parent-table routing which can fail with COPY). +// Instructions are grouped by partition based on their submitted date. 
+func (p *PostgresDatabase) createJobsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction) error { + if len(instructions) == 0 { + return nil + } + + groups := make(map[string][]*lookoutmodel.CreateJobInstruction) + for _, instr := range instructions { + part := partitionNameForTime(instr.Submitted) + groups[part] = append(groups[part], instr) + } + + columns := []string{ + "job_id", "queue", "owner", "namespace", "jobset", + "cpu", "memory", "ephemeral_storage", "gpu", "priority", + "submitted", "state", "last_transition_time", "last_transition_time_seconds", + "priority_class", "annotations", "job_spec", + } + + for partName, instrs := range groups { + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{partName}, + columns, + pgx.CopyFromSlice(len(instrs), func(i int) ([]interface{}, error) { + instr := instrs[i] + return []interface{}{ + instr.JobId, instr.Queue, instr.Owner, instr.Namespace, instr.JobSet, + instr.Cpu, instr.Memory, instr.EphemeralStorage, instr.Gpu, instr.Priority, + instr.Submitted, instr.State, instr.LastTransitionTime, instr.LastTransitionTimeSeconds, + instr.PriorityClass, instr.Annotations, instr.JobProto, + }, nil + }), + ) + if err != nil { + return fmt.Errorf("copying into partition %s: %w", partName, err) + } + } + return nil +} + +// partitionNameForTime returns the daily partition name for the given timestamp, +// using the same date-truncation logic as createDailyPartitions. +func partitionNameForTime(t time.Time) string { + date := t.Truncate(24 * time.Hour) + return fmt.Sprintf("job_p%s", date.Format("20060102")) +} + +// createJobSpecsPartitioned inserts job_spec rows with submitted. 
+func (p *PostgresDatabase) createJobSpecsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_spec"}, + []string{"job_id", "job_spec", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.JobProto, submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// createJobRunsPartitioned inserts job_run rows with submitted. +func (p *PostgresDatabase) createJobRunsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobRunInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_run"}, + []string{"run_id", "job_id", "cluster", "node", "leased", "pool", "job_run_state", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.RunId, instr.JobId, instr.Cluster, instr.Node, + instr.Leased, instr.Pool, instr.JobRunState, + submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// createJobErrorsPartitioned inserts job_error rows with submitted. 
+func (p *PostgresDatabase) createJobErrorsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobErrorInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_error"}, + []string{"job_id", "error", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.Error, submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// updateJobsPartitioned updates job rows using a temp table with submitted +// in the WHERE clause for partition pruning. +func (p *PostgresDatabase) updateJobsPartitioned(ctx context.Context, instructions []*lookoutmodel.UpdateJobInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + + tx, err := p.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + tmpTable := "job_update_partitioned_tmp" + if _, err := tx.Exec(ctx, fmt.Sprintf(` + CREATE TEMPORARY TABLE %s ( + job_id varchar(32), + submitted timestamp, + priority bigint, + state smallint, + cancelled timestamp, + last_transition_time timestamp, + last_transition_time_seconds bigint, + duplicate bool, + latest_run_id varchar(36), + cancel_reason varchar(512), + cancel_user varchar(512) + ) ON COMMIT DROP`, tmpTable)); err != nil { + return fmt.Errorf("creating temp table: %w", err) + } + + if _, err := tx.CopyFrom(ctx, + pgx.Identifier{tmpTable}, + []string{ + "job_id", "submitted", "priority", "state", "cancelled", + "last_transition_time", "last_transition_time_seconds", + "duplicate", "latest_run_id", "cancel_reason", "cancel_user", + }, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, submittedMap[instr.JobId], + instr.Priority, instr.State, 
instr.Cancelled, + instr.LastTransitionTime, instr.LastTransitionTimeSeconds, + instr.Duplicate, instr.LatestRunId, instr.CancelReason, instr.CancelUser, + }, nil + }), + ); err != nil { + return fmt.Errorf("copying to temp table: %w", err) + } + + if _, err := tx.Exec(ctx, fmt.Sprintf(` + UPDATE job + SET + priority = coalesce(tmp.priority, job.priority), + state = coalesce(tmp.state, job.state), + cancelled = coalesce(tmp.cancelled, job.cancelled), + last_transition_time = coalesce(tmp.last_transition_time, job.last_transition_time), + last_transition_time_seconds = coalesce(tmp.last_transition_time_seconds, job.last_transition_time_seconds), + duplicate = coalesce(tmp.duplicate, job.duplicate), + latest_run_id = coalesce(tmp.latest_run_id, job.latest_run_id), + cancel_reason = coalesce(tmp.cancel_reason, job.cancel_reason), + cancel_user = coalesce(tmp.cancel_user, job.cancel_user) + FROM %s AS tmp + WHERE tmp.job_id = job.job_id AND tmp.submitted = job.submitted`, tmpTable)); err != nil { + return fmt.Errorf("updating jobs from temp table: %w", err) + } + + return tx.Commit(ctx) +} + +// insertHistoricalJobChunkPartitioned inserts a chunk of historical jobs +// split per age bucket. Each bucket targets a single partition (fixed +// submitted value), avoiding cross-partition writes. +func (p *PostgresDatabase) insertHistoricalJobChunkPartitioned(ctx context.Context, params HistoricalJobsParams, startIdx, lastIdx int) error { + nBuckets := len(params.JobAgeDays) + for bucketIdx, ageDays := range params.JobAgeDays { + if err := p.insertHistoricalJobChunkForBucket(ctx, params, startIdx, lastIdx, bucketIdx, nBuckets, ageDays); err != nil { + return fmt.Errorf("inserting bucket %d (age %d days): %w", bucketIdx, ageDays, err) + } + } + return nil +} + +// insertHistoricalJobChunkForBucket inserts the subset of jobs in +// [startIdx, lastIdx] that belong to the given age bucket, using a fixed +// submitted value so all rows land in a single partition. 
+func (p *PostgresDatabase) insertHistoricalJobChunkForBucket( + ctx context.Context, params HistoricalJobsParams, + startIdx, lastIdx, bucketIdx, nBuckets, ageDays int, +) error { + prefix := fmt.Sprintf("%04d%04d", params.QueueIdx, params.JobSetIdx) + succeeded := params.SucceededThreshold + errored := params.ErroredThreshold + cancelled := params.CancelledThreshold + + cpuArr := int64SliceToSQL(jobspec.CpuOptions) + memArr := int64SliceToSQL(jobspec.MemoryOptions) + ephArr := int64SliceToSQL(jobspec.EphemeralStorageOptions) + gpuArr := int64SliceToSQL(jobspec.GpuOptions) + poolArr := stringSliceToSQL(jobspec.PoolOptions) + nsArr := stringSliceToSQL(jobspec.NamespaceOptions) + pcArr := stringSliceToSQL(jobspec.PriorityClassOptions) + + // Filter: only rows where i %% nBuckets = bucketIdx + bucketFilter := fmt.Sprintf("WHERE i%%%d = %d", nBuckets, bucketIdx) + + // Compute the partition date using the same logic as createDailyPartitions + // so we can insert directly into the named partition, bypassing the + // parent table's partition routing entirely. 
+ baseTime := time.Now().Truncate(24 * time.Hour).AddDate(0, 0, -ageDays) + baseTimeExpr := fmt.Sprintf("'%s'::timestamp", baseTime.Format("2006-01-02 15:04:05")) + partitionName := fmt.Sprintf("job_p%s", baseTime.Format("20060102")) + + jobSQL := fmt.Sprintf(` +INSERT INTO %s ( + job_id, queue, owner, namespace, jobset, + cpu, memory, ephemeral_storage, gpu, priority, + submitted, state, last_transition_time, last_transition_time_seconds, + priority_class, annotations, latest_run_id, + cancelled, cancel_reason, cancel_user +) +SELECT + '%s' || lpad(i::text, 10, '0'), + '%s', + '%s', + (%s)[i%%%d+1], + '%s', + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (i%%2000)+1, + %s, + CASE WHEN i%%1000 < %d THEN %d + WHEN i%%1000 < %d THEN %d + WHEN i%%1000 < %d THEN %d + ELSE %d END, + %s + INTERVAL '10 seconds', + EXTRACT(EPOCH FROM %s + INTERVAL '10 seconds')::bigint, + (%s)[i%%%d+1], + %s, + '%s' || lpad(i::text, 10, '0') || '00', + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN %s + INTERVAL '10 seconds' END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN 'user requested' END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN '%s' END +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + partitionName, + prefix, + params.QueueName, params.QueueName, + nsArr, len(jobspec.NamespaceOptions), + params.JobSetName, + cpuArr, len(jobspec.CpuOptions), + memArr, len(jobspec.MemoryOptions), + ephArr, len(jobspec.EphemeralStorageOptions), + gpuArr, len(jobspec.GpuOptions), + baseTimeExpr, + succeeded, lookout.JobSucceededOrdinal, + errored, lookout.JobFailedOrdinal, + cancelled, lookout.JobCancelledOrdinal, + lookout.JobPreemptedOrdinal, + baseTimeExpr, + baseTimeExpr, + pcArr, len(jobspec.PriorityClassOptions), + buildAnnotationSQL(), + prefix, + errored, cancelled, baseTimeExpr, + errored, cancelled, + errored, cancelled, params.QueueName, + startIdx, lastIdx, + bucketFilter, + ) + + jobSpecSQL := fmt.Sprintf(` +INSERT INTO job_spec 
(job_id, job_spec, submitted) +SELECT '%s' || lpad(i::text, 10, '0'), $1::bytea, %s +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + prefix, baseTimeExpr, startIdx, lastIdx, bucketFilter, + ) + + jobRunSQL := fmt.Sprintf(` +INSERT INTO job_run ( + run_id, job_id, cluster, node, leased, pending, started, finished, + pool, job_run_state, error, debug, exit_code, submitted +) +SELECT + '%s' || lpad(i::text, 10, '0') || '00', + '%s' || lpad(i::text, 10, '0'), + 'broadside-cluster-' || (i%%40+1), + 'broadside-cluster-' || (i%%40+1) || '-node-' || (i%%80+1), + %s + INTERVAL '1 second', + %s + INTERVAL '2 seconds', + %s + INTERVAL '3 seconds', + %s + INTERVAL '10 seconds', + (%s)[i%%%d+1], + CASE WHEN i%%1000 < %d THEN 3 + WHEN i%%1000 < %d THEN 4 + WHEN i%%1000 < %d THEN 6 + ELSE 5 END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $1::bytea + WHEN i%%1000 >= %d THEN $3::bytea + END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $2::bytea END, + CASE WHEN i%%1000 < %d THEN 0 END, + %s +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + prefix, prefix, + baseTimeExpr, baseTimeExpr, baseTimeExpr, baseTimeExpr, + poolArr, len(jobspec.PoolOptions), + succeeded, errored, cancelled, + succeeded, errored, + cancelled, + succeeded, errored, + errored, + baseTimeExpr, + startIdx, lastIdx, + bucketFilter, + ) + + jobErrorSQL := fmt.Sprintf(` +INSERT INTO job_error (job_id, error, submitted) +SELECT '%s' || lpad(i::text, 10, '0'), $1::bytea, %s +FROM generate_series(%d, %d) AS i +WHERE i%%1000 >= %d AND i%%1000 < %d AND i%%%d = %d +ON CONFLICT DO NOTHING`, + prefix, baseTimeExpr, startIdx, lastIdx, + succeeded, errored, nBuckets, bucketIdx, + ) + + tx, err := p.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + type stmtArgs struct { + sql string + args []any + } + stmts := []stmtArgs{ + {jobSQL, nil}, + {jobSpecSQL, []any{params.JobSpecBytes}}, + {jobRunSQL, 
[]any{params.ErrorBytes, params.DebugBytes, params.PreemptionBytes}}, + {jobErrorSQL, []any{params.ErrorBytes}}, + } + for _, s := range stmts { + if _, err := tx.Exec(ctx, s.sql, s.args...); err != nil { + return fmt.Errorf("executing historical jobs SQL (bucket %d): %w", bucketIdx, err) + } + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("committing transaction: %w", err) + } + return nil +} diff --git a/internal/broadside/db/sql/partition_down.sql b/internal/broadside/db/sql/partition_down.sql new file mode 100644 index 00000000000..36f2315a4e3 --- /dev/null +++ b/internal/broadside/db/sql/partition_down.sql @@ -0,0 +1,73 @@ +-- partition_down.sql reverts the partition-by-submitted migration. +-- +-- It replaces the partitioned job table with an unpartitioned one and +-- removes the submitted column from child tables. No data is preserved: +-- the caller truncates all tables before running this. +-- +-- This file is embedded by internal/broadside/db/partition.go and executed +-- by TearDown when the PartitionBySubmitted feature toggle is enabled. 
+ +BEGIN; + +-- Drop the partitioned job table (cascades to all partitions) +DROP TABLE IF EXISTS job CASCADE; + +-- Recreate unpartitioned job table matching the Lookout schema +CREATE TABLE job ( + job_id varchar(32) NOT NULL PRIMARY KEY, + queue varchar(512) NOT NULL, + owner varchar(512) NOT NULL, + jobset varchar(1024) NOT NULL, + cpu bigint NOT NULL, + memory bigint NOT NULL, + ephemeral_storage bigint NOT NULL, + gpu bigint NOT NULL, + priority bigint NOT NULL, + submitted timestamp NOT NULL, + cancelled timestamp NULL, + state smallint NOT NULL, + last_transition_time timestamp NOT NULL, + last_transition_time_seconds bigint NOT NULL, + job_spec bytea NULL, + duplicate bool NOT NULL DEFAULT false, + priority_class varchar(63) NULL, + latest_run_id varchar(36) NULL, + cancel_reason varchar(512) NULL, + namespace varchar(512) NULL, + annotations jsonb NOT NULL DEFAULT '{}'::jsonb, + external_job_uri varchar(1024) NULL, + cancel_user varchar(512) NULL +); + +ALTER TABLE job ALTER COLUMN job_spec SET STORAGE EXTERNAL; + +CREATE INDEX idx_job_queue ON job (queue); + +CREATE INDEX idx_job_queue_last_transition_time_seconds ON job ( + queue, + last_transition_time_seconds +); + +CREATE INDEX idx_job_jobset_last_transition_time_seconds ON job ( + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_last_transition_time_seconds ON job ( + queue, + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); + +CREATE INDEX idx_job_state ON job (state); + +-- Remove submitted from child tables +ALTER TABLE job_run DROP COLUMN IF EXISTS submitted; + +ALTER TABLE job_spec DROP COLUMN IF EXISTS submitted; + +ALTER TABLE job_error DROP COLUMN IF EXISTS submitted; + +COMMIT; diff --git a/internal/broadside/db/sql/partition_up.sql b/internal/broadside/db/sql/partition_up.sql new file mode 100644 index 00000000000..0707b4648e5 --- /dev/null +++ b/internal/broadside/db/sql/partition_up.sql @@ 
-0,0 +1,115 @@ +-- partition_up.sql converts the job table into a range-partitioned table +-- on the submitted column with daily partitions. +-- +-- This file is embedded by internal/broadside/db/partition.go and executed +-- when the PartitionBySubmitted feature toggle is enabled. +-- +-- This runs against an empty database (immediately after Lookout migrations). +-- Daily partition CREATE statements and the DEFAULT partition are generated +-- dynamically in Go after this static migration completes. + +BEGIN; + +-- Step 1: Add submitted column to child tables (nullable initially, set NOT +-- NULL immediately since tables are empty at this point) +ALTER TABLE job_run +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +ALTER TABLE job_spec +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +ALTER TABLE job_error +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +-- Remove the defaults (they were only needed for the ALTER) +ALTER TABLE job_run ALTER COLUMN submitted DROP DEFAULT; + +ALTER TABLE job_spec ALTER COLUMN submitted DROP DEFAULT; + +ALTER TABLE job_error ALTER COLUMN submitted DROP DEFAULT; + +-- Step 2: Convert job to partitioned table +ALTER TABLE job RENAME TO job_unpartitioned; + +CREATE TABLE job ( + job_id varchar(32) NOT NULL, + queue varchar(512) NOT NULL, + owner varchar(512) NOT NULL, + jobset varchar(1024) NOT NULL, + cpu bigint NOT NULL, + memory bigint NOT NULL, + ephemeral_storage bigint NOT NULL, + gpu bigint NOT NULL, + priority bigint NOT NULL, + submitted timestamp NOT NULL, + cancelled timestamp NULL, + state smallint NOT NULL, + last_transition_time timestamp NOT NULL, + last_transition_time_seconds bigint NOT NULL, + job_spec bytea NULL, + duplicate bool NOT NULL DEFAULT false, + priority_class varchar(63) NULL, + latest_run_id varchar(36) NULL, + cancel_reason varchar(512) NULL, + namespace varchar(512) NULL, + annotations jsonb NOT NULL DEFAULT 
'{}'::jsonb, + external_job_uri varchar(1024) NULL, + cancel_user varchar(512) NULL, + PRIMARY KEY (job_id, submitted) +) PARTITION BY RANGE (submitted); + +-- Step 3: Drop old indexes so the names are free for the new table. +-- The old table (job_unpartitioned) is kept until Go code has created +-- partitions and moved the data across. +DROP INDEX IF EXISTS idx_job_queue; + +DROP INDEX IF EXISTS idx_job_queue_pattern_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_jobset_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_jobset_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_jobset_state; + +DROP INDEX IF EXISTS idx_job_jobset_pattern; + +DROP INDEX IF EXISTS idx_job_state; + +DROP INDEX IF EXISTS idx_job_submitted; + +DROP INDEX IF EXISTS idx_job_ltt_jobid; + +DROP INDEX IF EXISTS idx_job_active_queue_jobset; + +DROP INDEX IF EXISTS idx_job_queue_namespace; + +DROP INDEX IF EXISTS idx_job_latest_run_id; + +-- Step 4: Create indexes on the partitioned parent (inherited by partitions) +CREATE INDEX idx_job_queue ON job (queue); + +CREATE INDEX idx_job_queue_last_transition_time_seconds ON job ( + queue, + last_transition_time_seconds +); + +CREATE INDEX idx_job_jobset_last_transition_time_seconds ON job ( + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_last_transition_time_seconds ON job ( + queue, + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); + +CREATE INDEX idx_job_state ON job (state); + +CREATE INDEX idx_job_submitted ON job (submitted); + +COMMIT; diff --git a/internal/broadside/ingester/ingester.go b/internal/broadside/ingester/ingester.go index 643a07087fe..947c21dbe8c 100644 --- a/internal/broadside/ingester/ingester.go +++ b/internal/broadside/ingester/ingester.go @@ -368,7 +368,13 @@ func (i *Ingester) runBatchExecutor( if !ok { 
stopAndDrainTimer() if len(batch) > 0 { - i.executeBatch(ctx, batch) + // Use a fresh context for the final flush. The parent ctx is + // cancelled at this point, but the batch still needs to commit + // cleanly — otherwise the Postgres backend will be mid-rollback + // when TearDown tries to TRUNCATE, blocking on lock acquisition. + flushCtx, flushCancel := context.WithTimeout(context.Background(), 10*time.Second) + i.executeBatch(flushCtx, batch) + flushCancel() } return } @@ -454,6 +460,7 @@ func (i *Ingester) submitJob( newJob.JobID, "", jobspec.StateLeased, + now, )) } @@ -465,6 +472,7 @@ func (i *Ingester) processTransition( ) { now := time.Now() cfg := i.config.JobStateTransitionConfig + submitted := trans.Submitted() jobNumber := jobspec.ExtractJobNumber(trans.JobID()) cluster, node := jobspec.GetClusterNodeForJobNumber(jobNumber) @@ -474,20 +482,22 @@ func (i *Ingester) processTransition( runID := jobspec.EncodeRunID(trans.JobID(), 0) i.routerSend(router, timestampedQuery{ query: db.SetJobLeased{ - JobID: trans.JobID(), - Time: now, - RunID: runID, + JobID: trans.JobID(), + Time: now, + RunID: runID, + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.InsertJobRun{ - JobRunID: runID, - JobID: trans.JobID(), - Cluster: cluster, - Node: node, - Pool: jobspec.GetPool(jobNumber), - Time: now, + JobRunID: runID, + JobID: trans.JobID(), + Cluster: cluster, + Node: node, + Pool: jobspec.GetPool(jobNumber), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -496,21 +506,24 @@ func (i *Ingester) processTransition( trans.JobID(), runID, jobspec.StatePending, + submitted, )) case jobspec.StatePending: i.routerSend(router, timestampedQuery{ query: db.SetJobPending{ - JobID: trans.JobID(), - Time: now, - RunID: trans.RunID(), + JobID: trans.JobID(), + Time: now, + RunID: trans.RunID(), + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunPending{ - 
JobRunID: trans.RunID(), - Time: now, + JobRunID: trans.RunID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -519,6 +532,7 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateRunning, + submitted, )) case jobspec.StateRunning: @@ -527,14 +541,16 @@ func (i *Ingester) processTransition( JobID: trans.JobID(), Time: now, LatestRunID: trans.RunID(), + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunStarted{ - JobRunID: trans.RunID(), - Time: now, - Node: node, + JobRunID: trans.RunID(), + Time: now, + Node: node, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -545,6 +561,7 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateSucceeded, + submitted, )) } else { heap.Push(transitions, jobspec.NewScheduledTransition( @@ -552,21 +569,24 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateErrored, + submitted, )) } case jobspec.StateSucceeded: i.routerSend(router, timestampedQuery{ query: db.SetJobSucceeded{ - JobID: trans.JobID(), - Time: now, + JobID: trans.JobID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunSucceeded{ - JobRunID: trans.RunID(), - Time: now, + JobRunID: trans.RunID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -574,24 +594,27 @@ func (i *Ingester) processTransition( case jobspec.StateErrored: i.routerSend(router, timestampedQuery{ query: db.SetJobErrored{ - JobID: trans.JobID(), - Time: now, + JobID: trans.JobID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunFailed{ - JobRunID: trans.RunID(), - Time: now, - Error: simulatedError, - Debug: simulatedDebugMsg, + JobRunID: trans.RunID(), + Time: now, + Error: simulatedError, + Debug: simulatedDebugMsg, + Submitted: submitted, }, enqueuedAt: now, 
}, ctx) i.routerSend(router, timestampedQuery{ query: db.InsertJobError{ - JobID: trans.JobID(), - Error: simulatedError, + JobID: trans.JobID(), + Error: simulatedError, + Submitted: submitted, }, enqueuedAt: now, }, ctx) diff --git a/internal/broadside/jobspec/state.go b/internal/broadside/jobspec/state.go index 372182de2bc..40590b317bd 100644 --- a/internal/broadside/jobspec/state.go +++ b/internal/broadside/jobspec/state.go @@ -26,25 +26,28 @@ const ( ) type ScheduledTransition struct { - time time.Time - jobID string - runID string - toState JobState + time time.Time + jobID string + runID string + toState JobState + submitted time.Time } -func NewScheduledTransition(t time.Time, jobID, runID string, toState JobState) ScheduledTransition { +func NewScheduledTransition(t time.Time, jobID, runID string, toState JobState, submitted time.Time) ScheduledTransition { return ScheduledTransition{ - time: t, - jobID: jobID, - runID: runID, - toState: toState, + time: t, + jobID: jobID, + runID: runID, + toState: toState, + submitted: submitted, } } -func (s ScheduledTransition) Time() time.Time { return s.time } -func (s ScheduledTransition) JobID() string { return s.jobID } -func (s ScheduledTransition) RunID() string { return s.runID } -func (s ScheduledTransition) ToState() JobState { return s.toState } +func (s ScheduledTransition) Time() time.Time { return s.time } +func (s ScheduledTransition) JobID() string { return s.jobID } +func (s ScheduledTransition) RunID() string { return s.runID } +func (s ScheduledTransition) ToState() JobState { return s.toState } +func (s ScheduledTransition) Submitted() time.Time { return s.submitted } type TransitionHeap []ScheduledTransition diff --git a/internal/broadside/orchestrator/runner.go b/internal/broadside/orchestrator/runner.go index 851fcc06f30..75b66a9c0cf 100644 --- a/internal/broadside/orchestrator/runner.go +++ b/internal/broadside/orchestrator/runner.go @@ -252,11 +252,24 @@ func (r *Runner) newDatabase() 
(db.Database, error) { func NewDatabase(config configuration.TestConfig) (db.Database, error) { switch { case len(config.DatabaseConfig.Postgres) > 0: + var allJobAgeDays []int + seen := make(map[int]struct{}) + for _, qc := range config.QueueConfig { + for _, jsc := range qc.JobSetConfig { + for _, d := range jsc.HistoricalJobsConfig.JobAgeDays { + if _, ok := seen[d]; !ok { + allJobAgeDays = append(allJobAgeDays, d) + seen[d] = struct{}{} + } + } + } + } return db.NewPostgresDatabase( config.DatabaseConfig.Postgres, config.FeatureToggles, config.DatabaseConfig.PostgresTuningSQL, config.DatabaseConfig.PostgresTuningRevertSQL, + allJobAgeDays, ), nil case len(config.DatabaseConfig.ClickHouse) > 0: return db.NewClickHouseDatabase(config.DatabaseConfig.ClickHouse), nil From d2a24e63d857bf224b11f7776fa127c82f87b2f7 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Wed, 18 Mar 2026 10:25:49 +0000 Subject: [PATCH 2/6] lint fix Signed-off-by: Maurice Yap --- internal/broadside/db/postgres_partitioned.go | 2 +- internal/broadside/jobspec/state.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/broadside/db/postgres_partitioned.go b/internal/broadside/db/postgres_partitioned.go index ab93930efc7..d595b5b5f1d 100644 --- a/internal/broadside/db/postgres_partitioned.go +++ b/internal/broadside/db/postgres_partitioned.go @@ -322,7 +322,7 @@ func (p *PostgresDatabase) insertHistoricalJobChunkForBucket( // Compute the partition date using the same logic as createDailyPartitions // so we can insert directly into the named partition, bypassing the // parent table's partition routing entirely. 
- baseTime := time.Now().Truncate(24 * time.Hour).AddDate(0, 0, -ageDays) + baseTime := time.Now().Truncate(24*time.Hour).AddDate(0, 0, -ageDays) baseTimeExpr := fmt.Sprintf("'%s'::timestamp", baseTime.Format("2006-01-02 15:04:05")) partitionName := fmt.Sprintf("job_p%s", baseTime.Format("20060102")) diff --git a/internal/broadside/jobspec/state.go b/internal/broadside/jobspec/state.go index 40590b317bd..adf2b79350f 100644 --- a/internal/broadside/jobspec/state.go +++ b/internal/broadside/jobspec/state.go @@ -44,10 +44,10 @@ func NewScheduledTransition(t time.Time, jobID, runID string, toState JobState, } func (s ScheduledTransition) Time() time.Time { return s.time } -func (s ScheduledTransition) JobID() string { return s.jobID } -func (s ScheduledTransition) RunID() string { return s.runID } -func (s ScheduledTransition) ToState() JobState { return s.toState } -func (s ScheduledTransition) Submitted() time.Time { return s.submitted } +func (s ScheduledTransition) JobID() string { return s.jobID } +func (s ScheduledTransition) RunID() string { return s.runID } +func (s ScheduledTransition) ToState() JobState { return s.toState } +func (s ScheduledTransition) Submitted() time.Time { return s.submitted } type TransitionHeap []ScheduledTransition From 7ad0dab9581bc42446e3c5314984739f05e80b65 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Wed, 18 Mar 2026 10:52:20 +0000 Subject: [PATCH 3/6] comments Signed-off-by: Maurice Yap --- internal/broadside/db/postgres.go | 14 +++- internal/broadside/db/postgres_unit_test.go | 85 +++++++++++++++++++++ internal/broadside/ingester/ingester.go | 2 +- 3 files changed, 99 insertions(+), 2 deletions(-) create mode 100644 internal/broadside/db/postgres_unit_test.go diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index d81a8350ba3..ef00f02e90d 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "regexp" "strings" 
"sync" "time" @@ -177,7 +178,7 @@ func (p *PostgresDatabase) execTuningSQLOnPartitions(ctx context.Context, stmts for i, stmt := range stmts { if targetsJobTable(stmt) { for _, part := range partitions { - partStmt := strings.Replace(stmt, "job", part, 1) + partStmt := replaceJobTable(stmt, part) if _, err := p.pool.Exec(ctx, partStmt); err != nil { return fmt.Errorf("%s tuning SQL statement %d on partition %s: %w", verb, i+1, part, err) } @@ -225,6 +226,17 @@ func targetsJobTable(stmt string) bool { !strings.Contains(lower, "alter table job_") } +// alterTableJobRe matches "ALTER TABLE job " case-insensitively so that the +// table name can be rewritten to a specific partition name without accidentally +// matching "job" inside SQL comments or in unrelated table names. +var alterTableJobRe = regexp.MustCompile(`(?i)ALTER TABLE job `) + +// replaceJobTable rewrites the "ALTER TABLE job " phrase in stmt to target the +// named partition instead, handling any casing of the original keyword. 
+func replaceJobTable(stmt, partition string) string { + return alterTableJobRe.ReplaceAllLiteralString(stmt, "ALTER TABLE "+partition+" ") +} + // ExecuteIngestionQueryBatch executes a batch of ingestion queries using the // same sequential phase ordering as the production Lookout ingester: // diff --git a/internal/broadside/db/postgres_unit_test.go b/internal/broadside/db/postgres_unit_test.go new file mode 100644 index 00000000000..47282823f85 --- /dev/null +++ b/internal/broadside/db/postgres_unit_test.go @@ -0,0 +1,85 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTargetsJobTable(t *testing.T) { + tests := []struct { + name string + stmt string + expected bool + }{ + { + name: "standard lowercase", + stmt: "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "mixed case table name", + stmt: "ALTER TABLE JOB SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "job_run not matched", + stmt: "ALTER TABLE job_run SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: false, + }, + { + name: "comment containing job before alter table", + stmt: "-- tune the job table\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "comment containing job but targets job_run", + stmt: "-- tune the job table\nALTER TABLE job_run SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, targetsJobTable(tc.stmt)) + }) + } +} + +func TestReplaceJobTable(t *testing.T) { + tests := []struct { + name string + stmt string + partition string + expected string + }{ + { + name: "standard lowercase", + stmt: "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "mixed case table 
name", + stmt: "ALTER TABLE JOB SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "comment containing job before alter table", + stmt: "-- tune the job table\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "-- tune the job table\nALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "reset statement", + stmt: "ALTER TABLE job RESET (autovacuum_vacuum_scale_factor)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 RESET (autovacuum_vacuum_scale_factor)", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, replaceJobTable(tc.stmt, tc.partition)) + }) + } +} diff --git a/internal/broadside/ingester/ingester.go b/internal/broadside/ingester/ingester.go index 947c21dbe8c..239cde3740b 100644 --- a/internal/broadside/ingester/ingester.go +++ b/internal/broadside/ingester/ingester.go @@ -373,8 +373,8 @@ func (i *Ingester) runBatchExecutor( // cleanly — otherwise the Postgres backend will be mid-rollback // when TearDown tries to TRUNCATE, blocking on lock acquisition. 
flushCtx, flushCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer flushCancel() i.executeBatch(flushCtx, batch) - flushCancel() } return } From 8d51eea4b719d0b7abf6577c928dce2a75883551 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Fri, 20 Mar 2026 11:17:59 +0000 Subject: [PATCH 4/6] comments Signed-off-by: Maurice Yap --- internal/broadside/configuration/doc.go | 2 +- internal/broadside/db/db.go | 42 ++++++++----------- internal/broadside/db/historical.go | 12 +++--- internal/broadside/db/postgres.go | 17 +++++++- internal/broadside/db/postgres_partitioned.go | 23 +++++----- internal/broadside/db/sql/partition_down.sql | 13 ++++++ internal/broadside/db/sql/partition_up.sql | 2 + internal/broadside/ingester/ingester.go | 26 +++++------- 8 files changed, 78 insertions(+), 59 deletions(-) diff --git a/internal/broadside/configuration/doc.go b/internal/broadside/configuration/doc.go index 04174a8d5be..6fcb582fca3 100644 --- a/internal/broadside/configuration/doc.go +++ b/internal/broadside/configuration/doc.go @@ -11,7 +11,7 @@ job submissions, state transitions, and query patterns. 
The main configuration type is TestConfig, which defines: - Test duration and warmup period - - Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL) + - Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL, optional partition-by-submitted toggle) - Queue configuration (queue/jobset distribution and historical job setup) - Ingestion configuration (job submission rates and state transitions) - Query configuration (rates for different query types) diff --git a/internal/broadside/db/db.go b/internal/broadside/db/db.go index deac88288b1..0d4161a2784 100644 --- a/internal/broadside/db/db.go +++ b/internal/broadside/db/db.go @@ -195,10 +195,9 @@ type SetJobRunning struct { func (SetJobRunning) isIngestionQuery() {} type SetJobRunStarted struct { - JobRunID string - Time time.Time - Node string - Submitted time.Time + JobRunID string + Time time.Time + Node string } func (SetJobRunStarted) isIngestionQuery() {} @@ -213,46 +212,41 @@ type SetJobPending struct { func (SetJobPending) isIngestionQuery() {} type SetJobRunPending struct { - JobRunID string - Time time.Time - Submitted time.Time + JobRunID string + Time time.Time } func (SetJobRunPending) isIngestionQuery() {} type SetJobRunCancelled struct { - JobRunID string - Time time.Time - Submitted time.Time + JobRunID string + Time time.Time } func (SetJobRunCancelled) isIngestionQuery() {} type SetJobRunFailed struct { - JobRunID string - Time time.Time - Error []byte - Debug []byte - ExitCode int32 - Submitted time.Time + JobRunID string + Time time.Time + Error []byte + Debug []byte + ExitCode int32 } func (SetJobRunFailed) isIngestionQuery() {} type SetJobRunSucceeded struct { - JobRunID string - Time time.Time - ExitCode int32 - Submitted time.Time + JobRunID string + Time time.Time + ExitCode int32 } func (SetJobRunSucceeded) isIngestionQuery() {} type SetJobRunPreempted struct { - JobRunID string - 
Time time.Time - Error []byte - Submitted time.Time + JobRunID string + Time time.Time + Error []byte } func (SetJobRunPreempted) isIngestionQuery() {} diff --git a/internal/broadside/db/historical.go b/internal/broadside/db/historical.go index d917af41849..3ca2b4ef965 100644 --- a/internal/broadside/db/historical.go +++ b/internal/broadside/db/historical.go @@ -72,32 +72,32 @@ func buildHistoricalJobQueries(jobNum int, params HistoricalJobsParams) []Ingest SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID, Submitted: baseTime}, InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime, Submitted: baseTime}, SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID, Submitted: baseTime}, - SetJobRunPending{JobRunID: runID, Time: pendingTime, Submitted: baseTime}, + SetJobRunPending{JobRunID: runID, Time: pendingTime}, SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID, Submitted: baseTime}, - SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node, Submitted: baseTime}, + SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node}, } switch historicalState(jobNum, params) { case jobspec.StateSucceeded: queries = append(queries, SetJobSucceeded{JobID: jobID, Time: terminalTime, Submitted: baseTime}, - SetJobRunSucceeded{JobRunID: runID, Time: terminalTime, Submitted: baseTime}, + SetJobRunSucceeded{JobRunID: runID, Time: terminalTime}, ) case jobspec.StateErrored: queries = append(queries, SetJobErrored{JobID: jobID, Time: terminalTime, Submitted: baseTime}, - SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes, Submitted: baseTime}, + SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes}, InsertJobError{JobID: jobID, Error: params.ErrorBytes, Submitted: baseTime}, ) case jobspec.StateCancelled: queries = append(queries, SetJobCancelled{JobID: jobID, Time: 
terminalTime, CancelReason: "user requested", CancelUser: params.QueueName, Submitted: baseTime}, - SetJobRunCancelled{JobRunID: runID, Time: terminalTime, Submitted: baseTime}, + SetJobRunCancelled{JobRunID: runID, Time: terminalTime}, ) case jobspec.StatePreempted: queries = append(queries, SetJobPreempted{JobID: jobID, Time: terminalTime, Submitted: baseTime}, - SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes, Submitted: baseTime}, + SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes}, ) } diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index ef00f02e90d..a1eccdd1ac6 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -183,12 +183,12 @@ func (p *PostgresDatabase) execTuningSQLOnPartitions(ctx context.Context, stmts return fmt.Errorf("%s tuning SQL statement %d on partition %s: %w", verb, i+1, part, err) } } - logging.Infof("%s tuning SQL statement %d: applied to %d partitions", strings.Title(verb), i+1, len(partitions)) + logging.Infof("%s tuning SQL statement %d: applied to %d partitions", capitalize(verb), i+1, len(partitions)) } else { if _, err := p.pool.Exec(ctx, stmt); err != nil { return fmt.Errorf("%s tuning SQL statement %d: %w", verb, i+1, err) } - logging.Infof("%s tuning SQL statement %d", strings.Title(verb), i+1) + logging.Infof("%s tuning SQL statement %d", capitalize(verb), i+1) } } return nil @@ -215,6 +215,9 @@ func (p *PostgresDatabase) listJobPartitions(ctx context.Context) ([]string, err } partitions = append(partitions, name) } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterating job partitions: %w", err) + } return partitions, nil } @@ -538,6 +541,9 @@ func (p *PostgresDatabase) insertHistoricalJobChunk(ctx context.Context, params if p.features.PartitionBySubmitted { return p.insertHistoricalJobChunkPartitioned(ctx, params, startIdx, lastIdx) } + if len(params.JobAgeDays) == 0 
{ + return nil + } prefix := fmt.Sprintf("%04d%04d", params.QueueIdx, params.JobSetIdx) succeeded := params.SucceededThreshold errored := params.ErroredThreshold @@ -726,6 +732,13 @@ func stringSliceToSQL(vals []string) string { return "ARRAY[" + strings.Join(parts, ",") + "]" } +func capitalize(s string) string { + if s == "" { + return s + } + return strings.ToUpper(s[:1]) + s[1:] +} + func buildAnnotationSQL() string { parts := make([]string, 0, len(jobspec.AnnotationConfigs)*2) for _, ac := range jobspec.AnnotationConfigs { diff --git a/internal/broadside/db/postgres_partitioned.go b/internal/broadside/db/postgres_partitioned.go index d595b5b5f1d..fe2f4e3fae4 100644 --- a/internal/broadside/db/postgres_partitioned.go +++ b/internal/broadside/db/postgres_partitioned.go @@ -21,6 +21,10 @@ import ( // The submitted column is required because: // - The job table PK is (job_id, submitted), so UPDATEs need it for partition pruning // - Child tables (job_run, job_spec, job_error) have a NOT NULL submitted column +// +// This function is NOT idempotent. Phase 1 uses CopyFrom (COPY protocol) which +// has no ON CONFLICT support. Retrying after a partial failure will produce +// duplicate-key errors. Do not add retry logic around this call. func (p *PostgresDatabase) executePartitionedBatch(ctx context.Context, queries []IngestionQuery) error { submittedMap := buildJobSubmittedMap(queries) set, err := queriesToInstructionSet(queries) @@ -28,20 +32,17 @@ func (p *PostgresDatabase) executePartitionedBatch(ctx context.Context, queries return err } - // Phase 1: Create jobs and job specs (must exist before job_run references). 
- var wg sync.WaitGroup - var jobErr, specErr error - wg.Go(func() { jobErr = p.createJobsPartitioned(ctx, set.JobsToCreate) }) - wg.Go(func() { specErr = p.createJobSpecsPartitioned(ctx, set.JobsToCreate, submittedMap) }) - wg.Wait() - if jobErr != nil { - return fmt.Errorf("creating jobs (partitioned): %w", jobErr) + // Phase 1: Create jobs, then job specs (job_spec.job_id references job.job_id, + // so specs must be inserted after jobs are committed). + if err := p.createJobsPartitioned(ctx, set.JobsToCreate); err != nil { + return fmt.Errorf("creating jobs (partitioned): %w", err) } - if specErr != nil { - return fmt.Errorf("creating job specs (partitioned): %w", specErr) + if err := p.createJobSpecsPartitioned(ctx, set.JobsToCreate, submittedMap); err != nil { + return fmt.Errorf("creating job specs (partitioned): %w", err) } // Phase 2: Update jobs, create job_runs, create job_errors (parallel). + var wg sync.WaitGroup var updateErr, runErr, errErr error wg.Go(func() { updateErr = p.updateJobsPartitioned(ctx, set.JobsToUpdate, submittedMap) }) wg.Go(func() { runErr = p.createJobRunsPartitioned(ctx, set.JobRunsToCreate, submittedMap) }) @@ -73,7 +74,7 @@ func buildJobSubmittedMap(queries []IngestionQuery) map[string]time.Time { for _, q := range queries { switch v := q.(type) { case InsertJob: - m[v.Job.JobID] = v.Job.Submitted + setIfAbsent(m, v.Job.JobID, v.Job.Submitted) case SetJobLeased: setIfAbsent(m, v.JobID, v.Submitted) case InsertJobRun: diff --git a/internal/broadside/db/sql/partition_down.sql b/internal/broadside/db/sql/partition_down.sql index 36f2315a4e3..fbc9440511e 100644 --- a/internal/broadside/db/sql/partition_down.sql +++ b/internal/broadside/db/sql/partition_down.sql @@ -63,6 +63,19 @@ CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); CREATE INDEX idx_job_state ON job (state); +CREATE INDEX idx_job_jobset_pattern ON job (jobset varchar_pattern_ops) WITH (fillfactor = 80); + +CREATE INDEX idx_job_ltt_jobid ON job 
(last_transition_time, job_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_active_queue_jobset ON job (queue, jobset) WITH (fillfactor = 80) +WHERE state IN (1, 2, 3, 8); + +CREATE INDEX idx_job_queue_namespace ON job (queue, namespace) WITH (fillfactor = 80); + +CREATE INDEX idx_job_latest_run_id ON job (latest_run_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_submitted ON job (submitted DESC); + -- Remove submitted from child tables ALTER TABLE job_run DROP COLUMN IF EXISTS submitted; diff --git a/internal/broadside/db/sql/partition_up.sql b/internal/broadside/db/sql/partition_up.sql index 0707b4648e5..68d781fe1ee 100644 --- a/internal/broadside/db/sql/partition_up.sql +++ b/internal/broadside/db/sql/partition_up.sql @@ -112,4 +112,6 @@ CREATE INDEX idx_job_state ON job (state); CREATE INDEX idx_job_submitted ON job (submitted); +ALTER TABLE job ALTER COLUMN job_spec SET STORAGE EXTERNAL; + COMMIT; diff --git a/internal/broadside/ingester/ingester.go b/internal/broadside/ingester/ingester.go index 239cde3740b..c10d65e92ab 100644 --- a/internal/broadside/ingester/ingester.go +++ b/internal/broadside/ingester/ingester.go @@ -521,9 +521,8 @@ func (i *Ingester) processTransition( }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunPending{ - JobRunID: trans.RunID(), - Time: now, - Submitted: submitted, + JobRunID: trans.RunID(), + Time: now, }, enqueuedAt: now, }, ctx) @@ -547,10 +546,9 @@ func (i *Ingester) processTransition( }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunStarted{ - JobRunID: trans.RunID(), - Time: now, - Node: node, - Submitted: submitted, + JobRunID: trans.RunID(), + Time: now, + Node: node, }, enqueuedAt: now, }, ctx) @@ -584,9 +582,8 @@ func (i *Ingester) processTransition( }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunSucceeded{ - JobRunID: trans.RunID(), - Time: now, - Submitted: submitted, + JobRunID: trans.RunID(), + Time: now, }, enqueuedAt: now, }, ctx) @@ -602,11 +599,10 @@ 
func (i *Ingester) processTransition( }, ctx) i.routerSend(router, timestampedQuery{ query: db.SetJobRunFailed{ - JobRunID: trans.RunID(), - Time: now, - Error: simulatedError, - Debug: simulatedDebugMsg, - Submitted: submitted, + JobRunID: trans.RunID(), + Time: now, + Error: simulatedError, + Debug: simulatedDebugMsg, }, enqueuedAt: now, }, ctx) From 0ef703ae4ed5fd16beefc287f1f1df7bc4cf82a8 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Fri, 20 Mar 2026 11:46:24 +0000 Subject: [PATCH 5/6] more comments Signed-off-by: Maurice Yap --- internal/broadside/db/postgres.go | 8 ++- internal/broadside/db/postgres_partitioned.go | 51 +++++++------------ internal/broadside/db/postgres_unit_test.go | 5 ++ internal/broadside/db/sql/partition_up.sql | 13 ++++- 4 files changed, 41 insertions(+), 36 deletions(-) diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index a1eccdd1ac6..26fe2f8487a 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -225,8 +225,12 @@ func (p *PostgresDatabase) listJobPartitions(ctx context.Context) ([]string, err // the "job" table specifically (not job_run, job_spec, etc.). 
func targetsJobTable(stmt string) bool { lower := strings.ToLower(stmt) - return strings.Contains(lower, "alter table job ") && - !strings.Contains(lower, "alter table job_") + idx := strings.Index(lower, "alter table ") + if idx == -1 { + return false + } + after := lower[idx+len("alter table "):] + return strings.HasPrefix(after, "job ") || strings.HasPrefix(after, "job\n") || strings.HasPrefix(after, "job\t") } // alterTableJobRe matches "ALTER TABLE job " case-insensitively so that the diff --git a/internal/broadside/db/postgres_partitioned.go b/internal/broadside/db/postgres_partitioned.go index fe2f4e3fae4..6132938645a 100644 --- a/internal/broadside/db/postgres_partitioned.go +++ b/internal/broadside/db/postgres_partitioned.go @@ -108,20 +108,14 @@ func setIfAbsent(m map[string]time.Time, key string, val time.Time) { } } -// createJobsPartitioned inserts jobs using CopyFrom directly into the named -// leaf partition (bypassing parent-table routing which can fail with COPY). -// Instructions are grouped by partition based on their submitted date. +// createJobsPartitioned inserts jobs via the partitioned parent table, allowing +// PostgreSQL's partition routing to direct each row to the correct leaf partition +// (including job_default for any dates outside the pre-created range). 
func (p *PostgresDatabase) createJobsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction) error { if len(instructions) == 0 { return nil } - groups := make(map[string][]*lookoutmodel.CreateJobInstruction) - for _, instr := range instructions { - part := partitionNameForTime(instr.Submitted) - groups[part] = append(groups[part], instr) - } - columns := []string{ "job_id", "queue", "owner", "namespace", "jobset", "cpu", "memory", "ephemeral_storage", "gpu", "priority", @@ -129,34 +123,25 @@ func (p *PostgresDatabase) createJobsPartitioned(ctx context.Context, instructio "priority_class", "annotations", "job_spec", } - for partName, instrs := range groups { - _, err := p.pool.CopyFrom(ctx, - pgx.Identifier{partName}, - columns, - pgx.CopyFromSlice(len(instrs), func(i int) ([]interface{}, error) { - instr := instrs[i] - return []interface{}{ - instr.JobId, instr.Queue, instr.Owner, instr.Namespace, instr.JobSet, - instr.Cpu, instr.Memory, instr.EphemeralStorage, instr.Gpu, instr.Priority, - instr.Submitted, instr.State, instr.LastTransitionTime, instr.LastTransitionTimeSeconds, - instr.PriorityClass, instr.Annotations, instr.JobProto, - }, nil - }), - ) - if err != nil { - return fmt.Errorf("copying into partition %s: %w", partName, err) - } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job"}, + columns, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.Queue, instr.Owner, instr.Namespace, instr.JobSet, + instr.Cpu, instr.Memory, instr.EphemeralStorage, instr.Gpu, instr.Priority, + instr.Submitted, instr.State, instr.LastTransitionTime, instr.LastTransitionTimeSeconds, + instr.PriorityClass, instr.Annotations, instr.JobProto, + }, nil + }), + ) + if err != nil { + return fmt.Errorf("copying jobs into partitioned table: %w", err) } return nil } -// partitionNameForTime returns the daily partition name for the given timestamp, -// using 
the same date-truncation logic as createDailyPartitions. -func partitionNameForTime(t time.Time) string { - date := t.Truncate(24 * time.Hour) - return fmt.Sprintf("job_p%s", date.Format("20060102")) -} - // createJobSpecsPartitioned inserts job_spec rows with submitted. func (p *PostgresDatabase) createJobSpecsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction, submittedMap map[string]time.Time) error { if len(instructions) == 0 { diff --git a/internal/broadside/db/postgres_unit_test.go b/internal/broadside/db/postgres_unit_test.go index 47282823f85..16799f9c4b6 100644 --- a/internal/broadside/db/postgres_unit_test.go +++ b/internal/broadside/db/postgres_unit_test.go @@ -37,6 +37,11 @@ func TestTargetsJobTable(t *testing.T) { stmt: "-- tune the job table\nALTER TABLE job_run SET (autovacuum_vacuum_scale_factor = 0.01)", expected: false, }, + { + name: "comment referencing alter table job_ before real alter table job", + stmt: "-- Tunes job table, see alter table job_run for the equivalent\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { diff --git a/internal/broadside/db/sql/partition_up.sql b/internal/broadside/db/sql/partition_up.sql index 68d781fe1ee..37e762b98f8 100644 --- a/internal/broadside/db/sql/partition_up.sql +++ b/internal/broadside/db/sql/partition_up.sql @@ -110,7 +110,18 @@ CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); CREATE INDEX idx_job_state ON job (state); -CREATE INDEX idx_job_submitted ON job (submitted); +CREATE INDEX idx_job_jobset_pattern ON job (jobset varchar_pattern_ops) WITH (fillfactor = 80); + +CREATE INDEX idx_job_ltt_jobid ON job (last_transition_time, job_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_active_queue_jobset ON job (queue, jobset) WITH (fillfactor = 80) +WHERE state IN (1, 2, 3, 8); + +CREATE INDEX idx_job_queue_namespace ON job (queue, namespace) WITH 
(fillfactor = 80); + +CREATE INDEX idx_job_latest_run_id ON job (latest_run_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_submitted ON job (submitted DESC); ALTER TABLE job ALTER COLUMN job_spec SET STORAGE EXTERNAL; From c6c8bc4714d08e7e2085b07b67d0596b68ee46a8 Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Fri, 20 Mar 2026 12:07:50 +0000 Subject: [PATCH 6/6] fix Signed-off-by: Maurice Yap --- internal/broadside/db/postgres.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index 26fe2f8487a..09ce062a56d 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -223,8 +223,10 @@ func (p *PostgresDatabase) listJobPartitions(ctx context.Context) ([]string, err // targetsJobTable returns true if the SQL statement is an ALTER TABLE targeting // the "job" table specifically (not job_run, job_spec, etc.). +// Line comments (-- ...) are stripped before matching to avoid false positives +// from table names mentioned in comments. func targetsJobTable(stmt string) bool { - lower := strings.ToLower(stmt) + lower := stripLineComments(strings.ToLower(stmt)) idx := strings.Index(lower, "alter table ") if idx == -1 { return false @@ -233,6 +235,26 @@ func targetsJobTable(stmt string) bool { return strings.HasPrefix(after, "job ") || strings.HasPrefix(after, "job\n") || strings.HasPrefix(after, "job\t") } +// stripLineComments removes SQL line comments (-- to end of line) from s. 
+func stripLineComments(s string) string { + var b strings.Builder + for { + i := strings.Index(s, "--") + if i == -1 { + b.WriteString(s) + break + } + b.WriteString(s[:i]) + rest := s[i:] + if j := strings.Index(rest, "\n"); j != -1 { + s = rest[j:] // keep the newline so line structure is preserved + } else { + break + } + } + return b.String() +} + // alterTableJobRe matches "ALTER TABLE job " case-insensitively so that the // table name can be rewritten to a specific partition name without accidentally // matching "job" inside SQL comments or in unrelated table names.