diff --git a/cmd/broadside/configs/examples/test-postgres.yaml b/cmd/broadside/configs/examples/test-postgres.yaml index e78a11b832d..c6839eca086 100644 --- a/cmd/broadside/configs/examples/test-postgres.yaml +++ b/cmd/broadside/configs/examples/test-postgres.yaml @@ -1,6 +1,12 @@ testDuration: 60s warmupDuration: 10s +skipTearDown: false + +featureToggles: + partitionBySubmitted: false + hotColdSplit: false + databaseConfig: postgres: host: localhost diff --git a/internal/broadside/actions/actor.go b/internal/broadside/actions/actor.go index 047648264be..917cb444067 100644 --- a/internal/broadside/actions/actor.go +++ b/internal/broadside/actions/actor.go @@ -170,8 +170,9 @@ func (a *Actor) executeReprioritisation(ctx context.Context, queue string, jobSe queries := make([]db.IngestionQuery, 0, len(jobs)) for _, job := range jobs { queries = append(queries, db.UpdateJobPriority{ - JobID: job.JobID, - Priority: jobspec.PriorityValues, + JobID: job.JobID, + Priority: jobspec.PriorityValues, + Submitted: job.Submitted, }) } @@ -230,6 +231,7 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st Time: now, CancelReason: "Bulk cancellation via Broadside test", CancelUser: "broadside-test", + Submitted: job.Submitted, }) } @@ -257,8 +259,9 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st // jobInfo holds minimal information about a job needed for actions. type jobInfo struct { - JobID string - State string + JobID string + State string + Submitted time.Time } // getActiveJobs queries the database for active jobs in the specified queue and job set. 
@@ -302,8 +305,9 @@ func (a *Actor) getActiveJobs(ctx context.Context, queue string, jobSet string) result := make([]jobInfo, 0, len(jobs)) for _, job := range jobs { result = append(result, jobInfo{ - JobID: job.JobId, - State: job.State, + JobID: job.JobId, + State: job.State, + Submitted: job.Submitted, }) } diff --git a/internal/broadside/configuration/configuration.go b/internal/broadside/configuration/configuration.go index 9f4ac5f3a3b..4361cc7f8e7 100644 --- a/internal/broadside/configuration/configuration.go +++ b/internal/broadside/configuration/configuration.go @@ -15,7 +15,8 @@ type TestConfig struct { } type FeatureToggles struct { - HotColdSplit bool `yaml:"hotColdSplit,omitempty"` + HotColdSplit bool `yaml:"hotColdSplit,omitempty"` + PartitionBySubmitted bool `yaml:"partitionBySubmitted,omitempty"` } type DatabaseConfig struct { diff --git a/internal/broadside/configuration/doc.go b/internal/broadside/configuration/doc.go index 04174a8d5be..6fcb582fca3 100644 --- a/internal/broadside/configuration/doc.go +++ b/internal/broadside/configuration/doc.go @@ -11,7 +11,7 @@ job submissions, state transitions, and query patterns. 
The main configuration type is TestConfig, which defines: - Test duration and warmup period - - Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL) + - Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL, optional partition-by-submitted toggle) - Queue configuration (queue/jobset distribution and historical job setup) - Ingestion configuration (job submission rates and state transitions) - Query configuration (rates for different query types) diff --git a/internal/broadside/configuration/validation.go b/internal/broadside/configuration/validation.go index fdecdb29485..0dcbfe7d7ec 100644 --- a/internal/broadside/configuration/validation.go +++ b/internal/broadside/configuration/validation.go @@ -11,6 +11,9 @@ const proportionTolerance = 0.001 // Tolerance for floating point comparison // Validate validates the Test configuration. func (t *TestConfig) Validate() error { + if t.FeatureToggles.HotColdSplit && t.FeatureToggles.PartitionBySubmitted { + return fmt.Errorf("feature toggles are mutually exclusive: only one of hotColdSplit, partitionBySubmitted may be enabled") + } if t.TestDuration <= 0 { return fmt.Errorf("testDuration must be positive, got %v", t.TestDuration) } diff --git a/internal/broadside/configuration/validation_test.go b/internal/broadside/configuration/validation_test.go index e3086cfe143..05cefd84671 100644 --- a/internal/broadside/configuration/validation_test.go +++ b/internal/broadside/configuration/validation_test.go @@ -55,6 +55,15 @@ func TestTest_Validate(t *testing.T) { modify: func(t *TestConfig) {}, wantErr: false, }, + { + name: "mutually exclusive feature toggles", + modify: func(t *TestConfig) { + t.FeatureToggles.HotColdSplit = true + t.FeatureToggles.PartitionBySubmitted = true + }, + wantErr: true, + errText: "feature toggles are mutually exclusive", + }, { name: "zero test duration", modify: func(t *TestConfig) { 
diff --git a/internal/broadside/db/db.go b/internal/broadside/db/db.go index 011d5b3520c..0d4161a2784 100644 --- a/internal/broadside/db/db.go +++ b/internal/broadside/db/db.go @@ -128,8 +128,9 @@ type NewJob struct { } type UpdateJobPriority struct { - JobID string - Priority int64 + JobID string + Priority int64 + Submitted time.Time } func (UpdateJobPriority) isIngestionQuery() {} @@ -139,41 +140,47 @@ type SetJobCancelled struct { Time time.Time CancelReason string CancelUser string + Submitted time.Time } func (SetJobCancelled) isIngestionQuery() {} type SetJobSucceeded struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobSucceeded) isIngestionQuery() {} type InsertJobError struct { - JobID string - Error []byte + JobID string + Error []byte + Submitted time.Time } func (InsertJobError) isIngestionQuery() {} type SetJobPreempted struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobPreempted) isIngestionQuery() {} type SetJobRejected struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobRejected) isIngestionQuery() {} type SetJobErrored struct { - JobID string - Time time.Time + JobID string + Time time.Time + Submitted time.Time } func (SetJobErrored) isIngestionQuery() {} @@ -182,6 +189,7 @@ type SetJobRunning struct { JobID string Time time.Time LatestRunID string + Submitted time.Time } func (SetJobRunning) isIngestionQuery() {} @@ -195,9 +203,10 @@ type SetJobRunStarted struct { func (SetJobRunStarted) isIngestionQuery() {} type SetJobPending struct { - JobID string - Time time.Time - RunID string + JobID string + Time time.Time + RunID string + Submitted time.Time } func (SetJobPending) isIngestionQuery() {} @@ -243,20 +252,22 @@ type SetJobRunPreempted struct { func (SetJobRunPreempted) isIngestionQuery() {} type SetJobLeased struct { - JobID string - Time time.Time - RunID string + JobID 
string + Time time.Time + RunID string + Submitted time.Time } func (SetJobLeased) isIngestionQuery() {} type InsertJobRun struct { - JobRunID string - JobID string - Cluster string - Node string - Pool string - Time time.Time + JobRunID string + JobID string + Cluster string + Node string + Pool string + Time time.Time + Submitted time.Time } func (InsertJobRun) isIngestionQuery() {} diff --git a/internal/broadside/db/doc.go b/internal/broadside/db/doc.go index 97b727f831b..7b0be78893d 100644 --- a/internal/broadside/db/doc.go +++ b/internal/broadside/db/doc.go @@ -96,6 +96,18 @@ Three implementations are provided: in a single statement — ensuring no terminal state is ever written to job. Historical job population writes directly to job_historical when the toggle is enabled. + When the PartitionBySubmitted feature toggle is enabled (mutually exclusive + with HotColdSplit), InitialiseSchema converts the job table into a + range-partitioned table on the submitted column (sql/partition_up.sql), + creates daily partitions covering all configured jobAgeDays plus a forward + buffer, and adds a DEFAULT partition. Child tables (job_run, job_spec, + job_error) gain a NOT NULL submitted column. During ingestion, + ExecuteIngestionQueryBatch bypasses LookoutDb for everything except job_run + updates, using direct SQL that + includes submitted in INSERT columns and UPDATE WHERE clauses for partition + pruning. Historical job population splits per age bucket per chunk so that + each INSERT targets a single partition. TearDown drops the partitioned table, + recreates the original unpartitioned schema (sql/partition_down.sql), and + removes the submitted column from child tables.
- ClickHouseDatabase: ClickHouse adapter (placeholder implementation) - MemoryDatabase: In-memory adapter for smoke-testing Broadside @@ -138,7 +150,7 @@ Create a database instance using the appropriate constructor: "host": "localhost", "port": "5432", "database": "lookout", - }, nil, nil) + }, configuration.FeatureToggles{}, nil, nil, nil) // For ClickHouse db := db.NewClickHouseDatabase(map[string]string{ diff --git a/internal/broadside/db/historical.go b/internal/broadside/db/historical.go index cb3244fc8f4..3ca2b4ef965 100644 --- a/internal/broadside/db/historical.go +++ b/internal/broadside/db/historical.go @@ -69,34 +69,34 @@ func buildHistoricalJobQueries(jobNum int, params HistoricalJobsParams) []Ingest queries := []IngestionQuery{ InsertJob{Job: newJob, JobSpec: params.JobSpecBytes}, - SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID}, - InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime}, - SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID}, + SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID, Submitted: baseTime}, + InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime, Submitted: baseTime}, + SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID, Submitted: baseTime}, SetJobRunPending{JobRunID: runID, Time: pendingTime}, - SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID}, + SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID, Submitted: baseTime}, SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node}, } switch historicalState(jobNum, params) { case jobspec.StateSucceeded: queries = append(queries, - SetJobSucceeded{JobID: jobID, Time: terminalTime}, + SetJobSucceeded{JobID: jobID, Time: terminalTime, Submitted: baseTime}, SetJobRunSucceeded{JobRunID: runID, Time: terminalTime}, ) case jobspec.StateErrored: queries = append(queries, - 
SetJobErrored{JobID: jobID, Time: terminalTime}, + SetJobErrored{JobID: jobID, Time: terminalTime, Submitted: baseTime}, SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes}, - InsertJobError{JobID: jobID, Error: params.ErrorBytes}, + InsertJobError{JobID: jobID, Error: params.ErrorBytes, Submitted: baseTime}, ) case jobspec.StateCancelled: queries = append(queries, - SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName}, + SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName, Submitted: baseTime}, SetJobRunCancelled{JobRunID: runID, Time: terminalTime}, ) case jobspec.StatePreempted: queries = append(queries, - SetJobPreempted{JobID: jobID, Time: terminalTime}, + SetJobPreempted{JobID: jobID, Time: terminalTime, Submitted: baseTime}, SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes}, ) } diff --git a/internal/broadside/db/partition.go b/internal/broadside/db/partition.go new file mode 100644 index 00000000000..17b8f247148 --- /dev/null +++ b/internal/broadside/db/partition.go @@ -0,0 +1,98 @@ +package db + +import ( + "context" + _ "embed" + "fmt" + "time" + + "github.com/armadaproject/armada/internal/common/logging" +) + +//go:embed sql/partition_up.sql +var partitionMigrationSQL string + +//go:embed sql/partition_down.sql +var partitionRevertSQL string + +// applyPartitionMigration applies the partition-by-submitted migration, +// then creates daily partitions covering all configured jobAgeDays plus +// a buffer for live ingestion, plus a DEFAULT partition. +func (p *PostgresDatabase) applyPartitionMigration(ctx context.Context, jobAgeDays []int) error { + // Check whether the job table is already partitioned (relkind 'p'). + // If so, skip the one-time conversion and just ensure partitions exist. 
+ var relkind string + if err := p.pool.QueryRow(ctx, + "SELECT relkind::text FROM pg_class WHERE relname = 'job'").Scan(&relkind); err != nil { + return fmt.Errorf("checking job table type: %w", err) + } + + if relkind != "p" { + if _, err := p.pool.Exec(ctx, partitionMigrationSQL); err != nil { + return fmt.Errorf("applying partition migration: %w", err) + } + + if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil { + return fmt.Errorf("creating daily partitions: %w", err) + } + + // Move any existing rows from the old unpartitioned table into the + // new partitioned one, then drop the old table. This must happen + // after partitions exist so Postgres can route the rows. + if _, err := p.pool.Exec(ctx, "INSERT INTO job SELECT * FROM job_unpartitioned"); err != nil { + return fmt.Errorf("moving rows to partitioned table: %w", err) + } + if _, err := p.pool.Exec(ctx, "DROP TABLE job_unpartitioned"); err != nil { + return fmt.Errorf("dropping old unpartitioned table: %w", err) + } + + logging.Info("Converted job table to range-partitioned") + } else { + // Already partitioned — just ensure any new date partitions exist. + if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil { + return fmt.Errorf("creating daily partitions: %w", err) + } + logging.Info("Job table already partitioned, ensured partitions are up to date") + } + + return nil +} + +// createDailyPartitions creates one partition per day covering all dates +// in jobAgeDays (relative to today) plus a 2-day forward buffer, and a +// DEFAULT partition as a safety net. 
+func (p *PostgresDatabase) createDailyPartitions(ctx context.Context, jobAgeDays []int) error { + today := time.Now().Truncate(24 * time.Hour) + + dates := make(map[time.Time]struct{}) + for _, days := range jobAgeDays { + date := today.AddDate(0, 0, -days) + dates[date] = struct{}{} + } + // Add today + 2 days buffer for live ingestion + for offset := 0; offset <= 2; offset++ { + dates[today.AddDate(0, 0, offset)] = struct{}{} + } + + for date := range dates { + partitionName := fmt.Sprintf("job_p%s", date.Format("20060102")) + nextDay := date.AddDate(0, 0, 1) + sql := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF job FOR VALUES FROM ('%s') TO ('%s')", + partitionName, + date.Format("2006-01-02"), + nextDay.Format("2006-01-02"), + ) + if _, err := p.pool.Exec(ctx, sql); err != nil { + return fmt.Errorf("creating partition %s: %w", partitionName, err) + } + logging.Infof("Created partition %s", partitionName) + } + + if _, err := p.pool.Exec(ctx, "CREATE TABLE IF NOT EXISTS job_default PARTITION OF job DEFAULT"); err != nil { + return fmt.Errorf("creating default partition: %w", err) + } + logging.Info("Created default partition job_default") + + return nil +} diff --git a/internal/broadside/db/postgres.go b/internal/broadside/db/postgres.go index 9571b5703b9..09ce062a56d 100644 --- a/internal/broadside/db/postgres.go +++ b/internal/broadside/db/postgres.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "regexp" "strings" "sync" "time" @@ -35,6 +36,7 @@ type PostgresDatabase struct { features broadsideconfiguration.FeatureToggles tuningSQLStatements []string tuningRevertSQLStatements []string + jobAgeDays []int pool *pgxpool.Pool lookoutDb *lookoutdb.LookoutDb jobsRepository *repository.SqlGetJobsRepository @@ -52,12 +54,13 @@ type PostgresDatabase struct { // - password: database password // - dbname: database name (e.g., "broadside_test") // - sslmode: SSL mode (e.g., "disable") -func NewPostgresDatabase(config map[string]string, features 
broadsideconfiguration.FeatureToggles, tuningSQLStatements []string, tuningRevertSQLStatements []string) *PostgresDatabase { +func NewPostgresDatabase(config map[string]string, features broadsideconfiguration.FeatureToggles, tuningSQLStatements []string, tuningRevertSQLStatements []string, jobAgeDays []int) *PostgresDatabase { return &PostgresDatabase{ config: config, features: features, tuningSQLStatements: tuningSQLStatements, tuningRevertSQLStatements: tuningRevertSQLStatements, + jobAgeDays: jobAgeDays, } } @@ -87,17 +90,30 @@ func (p *PostgresDatabase) InitialiseSchema(ctx context.Context) error { return fmt.Errorf("applying migrations: %w", err) } - if err := p.applyTuningSQL(ctx); err != nil { - pool.Close() - return fmt.Errorf("applying tuning SQL: %w", err) - } - if p.features.HotColdSplit { if _, err := p.pool.Exec(ctx, hotColdMigrationSQL); err != nil { pool.Close() return fmt.Errorf("applying hot/cold split migration: %w", err) } logging.Info("Hot/cold split migration applied") + } else if p.features.PartitionBySubmitted { + if err := p.applyPartitionMigration(ctx, p.jobAgeDays); err != nil { + pool.Close() + return fmt.Errorf("applying partition-by-submitted migration: %w", err) + } + logging.Info("Partition-by-submitted migration applied") + } + + if p.features.PartitionBySubmitted { + if err := p.applyTuningSQLToPartitions(ctx); err != nil { + pool.Close() + return fmt.Errorf("applying tuning SQL to partitions: %w", err) + } + } else { + if err := p.applyTuningSQL(ctx); err != nil { + pool.Close() + return fmt.Errorf("applying tuning SQL: %w", err) + } } decompressor := &compress.NoOpDecompressor{} @@ -137,6 +153,119 @@ func (p *PostgresDatabase) revertTuningSQL(ctx context.Context) error { return nil } +// applyTuningSQLToPartitions applies tuning SQL to leaf partitions of the +// job table rather than the partitioned parent (which doesn't support storage +// parameters). Statements targeting other tables are applied as-is. 
+func (p *PostgresDatabase) applyTuningSQLToPartitions(ctx context.Context) error { + return p.execTuningSQLOnPartitions(ctx, p.tuningSQLStatements, "applying") +} + +// revertTuningSQLFromPartitions mirrors applyTuningSQLToPartitions for the +// revert path. +func (p *PostgresDatabase) revertTuningSQLFromPartitions(ctx context.Context) error { + return p.execTuningSQLOnPartitions(ctx, p.tuningRevertSQLStatements, "reverting") +} + +// execTuningSQLOnPartitions executes tuning SQL statements, rewriting any that +// target "ALTER TABLE job " to run against each leaf partition instead of the +// partitioned parent (which doesn't support storage parameters). +func (p *PostgresDatabase) execTuningSQLOnPartitions(ctx context.Context, stmts []string, verb string) error { + partitions, err := p.listJobPartitions(ctx) + if err != nil { + return err + } + + for i, stmt := range stmts { + if targetsJobTable(stmt) { + for _, part := range partitions { + partStmt := replaceJobTable(stmt, part) + if _, err := p.pool.Exec(ctx, partStmt); err != nil { + return fmt.Errorf("%s tuning SQL statement %d on partition %s: %w", verb, i+1, part, err) + } + } + logging.Infof("%s tuning SQL statement %d: applied to %d partitions", capitalize(verb), i+1, len(partitions)) + } else { + if _, err := p.pool.Exec(ctx, stmt); err != nil { + return fmt.Errorf("%s tuning SQL statement %d: %w", verb, i+1, err) + } + logging.Infof("%s tuning SQL statement %d", capitalize(verb), i+1) + } + } + return nil +} + +// listJobPartitions returns the names of all leaf partitions of the job table. 
+func (p *PostgresDatabase) listJobPartitions(ctx context.Context) ([]string, error) { + rows, err := p.pool.Query(ctx, ` + SELECT c.relname FROM pg_inherits i + JOIN pg_class c ON c.oid = i.inhrelid + JOIN pg_class p ON p.oid = i.inhparent + WHERE p.relname = 'job' + ORDER BY c.relname`) + if err != nil { + return nil, fmt.Errorf("querying job partitions: %w", err) + } + defer rows.Close() + + var partitions []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("scanning partition name: %w", err) + } + partitions = append(partitions, name) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterating job partitions: %w", err) + } + return partitions, nil +} + +// targetsJobTable returns true if the SQL statement is an ALTER TABLE targeting +// the "job" table specifically (not job_run, job_spec, etc.). +// Line comments (-- ...) are stripped before matching to avoid false positives +// from table names mentioned in comments. +func targetsJobTable(stmt string) bool { + lower := stripLineComments(strings.ToLower(stmt)) + idx := strings.Index(lower, "alter table ") + if idx == -1 { + return false + } + after := lower[idx+len("alter table "):] + return strings.HasPrefix(after, "job ") || strings.HasPrefix(after, "job\n") || strings.HasPrefix(after, "job\t") +} + +// stripLineComments removes SQL line comments (-- to end of line) from s. 
+func stripLineComments(s string) string { + var b strings.Builder + for { + i := strings.Index(s, "--") + if i == -1 { + b.WriteString(s) + break + } + b.WriteString(s[:i]) + rest := s[i:] + if j := strings.Index(rest, "\n"); j != -1 { + s = rest[j:] // keep the newline so line structure is preserved + } else { + break + } + } + return b.String() +} + +// alterTableJobRe matches "ALTER TABLE job " case-insensitively so that the +// table name can be rewritten to a specific partition name without accidentally +// matching "job" inside SQL comments or in unrelated table names. +var alterTableJobRe = regexp.MustCompile(`(?i)ALTER TABLE job `) + +// replaceJobTable rewrites the "ALTER TABLE job " phrase in stmt to target the +// named partition instead, handling any casing of the original keyword. +func replaceJobTable(stmt, partition string) string { + return alterTableJobRe.ReplaceAllLiteralString(stmt, "ALTER TABLE "+partition+" ") +} + // ExecuteIngestionQueryBatch executes a batch of ingestion queries using the // same sequential phase ordering as the production Lookout ingester: // @@ -155,6 +284,10 @@ func (p *PostgresDatabase) revertTuningSQL(ctx context.Context) error { // wins) before being sent, preventing undefined behaviour when duplicate IDs // appear in the UPDATE … FROM temp-table pattern. func (p *PostgresDatabase) ExecuteIngestionQueryBatch(ctx context.Context, queries []IngestionQuery) error { + if p.features.PartitionBySubmitted { + return p.executePartitionedBatch(ctx, queries) + } + set, err := queriesToInstructionSet(queries) if err != nil { return err @@ -262,8 +395,14 @@ func (p *PostgresDatabase) GetJobGroups(ctx *context.Context, filters []*model.F // truncated and the hot/cold migration is reverted so the schema is left // in its original state. 
func (p *PostgresDatabase) TearDown(ctx context.Context) error { - if err := p.revertTuningSQL(ctx); err != nil { - return fmt.Errorf("reverting tuning SQL: %w", err) + if p.features.PartitionBySubmitted { + if err := p.revertTuningSQLFromPartitions(ctx); err != nil { + return fmt.Errorf("reverting tuning SQL from partitions: %w", err) + } + } else { + if err := p.revertTuningSQL(ctx); err != nil { + return fmt.Errorf("reverting tuning SQL: %w", err) + } } tables := []string{ @@ -289,6 +428,11 @@ func (p *PostgresDatabase) TearDown(ctx context.Context) error { return fmt.Errorf("reverting hot/cold split migration: %w", err) } logging.Info("Hot/cold split migration reverted") + } else if p.features.PartitionBySubmitted { + if _, err := p.pool.Exec(ctx, partitionRevertSQL); err != nil { + return fmt.Errorf("reverting partition migration: %w", err) + } + logging.Info("Partition-by-submitted migration reverted") } return nil @@ -420,6 +564,12 @@ func (p *PostgresDatabase) insertHistoricalJobChunkWithRetry(ctx context.Context // insertHistoricalJobChunk inserts a single chunk [startIdx, lastIdx] of // historical jobs in one transaction using server-side generate_series. 
func (p *PostgresDatabase) insertHistoricalJobChunk(ctx context.Context, params HistoricalJobsParams, startIdx, lastIdx int) error { + if p.features.PartitionBySubmitted { + return p.insertHistoricalJobChunkPartitioned(ctx, params, startIdx, lastIdx) + } + if len(params.JobAgeDays) == 0 { + return nil + } prefix := fmt.Sprintf("%04d%04d", params.QueueIdx, params.JobSetIdx) succeeded := params.SucceededThreshold errored := params.ErroredThreshold @@ -432,12 +582,18 @@ func (p *PostgresDatabase) insertHistoricalJobChunk(ctx context.Context, params poolArr := stringSliceToSQL(jobspec.PoolOptions) nsArr := stringSliceToSQL(jobspec.NamespaceOptions) pcArr := stringSliceToSQL(jobspec.PriorityClassOptions) + ageArr := intSliceToSQL(params.JobAgeDays) + ageLen := len(params.JobAgeDays) jobTable := "job" if p.features.HotColdSplit { jobTable = "job_historical" } + // The LATERAL subquery computes a per-row base_time from the jobAgeDays + // array, distributing jobs evenly across age buckets (job i → bucket + // i%%len(jobAgeDays)). This replaces the previous hardcoded + // NOW() - INTERVAL '24 hours'. 
jobSQL := fmt.Sprintf(` INSERT INTO %s ( job_id, queue, owner, namespace, jobset, @@ -457,23 +613,24 @@ SELECT (%s)[i%%%d+1], (%s)[i%%%d+1], (i%%2000)+1, - NOW() - INTERVAL '24 hours', + t.base_time, CASE WHEN i%%1000 < %d THEN %d WHEN i%%1000 < %d THEN %d WHEN i%%1000 < %d THEN %d ELSE %d END, - NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds', - EXTRACT(EPOCH FROM NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds')::bigint, + t.base_time + INTERVAL '10 seconds', + EXTRACT(EPOCH FROM t.base_time + INTERVAL '10 seconds')::bigint, (%s)[i%%%d+1], %s, '%s' || lpad(i::text, 10, '0') || '00', CASE WHEN i%%1000 >= %d AND i%%1000 < %d - THEN NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds' END, + THEN t.base_time + INTERVAL '10 seconds' END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN 'user requested' END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN '%s' END -FROM generate_series(%d, %d) AS i`, +FROM generate_series(%d, %d) AS i, +LATERAL (SELECT NOW() - (%s)[i%%%d+1] * INTERVAL '1 day' AS base_time) AS t`, jobTable, prefix, params.QueueName, params.QueueName, @@ -494,6 +651,7 @@ FROM generate_series(%d, %d) AS i`, errored, cancelled, errored, cancelled, params.QueueName, startIdx, lastIdx, + ageArr, ageLen, ) jobSpecSQL := fmt.Sprintf(` @@ -513,10 +671,10 @@ SELECT '%s' || lpad(i::text, 10, '0'), 'broadside-cluster-' || (i%%40+1), 'broadside-cluster-' || (i%%40+1) || '-node-' || (i%%80+1), - NOW() - INTERVAL '24 hours' + INTERVAL '1 second', - NOW() - INTERVAL '24 hours' + INTERVAL '2 seconds', - NOW() - INTERVAL '24 hours' + INTERVAL '3 seconds', - NOW() - INTERVAL '24 hours' + INTERVAL '10 seconds', + t.base_time + INTERVAL '1 second', + t.base_time + INTERVAL '2 seconds', + t.base_time + INTERVAL '3 seconds', + t.base_time + INTERVAL '10 seconds', (%s)[i%%%d+1], CASE WHEN i%%1000 < %d THEN 3 WHEN i%%1000 < %d THEN 4 @@ -527,7 +685,8 @@ SELECT END, CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $2::bytea END, CASE WHEN i%%1000 < %d THEN 0 END -FROM 
generate_series(%d, %d) AS i`, +FROM generate_series(%d, %d) AS i, +LATERAL (SELECT NOW() - (%s)[i%%%d+1] * INTERVAL '1 day' AS base_time) AS t`, prefix, prefix, poolArr, len(jobspec.PoolOptions), succeeded, errored, cancelled, @@ -536,6 +695,7 @@ FROM generate_series(%d, %d) AS i`, succeeded, errored, errored, startIdx, lastIdx, + ageArr, ageLen, ) jobErrorSQL := fmt.Sprintf(` @@ -574,6 +734,14 @@ WHERE i%%1000 >= %d AND i%%1000 < %d`, return nil } +func intSliceToSQL(vals []int) string { + parts := make([]string, len(vals)) + for i, v := range vals { + parts[i] = fmt.Sprintf("%d", v) + } + return "ARRAY[" + strings.Join(parts, ",") + "]" +} + func int64SliceToSQL(vals []int64) string { parts := make([]string, len(vals)) for i, v := range vals { @@ -590,6 +758,13 @@ func stringSliceToSQL(vals []string) string { return "ARRAY[" + strings.Join(parts, ",") + "]" } +func capitalize(s string) string { + if s == "" { + return s + } + return strings.ToUpper(s[:1]) + s[1:] +} + func buildAnnotationSQL() string { parts := make([]string, 0, len(jobspec.AnnotationConfigs)*2) for _, ac := range jobspec.AnnotationConfigs { diff --git a/internal/broadside/db/postgres_historical_test.go b/internal/broadside/db/postgres_historical_test.go index 2c4d1a9c251..e1e77563f36 100644 --- a/internal/broadside/db/postgres_historical_test.go +++ b/internal/broadside/db/postgres_historical_test.go @@ -30,7 +30,7 @@ func postgresConfig(t *testing.T) map[string]string { func TestPostgresDatabase_PopulateHistoricalJobs_JobCount(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -62,7 +62,7 @@ func TestPostgresDatabase_PopulateHistoricalJobs_JobCount(t *testing.T) { func TestPostgresDatabase_PopulateHistoricalJobs_StateDistribution(t 
*testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -103,7 +103,7 @@ func TestPostgresDatabase_PopulateHistoricalJobs_StateDistribution(t *testing.T) func TestPostgresDatabase_PopulateHistoricalJobs_Chunked(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -143,7 +143,7 @@ func TestPostgresDatabase_PopulateHistoricalJobs_Chunked(t *testing.T) { func TestPostgresDatabase_PopulateHistoricalJobs_Resume(t *testing.T) { cfg := postgresConfig(t) - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -188,7 +188,7 @@ func TestPostgresDatabase_InitialiseSchema_ExecutesTuningSQLWithoutError(t *test tuningSQLStatements := []string{ "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", } - pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, tuningSQLStatements, nil) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, tuningSQLStatements, nil, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer func() { _ = pg.TearDown(ctx) }() @@ -203,7 +203,7 @@ func TestPostgresDatabase_TearDown_ExecutesTuningRevertSQLWithoutError(t *testin revertSQLStatements := []string{ "ALTER TABLE job RESET (autovacuum_vacuum_scale_factor)", } - pg := db.NewPostgresDatabase(cfg, 
configuration.FeatureToggles{}, nil, revertSQLStatements) + pg := db.NewPostgresDatabase(cfg, configuration.FeatureToggles{}, nil, revertSQLStatements, nil) ctx := context.Background() require.NoError(t, pg.InitialiseSchema(ctx)) defer pg.Close() diff --git a/internal/broadside/db/postgres_partitioned.go b/internal/broadside/db/postgres_partitioned.go new file mode 100644 index 00000000000..6132938645a --- /dev/null +++ b/internal/broadside/db/postgres_partitioned.go @@ -0,0 +1,465 @@ +package db + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/armadaproject/armada/internal/broadside/jobspec" + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/database/lookout" + lookoutmodel "github.com/armadaproject/armada/internal/lookoutingester/model" +) + +// executePartitionedBatch executes a batch of ingestion queries using direct +// SQL that includes the submitted column. This bypasses LookoutDb for all +// operations except job_run updates (which don't need submitted in WHERE). +// +// The submitted column is required because: +// - The job table PK is (job_id, submitted), so UPDATEs need it for partition pruning +// - Child tables (job_run, job_spec, job_error) have a NOT NULL submitted column +// +// This function is NOT idempotent. Phase 1 uses CopyFrom (COPY protocol) which +// has no ON CONFLICT support. Retrying after a partial failure will produce +// duplicate-key errors. Do not add retry logic around this call. +func (p *PostgresDatabase) executePartitionedBatch(ctx context.Context, queries []IngestionQuery) error { + submittedMap := buildJobSubmittedMap(queries) + set, err := queriesToInstructionSet(queries) + if err != nil { + return err + } + + // Phase 1: Create jobs, then job specs (job_spec.job_id references job.job_id, + // so specs must be inserted after jobs are committed). 
+ if err := p.createJobsPartitioned(ctx, set.JobsToCreate); err != nil { + return fmt.Errorf("creating jobs (partitioned): %w", err) + } + if err := p.createJobSpecsPartitioned(ctx, set.JobsToCreate, submittedMap); err != nil { + return fmt.Errorf("creating job specs (partitioned): %w", err) + } + + // Phase 2: Update jobs, create job_runs, create job_errors (parallel). + var wg sync.WaitGroup + var updateErr, runErr, errErr error + wg.Go(func() { updateErr = p.updateJobsPartitioned(ctx, set.JobsToUpdate, submittedMap) }) + wg.Go(func() { runErr = p.createJobRunsPartitioned(ctx, set.JobRunsToCreate, submittedMap) }) + wg.Go(func() { errErr = p.createJobErrorsPartitioned(ctx, set.JobErrorsToCreate, submittedMap) }) + wg.Wait() + if updateErr != nil { + return fmt.Errorf("updating jobs (partitioned): %w", updateErr) + } + if runErr != nil { + return fmt.Errorf("creating job runs (partitioned): %w", runErr) + } + if errErr != nil { + return fmt.Errorf("creating job errors (partitioned): %w", errErr) + } + + // Phase 3: Update job_runs. The job_run table is not partitioned and the + // UPDATE WHERE clause uses run_id only, so LookoutDb works here. + armadaCtx := armadacontext.FromGrpcCtx(ctx) + p.lookoutDb.UpdateJobRuns(armadaCtx, set.JobRunsToUpdate) + + return nil +} + +// buildJobSubmittedMap extracts the submitted time for each job ID from the +// query batch. Each query type carries a Submitted field; the first seen +// value for each job ID wins (they should all be consistent). 
+func buildJobSubmittedMap(queries []IngestionQuery) map[string]time.Time { + m := make(map[string]time.Time) + for _, q := range queries { + switch v := q.(type) { + case InsertJob: + setIfAbsent(m, v.Job.JobID, v.Job.Submitted) + case SetJobLeased: + setIfAbsent(m, v.JobID, v.Submitted) + case InsertJobRun: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobPending: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobRunning: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobSucceeded: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobErrored: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobCancelled: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobPreempted: + setIfAbsent(m, v.JobID, v.Submitted) + case SetJobRejected: + setIfAbsent(m, v.JobID, v.Submitted) + case UpdateJobPriority: + setIfAbsent(m, v.JobID, v.Submitted) + case InsertJobError: + setIfAbsent(m, v.JobID, v.Submitted) + } + } + return m +} + +func setIfAbsent(m map[string]time.Time, key string, val time.Time) { + if _, ok := m[key]; !ok { + m[key] = val + } +} + +// createJobsPartitioned inserts jobs via the partitioned parent table, allowing +// PostgreSQL's partition routing to direct each row to the correct leaf partition +// (including job_default for any dates outside the pre-created range). 
+func (p *PostgresDatabase) createJobsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction) error { + if len(instructions) == 0 { + return nil + } + + columns := []string{ + "job_id", "queue", "owner", "namespace", "jobset", + "cpu", "memory", "ephemeral_storage", "gpu", "priority", + "submitted", "state", "last_transition_time", "last_transition_time_seconds", + "priority_class", "annotations", "job_spec", + } + + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job"}, + columns, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.Queue, instr.Owner, instr.Namespace, instr.JobSet, + instr.Cpu, instr.Memory, instr.EphemeralStorage, instr.Gpu, instr.Priority, + instr.Submitted, instr.State, instr.LastTransitionTime, instr.LastTransitionTimeSeconds, + instr.PriorityClass, instr.Annotations, instr.JobProto, + }, nil + }), + ) + if err != nil { + return fmt.Errorf("copying jobs into partitioned table: %w", err) + } + return nil +} + +// createJobSpecsPartitioned inserts job_spec rows with submitted. +func (p *PostgresDatabase) createJobSpecsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_spec"}, + []string{"job_id", "job_spec", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.JobProto, submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// createJobRunsPartitioned inserts job_run rows with submitted. 
+func (p *PostgresDatabase) createJobRunsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobRunInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_run"}, + []string{"run_id", "job_id", "cluster", "node", "leased", "pool", "job_run_state", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.RunId, instr.JobId, instr.Cluster, instr.Node, + instr.Leased, instr.Pool, instr.JobRunState, + submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// createJobErrorsPartitioned inserts job_error rows with submitted. +func (p *PostgresDatabase) createJobErrorsPartitioned(ctx context.Context, instructions []*lookoutmodel.CreateJobErrorInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + _, err := p.pool.CopyFrom(ctx, + pgx.Identifier{"job_error"}, + []string{"job_id", "error", "submitted"}, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, instr.Error, submittedMap[instr.JobId], + }, nil + }), + ) + return err +} + +// updateJobsPartitioned updates job rows using a temp table with submitted +// in the WHERE clause for partition pruning. 
+func (p *PostgresDatabase) updateJobsPartitioned(ctx context.Context, instructions []*lookoutmodel.UpdateJobInstruction, submittedMap map[string]time.Time) error { + if len(instructions) == 0 { + return nil + } + + tx, err := p.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + tmpTable := "job_update_partitioned_tmp" + if _, err := tx.Exec(ctx, fmt.Sprintf(` + CREATE TEMPORARY TABLE %s ( + job_id varchar(32), + submitted timestamp, + priority bigint, + state smallint, + cancelled timestamp, + last_transition_time timestamp, + last_transition_time_seconds bigint, + duplicate bool, + latest_run_id varchar(36), + cancel_reason varchar(512), + cancel_user varchar(512) + ) ON COMMIT DROP`, tmpTable)); err != nil { + return fmt.Errorf("creating temp table: %w", err) + } + + if _, err := tx.CopyFrom(ctx, + pgx.Identifier{tmpTable}, + []string{ + "job_id", "submitted", "priority", "state", "cancelled", + "last_transition_time", "last_transition_time_seconds", + "duplicate", "latest_run_id", "cancel_reason", "cancel_user", + }, + pgx.CopyFromSlice(len(instructions), func(i int) ([]interface{}, error) { + instr := instructions[i] + return []interface{}{ + instr.JobId, submittedMap[instr.JobId], + instr.Priority, instr.State, instr.Cancelled, + instr.LastTransitionTime, instr.LastTransitionTimeSeconds, + instr.Duplicate, instr.LatestRunId, instr.CancelReason, instr.CancelUser, + }, nil + }), + ); err != nil { + return fmt.Errorf("copying to temp table: %w", err) + } + + if _, err := tx.Exec(ctx, fmt.Sprintf(` + UPDATE job + SET + priority = coalesce(tmp.priority, job.priority), + state = coalesce(tmp.state, job.state), + cancelled = coalesce(tmp.cancelled, job.cancelled), + last_transition_time = coalesce(tmp.last_transition_time, job.last_transition_time), + last_transition_time_seconds = coalesce(tmp.last_transition_time_seconds, job.last_transition_time_seconds), + duplicate = 
coalesce(tmp.duplicate, job.duplicate), + latest_run_id = coalesce(tmp.latest_run_id, job.latest_run_id), + cancel_reason = coalesce(tmp.cancel_reason, job.cancel_reason), + cancel_user = coalesce(tmp.cancel_user, job.cancel_user) + FROM %s AS tmp + WHERE tmp.job_id = job.job_id AND tmp.submitted = job.submitted`, tmpTable)); err != nil { + return fmt.Errorf("updating jobs from temp table: %w", err) + } + + return tx.Commit(ctx) +} + +// insertHistoricalJobChunkPartitioned inserts a chunk of historical jobs +// split per age bucket. Each bucket targets a single partition (fixed +// submitted value), avoiding cross-partition writes. +func (p *PostgresDatabase) insertHistoricalJobChunkPartitioned(ctx context.Context, params HistoricalJobsParams, startIdx, lastIdx int) error { + nBuckets := len(params.JobAgeDays) + for bucketIdx, ageDays := range params.JobAgeDays { + if err := p.insertHistoricalJobChunkForBucket(ctx, params, startIdx, lastIdx, bucketIdx, nBuckets, ageDays); err != nil { + return fmt.Errorf("inserting bucket %d (age %d days): %w", bucketIdx, ageDays, err) + } + } + return nil +} + +// insertHistoricalJobChunkForBucket inserts the subset of jobs in +// [startIdx, lastIdx] that belong to the given age bucket, using a fixed +// submitted value so all rows land in a single partition. 
+func (p *PostgresDatabase) insertHistoricalJobChunkForBucket( + ctx context.Context, params HistoricalJobsParams, + startIdx, lastIdx, bucketIdx, nBuckets, ageDays int, +) error { + prefix := fmt.Sprintf("%04d%04d", params.QueueIdx, params.JobSetIdx) + succeeded := params.SucceededThreshold + errored := params.ErroredThreshold + cancelled := params.CancelledThreshold + + cpuArr := int64SliceToSQL(jobspec.CpuOptions) + memArr := int64SliceToSQL(jobspec.MemoryOptions) + ephArr := int64SliceToSQL(jobspec.EphemeralStorageOptions) + gpuArr := int64SliceToSQL(jobspec.GpuOptions) + poolArr := stringSliceToSQL(jobspec.PoolOptions) + nsArr := stringSliceToSQL(jobspec.NamespaceOptions) + pcArr := stringSliceToSQL(jobspec.PriorityClassOptions) + + // Filter: only rows where i %% nBuckets = bucketIdx + bucketFilter := fmt.Sprintf("WHERE i%%%d = %d", nBuckets, bucketIdx) + + // Compute the partition date using the same logic as createDailyPartitions + // so we can insert directly into the named partition, bypassing the + // parent table's partition routing entirely. 
+ baseTime := time.Now().Truncate(24*time.Hour).AddDate(0, 0, -ageDays) + baseTimeExpr := fmt.Sprintf("'%s'::timestamp", baseTime.Format("2006-01-02 15:04:05")) + partitionName := fmt.Sprintf("job_p%s", baseTime.Format("20060102")) + + jobSQL := fmt.Sprintf(` +INSERT INTO %s ( + job_id, queue, owner, namespace, jobset, + cpu, memory, ephemeral_storage, gpu, priority, + submitted, state, last_transition_time, last_transition_time_seconds, + priority_class, annotations, latest_run_id, + cancelled, cancel_reason, cancel_user +) +SELECT + '%s' || lpad(i::text, 10, '0'), + '%s', + '%s', + (%s)[i%%%d+1], + '%s', + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (%s)[i%%%d+1], + (i%%2000)+1, + %s, + CASE WHEN i%%1000 < %d THEN %d + WHEN i%%1000 < %d THEN %d + WHEN i%%1000 < %d THEN %d + ELSE %d END, + %s + INTERVAL '10 seconds', + EXTRACT(EPOCH FROM %s + INTERVAL '10 seconds')::bigint, + (%s)[i%%%d+1], + %s, + '%s' || lpad(i::text, 10, '0') || '00', + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN %s + INTERVAL '10 seconds' END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN 'user requested' END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d + THEN '%s' END +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + partitionName, + prefix, + params.QueueName, params.QueueName, + nsArr, len(jobspec.NamespaceOptions), + params.JobSetName, + cpuArr, len(jobspec.CpuOptions), + memArr, len(jobspec.MemoryOptions), + ephArr, len(jobspec.EphemeralStorageOptions), + gpuArr, len(jobspec.GpuOptions), + baseTimeExpr, + succeeded, lookout.JobSucceededOrdinal, + errored, lookout.JobFailedOrdinal, + cancelled, lookout.JobCancelledOrdinal, + lookout.JobPreemptedOrdinal, + baseTimeExpr, + baseTimeExpr, + pcArr, len(jobspec.PriorityClassOptions), + buildAnnotationSQL(), + prefix, + errored, cancelled, baseTimeExpr, + errored, cancelled, + errored, cancelled, params.QueueName, + startIdx, lastIdx, + bucketFilter, + ) + + jobSpecSQL := fmt.Sprintf(` +INSERT INTO job_spec (job_id, 
job_spec, submitted) +SELECT '%s' || lpad(i::text, 10, '0'), $1::bytea, %s +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + prefix, baseTimeExpr, startIdx, lastIdx, bucketFilter, + ) + + jobRunSQL := fmt.Sprintf(` +INSERT INTO job_run ( + run_id, job_id, cluster, node, leased, pending, started, finished, + pool, job_run_state, error, debug, exit_code, submitted +) +SELECT + '%s' || lpad(i::text, 10, '0') || '00', + '%s' || lpad(i::text, 10, '0'), + 'broadside-cluster-' || (i%%40+1), + 'broadside-cluster-' || (i%%40+1) || '-node-' || (i%%80+1), + %s + INTERVAL '1 second', + %s + INTERVAL '2 seconds', + %s + INTERVAL '3 seconds', + %s + INTERVAL '10 seconds', + (%s)[i%%%d+1], + CASE WHEN i%%1000 < %d THEN 3 + WHEN i%%1000 < %d THEN 4 + WHEN i%%1000 < %d THEN 6 + ELSE 5 END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $1::bytea + WHEN i%%1000 >= %d THEN $3::bytea + END, + CASE WHEN i%%1000 >= %d AND i%%1000 < %d THEN $2::bytea END, + CASE WHEN i%%1000 < %d THEN 0 END, + %s +FROM generate_series(%d, %d) AS i +%s +ON CONFLICT DO NOTHING`, + prefix, prefix, + baseTimeExpr, baseTimeExpr, baseTimeExpr, baseTimeExpr, + poolArr, len(jobspec.PoolOptions), + succeeded, errored, cancelled, + succeeded, errored, + cancelled, + succeeded, errored, + errored, + baseTimeExpr, + startIdx, lastIdx, + bucketFilter, + ) + + jobErrorSQL := fmt.Sprintf(` +INSERT INTO job_error (job_id, error, submitted) +SELECT '%s' || lpad(i::text, 10, '0'), $1::bytea, %s +FROM generate_series(%d, %d) AS i +WHERE i%%1000 >= %d AND i%%1000 < %d AND i%%%d = %d +ON CONFLICT DO NOTHING`, + prefix, baseTimeExpr, startIdx, lastIdx, + succeeded, errored, nBuckets, bucketIdx, + ) + + tx, err := p.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + type stmtArgs struct { + sql string + args []any + } + stmts := []stmtArgs{ + {jobSQL, nil}, + {jobSpecSQL, []any{params.JobSpecBytes}}, + {jobRunSQL, 
[]any{params.ErrorBytes, params.DebugBytes, params.PreemptionBytes}}, + {jobErrorSQL, []any{params.ErrorBytes}}, + } + for _, s := range stmts { + if _, err := tx.Exec(ctx, s.sql, s.args...); err != nil { + return fmt.Errorf("executing historical jobs SQL (bucket %d): %w", bucketIdx, err) + } + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("committing transaction: %w", err) + } + return nil +} diff --git a/internal/broadside/db/postgres_unit_test.go b/internal/broadside/db/postgres_unit_test.go new file mode 100644 index 00000000000..16799f9c4b6 --- /dev/null +++ b/internal/broadside/db/postgres_unit_test.go @@ -0,0 +1,90 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTargetsJobTable(t *testing.T) { + tests := []struct { + name string + stmt string + expected bool + }{ + { + name: "standard lowercase", + stmt: "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "mixed case table name", + stmt: "ALTER TABLE JOB SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "job_run not matched", + stmt: "ALTER TABLE job_run SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: false, + }, + { + name: "comment containing job before alter table", + stmt: "-- tune the job table\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + { + name: "comment containing job but targets job_run", + stmt: "-- tune the job table\nALTER TABLE job_run SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: false, + }, + { + name: "comment referencing alter table job_ before real alter table job", + stmt: "-- Tunes job table, see alter table job_run for the equivalent\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + expected: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, targetsJobTable(tc.stmt)) + }) + } +} + +func 
TestReplaceJobTable(t *testing.T) { + tests := []struct { + name string + stmt string + partition string + expected string + }{ + { + name: "standard lowercase", + stmt: "ALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "mixed case table name", + stmt: "ALTER TABLE JOB SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "comment containing job before alter table", + stmt: "-- tune the job table\nALTER TABLE job SET (autovacuum_vacuum_scale_factor = 0.01)", + partition: "job_p2024", + expected: "-- tune the job table\nALTER TABLE job_p2024 SET (autovacuum_vacuum_scale_factor = 0.01)", + }, + { + name: "reset statement", + stmt: "ALTER TABLE job RESET (autovacuum_vacuum_scale_factor)", + partition: "job_p2024", + expected: "ALTER TABLE job_p2024 RESET (autovacuum_vacuum_scale_factor)", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, replaceJobTable(tc.stmt, tc.partition)) + }) + } +} diff --git a/internal/broadside/db/sql/partition_down.sql b/internal/broadside/db/sql/partition_down.sql new file mode 100644 index 00000000000..fbc9440511e --- /dev/null +++ b/internal/broadside/db/sql/partition_down.sql @@ -0,0 +1,86 @@ +-- partition_down.sql reverts the partition-by-submitted migration. +-- +-- It replaces the partitioned job table with an unpartitioned one and +-- removes the submitted column from child tables. No data is preserved: +-- the caller truncates all tables before running this. +-- +-- This file is embedded by internal/broadside/db/partition.go and executed +-- by TearDown when the PartitionBySubmitted feature toggle is enabled. 
+ +BEGIN; + +-- Drop the partitioned job table (cascades to all partitions) +DROP TABLE IF EXISTS job CASCADE; + +-- Recreate unpartitioned job table matching the Lookout schema +CREATE TABLE job ( + job_id varchar(32) NOT NULL PRIMARY KEY, + queue varchar(512) NOT NULL, + owner varchar(512) NOT NULL, + jobset varchar(1024) NOT NULL, + cpu bigint NOT NULL, + memory bigint NOT NULL, + ephemeral_storage bigint NOT NULL, + gpu bigint NOT NULL, + priority bigint NOT NULL, + submitted timestamp NOT NULL, + cancelled timestamp NULL, + state smallint NOT NULL, + last_transition_time timestamp NOT NULL, + last_transition_time_seconds bigint NOT NULL, + job_spec bytea NULL, + duplicate bool NOT NULL DEFAULT false, + priority_class varchar(63) NULL, + latest_run_id varchar(36) NULL, + cancel_reason varchar(512) NULL, + namespace varchar(512) NULL, + annotations jsonb NOT NULL DEFAULT '{}'::jsonb, + external_job_uri varchar(1024) NULL, + cancel_user varchar(512) NULL +); + +ALTER TABLE job ALTER COLUMN job_spec SET STORAGE EXTERNAL; + +CREATE INDEX idx_job_queue ON job (queue); + +CREATE INDEX idx_job_queue_last_transition_time_seconds ON job ( + queue, + last_transition_time_seconds +); + +CREATE INDEX idx_job_jobset_last_transition_time_seconds ON job ( + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_last_transition_time_seconds ON job ( + queue, + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); + +CREATE INDEX idx_job_state ON job (state); + +CREATE INDEX idx_job_jobset_pattern ON job (jobset varchar_pattern_ops) WITH (fillfactor = 80); + +CREATE INDEX idx_job_ltt_jobid ON job (last_transition_time, job_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_active_queue_jobset ON job (queue, jobset) WITH (fillfactor = 80) +WHERE state IN (1, 2, 3, 8); + +CREATE INDEX idx_job_queue_namespace ON job (queue, namespace) WITH (fillfactor = 80); + +CREATE INDEX 
idx_job_latest_run_id ON job (latest_run_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_submitted ON job (submitted DESC); + +-- Remove submitted from child tables +ALTER TABLE job_run DROP COLUMN IF EXISTS submitted; + +ALTER TABLE job_spec DROP COLUMN IF EXISTS submitted; + +ALTER TABLE job_error DROP COLUMN IF EXISTS submitted; + +COMMIT; diff --git a/internal/broadside/db/sql/partition_up.sql b/internal/broadside/db/sql/partition_up.sql new file mode 100644 index 00000000000..37e762b98f8 --- /dev/null +++ b/internal/broadside/db/sql/partition_up.sql @@ -0,0 +1,128 @@ +-- partition_up.sql converts the job table into a range-partitioned table +-- on the submitted column with daily partitions. +-- +-- This file is embedded by internal/broadside/db/partition.go and executed +-- when the PartitionBySubmitted feature toggle is enabled. +-- +-- This runs against an empty database (immediately after Lookout migrations). +-- Daily partition CREATE statements and the DEFAULT partition are generated +-- dynamically in Go after this static migration completes. 
+ +BEGIN; + +-- Step 1: Add submitted column to child tables (nullable initially, set NOT +-- NULL immediately since tables are empty at this point) +ALTER TABLE job_run +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +ALTER TABLE job_spec +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +ALTER TABLE job_error +ADD COLUMN IF NOT EXISTS submitted timestamp NOT NULL DEFAULT '1970-01-01'; + +-- Remove the defaults (they were only needed for the ALTER) +ALTER TABLE job_run ALTER COLUMN submitted DROP DEFAULT; + +ALTER TABLE job_spec ALTER COLUMN submitted DROP DEFAULT; + +ALTER TABLE job_error ALTER COLUMN submitted DROP DEFAULT; + +-- Step 2: Convert job to partitioned table +ALTER TABLE job RENAME TO job_unpartitioned; + +CREATE TABLE job ( + job_id varchar(32) NOT NULL, + queue varchar(512) NOT NULL, + owner varchar(512) NOT NULL, + jobset varchar(1024) NOT NULL, + cpu bigint NOT NULL, + memory bigint NOT NULL, + ephemeral_storage bigint NOT NULL, + gpu bigint NOT NULL, + priority bigint NOT NULL, + submitted timestamp NOT NULL, + cancelled timestamp NULL, + state smallint NOT NULL, + last_transition_time timestamp NOT NULL, + last_transition_time_seconds bigint NOT NULL, + job_spec bytea NULL, + duplicate bool NOT NULL DEFAULT false, + priority_class varchar(63) NULL, + latest_run_id varchar(36) NULL, + cancel_reason varchar(512) NULL, + namespace varchar(512) NULL, + annotations jsonb NOT NULL DEFAULT '{}'::jsonb, + external_job_uri varchar(1024) NULL, + cancel_user varchar(512) NULL, + PRIMARY KEY (job_id, submitted) +) PARTITION BY RANGE (submitted); + +-- Step 3: Drop old indexes so the names are free for the new table. +-- The old table (job_unpartitioned) is kept until Go code has created +-- partitions and moved the data across. 
+DROP INDEX IF EXISTS idx_job_queue; + +DROP INDEX IF EXISTS idx_job_queue_pattern_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_jobset_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_jobset_last_transition_time_seconds; + +DROP INDEX IF EXISTS idx_job_queue_jobset_state; + +DROP INDEX IF EXISTS idx_job_jobset_pattern; + +DROP INDEX IF EXISTS idx_job_state; + +DROP INDEX IF EXISTS idx_job_submitted; + +DROP INDEX IF EXISTS idx_job_ltt_jobid; + +DROP INDEX IF EXISTS idx_job_active_queue_jobset; + +DROP INDEX IF EXISTS idx_job_queue_namespace; + +DROP INDEX IF EXISTS idx_job_latest_run_id; + +-- Step 4: Create indexes on the partitioned parent (inherited by partitions) +CREATE INDEX idx_job_queue ON job (queue); + +CREATE INDEX idx_job_queue_last_transition_time_seconds ON job ( + queue, + last_transition_time_seconds +); + +CREATE INDEX idx_job_jobset_last_transition_time_seconds ON job ( + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_last_transition_time_seconds ON job ( + queue, + jobset, + last_transition_time_seconds +); + +CREATE INDEX idx_job_queue_jobset_state ON job (queue, jobset, state); + +CREATE INDEX idx_job_state ON job (state); + +CREATE INDEX idx_job_jobset_pattern ON job (jobset varchar_pattern_ops) WITH (fillfactor = 80); + +CREATE INDEX idx_job_ltt_jobid ON job (last_transition_time, job_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_active_queue_jobset ON job (queue, jobset) WITH (fillfactor = 80) +WHERE state IN (1, 2, 3, 8); + +CREATE INDEX idx_job_queue_namespace ON job (queue, namespace) WITH (fillfactor = 80); + +CREATE INDEX idx_job_latest_run_id ON job (latest_run_id) WITH (fillfactor = 80); + +CREATE INDEX idx_job_submitted ON job (submitted DESC); + +ALTER TABLE job ALTER COLUMN job_spec SET STORAGE EXTERNAL; + +COMMIT; diff --git a/internal/broadside/ingester/ingester.go 
b/internal/broadside/ingester/ingester.go index 643a07087fe..c10d65e92ab 100644 --- a/internal/broadside/ingester/ingester.go +++ b/internal/broadside/ingester/ingester.go @@ -368,7 +368,13 @@ func (i *Ingester) runBatchExecutor( if !ok { stopAndDrainTimer() if len(batch) > 0 { - i.executeBatch(ctx, batch) + // Use a fresh context for the final flush. The parent ctx is + // cancelled at this point, but the batch still needs to commit + // cleanly — otherwise the Postgres backend will be mid-rollback + // when TearDown tries to TRUNCATE, blocking on lock acquisition. + flushCtx, flushCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer flushCancel() + i.executeBatch(flushCtx, batch) } return } @@ -454,6 +460,7 @@ func (i *Ingester) submitJob( newJob.JobID, "", jobspec.StateLeased, + now, )) } @@ -465,6 +472,7 @@ func (i *Ingester) processTransition( ) { now := time.Now() cfg := i.config.JobStateTransitionConfig + submitted := trans.Submitted() jobNumber := jobspec.ExtractJobNumber(trans.JobID()) cluster, node := jobspec.GetClusterNodeForJobNumber(jobNumber) @@ -474,20 +482,22 @@ func (i *Ingester) processTransition( runID := jobspec.EncodeRunID(trans.JobID(), 0) i.routerSend(router, timestampedQuery{ query: db.SetJobLeased{ - JobID: trans.JobID(), - Time: now, - RunID: runID, + JobID: trans.JobID(), + Time: now, + RunID: runID, + Submitted: submitted, }, enqueuedAt: now, }, ctx) i.routerSend(router, timestampedQuery{ query: db.InsertJobRun{ - JobRunID: runID, - JobID: trans.JobID(), - Cluster: cluster, - Node: node, - Pool: jobspec.GetPool(jobNumber), - Time: now, + JobRunID: runID, + JobID: trans.JobID(), + Cluster: cluster, + Node: node, + Pool: jobspec.GetPool(jobNumber), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -496,14 +506,16 @@ func (i *Ingester) processTransition( trans.JobID(), runID, jobspec.StatePending, + submitted, )) case jobspec.StatePending: i.routerSend(router, timestampedQuery{ query: 
db.SetJobPending{ - JobID: trans.JobID(), - Time: now, - RunID: trans.RunID(), + JobID: trans.JobID(), + Time: now, + RunID: trans.RunID(), + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -519,6 +531,7 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateRunning, + submitted, )) case jobspec.StateRunning: @@ -527,6 +540,7 @@ func (i *Ingester) processTransition( JobID: trans.JobID(), Time: now, LatestRunID: trans.RunID(), + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -545,6 +559,7 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateSucceeded, + submitted, )) } else { heap.Push(transitions, jobspec.NewScheduledTransition( @@ -552,14 +567,16 @@ func (i *Ingester) processTransition( trans.JobID(), trans.RunID(), jobspec.StateErrored, + submitted, )) } case jobspec.StateSucceeded: i.routerSend(router, timestampedQuery{ query: db.SetJobSucceeded{ - JobID: trans.JobID(), - Time: now, + JobID: trans.JobID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -574,8 +591,9 @@ func (i *Ingester) processTransition( case jobspec.StateErrored: i.routerSend(router, timestampedQuery{ query: db.SetJobErrored{ - JobID: trans.JobID(), - Time: now, + JobID: trans.JobID(), + Time: now, + Submitted: submitted, }, enqueuedAt: now, }, ctx) @@ -590,8 +608,9 @@ func (i *Ingester) processTransition( }, ctx) i.routerSend(router, timestampedQuery{ query: db.InsertJobError{ - JobID: trans.JobID(), - Error: simulatedError, + JobID: trans.JobID(), + Error: simulatedError, + Submitted: submitted, }, enqueuedAt: now, }, ctx) diff --git a/internal/broadside/jobspec/state.go b/internal/broadside/jobspec/state.go index 372182de2bc..adf2b79350f 100644 --- a/internal/broadside/jobspec/state.go +++ b/internal/broadside/jobspec/state.go @@ -26,25 +26,28 @@ const ( ) type ScheduledTransition struct { - time time.Time - jobID string - runID string - toState JobState + time time.Time + jobID string + runID 
string + toState JobState + submitted time.Time } -func NewScheduledTransition(t time.Time, jobID, runID string, toState JobState) ScheduledTransition { +func NewScheduledTransition(t time.Time, jobID, runID string, toState JobState, submitted time.Time) ScheduledTransition { return ScheduledTransition{ - time: t, - jobID: jobID, - runID: runID, - toState: toState, + time: t, + jobID: jobID, + runID: runID, + toState: toState, + submitted: submitted, } } -func (s ScheduledTransition) Time() time.Time { return s.time } -func (s ScheduledTransition) JobID() string { return s.jobID } -func (s ScheduledTransition) RunID() string { return s.runID } -func (s ScheduledTransition) ToState() JobState { return s.toState } +func (s ScheduledTransition) Time() time.Time { return s.time } +func (s ScheduledTransition) JobID() string { return s.jobID } +func (s ScheduledTransition) RunID() string { return s.runID } +func (s ScheduledTransition) ToState() JobState { return s.toState } +func (s ScheduledTransition) Submitted() time.Time { return s.submitted } type TransitionHeap []ScheduledTransition diff --git a/internal/broadside/orchestrator/runner.go b/internal/broadside/orchestrator/runner.go index 851fcc06f30..75b66a9c0cf 100644 --- a/internal/broadside/orchestrator/runner.go +++ b/internal/broadside/orchestrator/runner.go @@ -252,11 +252,24 @@ func (r *Runner) newDatabase() (db.Database, error) { func NewDatabase(config configuration.TestConfig) (db.Database, error) { switch { case len(config.DatabaseConfig.Postgres) > 0: + var allJobAgeDays []int + seen := make(map[int]struct{}) + for _, qc := range config.QueueConfig { + for _, jsc := range qc.JobSetConfig { + for _, d := range jsc.HistoricalJobsConfig.JobAgeDays { + if _, ok := seen[d]; !ok { + allJobAgeDays = append(allJobAgeDays, d) + seen[d] = struct{}{} + } + } + } + } return db.NewPostgresDatabase( config.DatabaseConfig.Postgres, config.FeatureToggles, config.DatabaseConfig.PostgresTuningSQL, 
config.DatabaseConfig.PostgresTuningRevertSQL, + allJobAgeDays, ), nil case len(config.DatabaseConfig.ClickHouse) > 0: return db.NewClickHouseDatabase(config.DatabaseConfig.ClickHouse), nil