Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/broadside/configs/examples/test-postgres.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
testDuration: 60s
warmupDuration: 10s

skipTearDown: false

featureToggles:
partitionBySubmitted: false
hotColdSplit: false

databaseConfig:
postgres:
host: localhost
Expand Down
16 changes: 10 additions & 6 deletions internal/broadside/actions/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ func (a *Actor) executeReprioritisation(ctx context.Context, queue string, jobSe
queries := make([]db.IngestionQuery, 0, len(jobs))
for _, job := range jobs {
queries = append(queries, db.UpdateJobPriority{
JobID: job.JobID,
Priority: jobspec.PriorityValues,
JobID: job.JobID,
Priority: jobspec.PriorityValues,
Submitted: job.Submitted,
})
}

Expand Down Expand Up @@ -230,6 +231,7 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st
Time: now,
CancelReason: "Bulk cancellation via Broadside test",
CancelUser: "broadside-test",
Submitted: job.Submitted,
})
}

Expand Down Expand Up @@ -257,8 +259,9 @@ func (a *Actor) executeCancellation(ctx context.Context, queue string, jobSet st

// jobInfo holds minimal information about a job needed for actions.
type jobInfo struct {
JobID string
State string
JobID string
State string
Submitted time.Time
}

// getActiveJobs queries the database for active jobs in the specified queue and job set.
Expand Down Expand Up @@ -302,8 +305,9 @@ func (a *Actor) getActiveJobs(ctx context.Context, queue string, jobSet string)
result := make([]jobInfo, 0, len(jobs))
for _, job := range jobs {
result = append(result, jobInfo{
JobID: job.JobId,
State: job.State,
JobID: job.JobId,
State: job.State,
Submitted: job.Submitted,
})
}

Expand Down
3 changes: 2 additions & 1 deletion internal/broadside/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ type TestConfig struct {
}

type FeatureToggles struct {
HotColdSplit bool `yaml:"hotColdSplit,omitempty"`
HotColdSplit bool `yaml:"hotColdSplit,omitempty"`
PartitionBySubmitted bool `yaml:"partitionBySubmitted,omitempty"`
}

type DatabaseConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/broadside/configuration/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ job submissions, state transitions, and query patterns.
The main configuration type is TestConfig, which defines:

- Test duration and warmup period
- Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL)
- Database configuration (Postgres or ClickHouse connection parameters, optional Postgres tuning and revert SQL, optional partition-by-submitted toggle)
- Queue configuration (queue/jobset distribution and historical job setup)
- Ingestion configuration (job submission rates and state transitions)
- Query configuration (rates for different query types)
Expand Down
3 changes: 3 additions & 0 deletions internal/broadside/configuration/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const proportionTolerance = 0.001 // Tolerance for floating point comparison

// Validate validates the Test configuration.
func (t *TestConfig) Validate() error {
if t.FeatureToggles.HotColdSplit && t.FeatureToggles.PartitionBySubmitted {
return fmt.Errorf("feature toggles are mutually exclusive: only one of hotColdSplit, partitionBySubmitted may be enabled")
}
if t.TestDuration <= 0 {
return fmt.Errorf("testDuration must be positive, got %v", t.TestDuration)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/broadside/configuration/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ func TestTest_Validate(t *testing.T) {
modify: func(t *TestConfig) {},
wantErr: false,
},
{
name: "mutually exclusive feature toggles",
modify: func(t *TestConfig) {
t.FeatureToggles.HotColdSplit = true
t.FeatureToggles.PartitionBySubmitted = true
},
wantErr: true,
errText: "feature toggles are mutually exclusive",
},
{
name: "zero test duration",
modify: func(t *TestConfig) {
Expand Down
59 changes: 35 additions & 24 deletions internal/broadside/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ type NewJob struct {
}

type UpdateJobPriority struct {
JobID string
Priority int64
JobID string
Priority int64
Submitted time.Time
}

func (UpdateJobPriority) isIngestionQuery() {}
Expand All @@ -139,41 +140,47 @@ type SetJobCancelled struct {
Time time.Time
CancelReason string
CancelUser string
Submitted time.Time
}

func (SetJobCancelled) isIngestionQuery() {}

type SetJobSucceeded struct {
JobID string
Time time.Time
JobID string
Time time.Time
Submitted time.Time
}

func (SetJobSucceeded) isIngestionQuery() {}

type InsertJobError struct {
JobID string
Error []byte
JobID string
Error []byte
Submitted time.Time
}

func (InsertJobError) isIngestionQuery() {}

type SetJobPreempted struct {
JobID string
Time time.Time
JobID string
Time time.Time
Submitted time.Time
}

func (SetJobPreempted) isIngestionQuery() {}

type SetJobRejected struct {
JobID string
Time time.Time
JobID string
Time time.Time
Submitted time.Time
}

func (SetJobRejected) isIngestionQuery() {}

type SetJobErrored struct {
JobID string
Time time.Time
JobID string
Time time.Time
Submitted time.Time
}

func (SetJobErrored) isIngestionQuery() {}
Expand All @@ -182,6 +189,7 @@ type SetJobRunning struct {
JobID string
Time time.Time
LatestRunID string
Submitted time.Time
}

func (SetJobRunning) isIngestionQuery() {}
Expand All @@ -195,9 +203,10 @@ type SetJobRunStarted struct {
func (SetJobRunStarted) isIngestionQuery() {}

type SetJobPending struct {
JobID string
Time time.Time
RunID string
JobID string
Time time.Time
RunID string
Submitted time.Time
}

func (SetJobPending) isIngestionQuery() {}
Expand Down Expand Up @@ -243,20 +252,22 @@ type SetJobRunPreempted struct {
func (SetJobRunPreempted) isIngestionQuery() {}

type SetJobLeased struct {
JobID string
Time time.Time
RunID string
JobID string
Time time.Time
RunID string
Submitted time.Time
}

func (SetJobLeased) isIngestionQuery() {}

type InsertJobRun struct {
JobRunID string
JobID string
Cluster string
Node string
Pool string
Time time.Time
JobRunID string
JobID string
Cluster string
Node string
Pool string
Time time.Time
Submitted time.Time
}

func (InsertJobRun) isIngestionQuery() {}
14 changes: 13 additions & 1 deletion internal/broadside/db/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ Three implementations are provided:
in a single statement — ensuring no terminal state is ever written to job.
Historical job population writes directly to job_historical when the toggle is
enabled.
When the PartitionBySubmitted feature toggle is enabled (mutually exclusive
with HotColdSplit), InitialiseSchema converts the job table into a
range-partitioned table on the submitted column (sql/partition_up.sql),
creates daily partitions covering all configured jobAgeDays plus a forward
buffer, and adds a DEFAULT partition. Child tables (job_run, job_spec,
job_error) gain a NOT NULL submitted column. During ingestion,
ExecuteIngestionQueryBatch bypasses LookoutDb entirely, using direct SQL that
includes submitted in INSERT columns and UPDATE WHERE clauses for partition
pruning. Historical job population splits per age bucket per chunk so that
each INSERT targets a single partition. TearDown drops the partitioned table,
recreates the original unpartitioned schema (sql/partition_down.sql), and
removes the submitted column from child tables.
- ClickHouseDatabase: ClickHouse adapter (placeholder implementation)
- MemoryDatabase: In-memory adapter for smoke-testing Broadside

Expand Down Expand Up @@ -138,7 +150,7 @@ Create a database instance using the appropriate constructor:
"host": "localhost",
"port": "5432",
"database": "lookout",
}, nil, nil)
}, configuration.FeatureToggles{}, nil, nil, nil)

// For ClickHouse
db := db.NewClickHouseDatabase(map[string]string{
Expand Down
18 changes: 9 additions & 9 deletions internal/broadside/db/historical.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,34 @@ func buildHistoricalJobQueries(jobNum int, params HistoricalJobsParams) []Ingest

queries := []IngestionQuery{
InsertJob{Job: newJob, JobSpec: params.JobSpecBytes},
SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID},
InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime},
SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID},
SetJobLeased{JobID: jobID, Time: leasedTime, RunID: runID, Submitted: baseTime},
InsertJobRun{JobRunID: runID, JobID: jobID, Cluster: cluster, Node: node, Pool: jobspec.GetPool(jobNum), Time: leasedTime, Submitted: baseTime},
SetJobPending{JobID: jobID, Time: pendingTime, RunID: runID, Submitted: baseTime},
SetJobRunPending{JobRunID: runID, Time: pendingTime},
SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID},
SetJobRunning{JobID: jobID, Time: runningTime, LatestRunID: runID, Submitted: baseTime},
SetJobRunStarted{JobRunID: runID, Time: runningTime, Node: node},
}

switch historicalState(jobNum, params) {
case jobspec.StateSucceeded:
queries = append(queries,
SetJobSucceeded{JobID: jobID, Time: terminalTime},
SetJobSucceeded{JobID: jobID, Time: terminalTime, Submitted: baseTime},
SetJobRunSucceeded{JobRunID: runID, Time: terminalTime},
)
case jobspec.StateErrored:
queries = append(queries,
SetJobErrored{JobID: jobID, Time: terminalTime},
SetJobErrored{JobID: jobID, Time: terminalTime, Submitted: baseTime},
SetJobRunFailed{JobRunID: runID, Time: terminalTime, Error: params.ErrorBytes, Debug: params.DebugBytes},
InsertJobError{JobID: jobID, Error: params.ErrorBytes},
InsertJobError{JobID: jobID, Error: params.ErrorBytes, Submitted: baseTime},
)
case jobspec.StateCancelled:
queries = append(queries,
SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName},
SetJobCancelled{JobID: jobID, Time: terminalTime, CancelReason: "user requested", CancelUser: params.QueueName, Submitted: baseTime},
SetJobRunCancelled{JobRunID: runID, Time: terminalTime},
)
case jobspec.StatePreempted:
queries = append(queries,
SetJobPreempted{JobID: jobID, Time: terminalTime},
SetJobPreempted{JobID: jobID, Time: terminalTime, Submitted: baseTime},
SetJobRunPreempted{JobRunID: runID, Time: terminalTime, Error: params.PreemptionBytes},
)
}
Expand Down
98 changes: 98 additions & 0 deletions internal/broadside/db/partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package db

import (
	"context"
	_ "embed"
	"fmt"
	"slices"
	"time"

	"github.com/armadaproject/armada/internal/common/logging"
)

//go:embed sql/partition_up.sql
var partitionMigrationSQL string

//go:embed sql/partition_down.sql
var partitionRevertSQL string

// applyPartitionMigration applies the partition-by-submitted migration,
// then creates daily partitions covering all configured jobAgeDays plus
// a buffer for live ingestion, plus a DEFAULT partition.
// applyPartitionMigration applies the partition-by-submitted migration,
// then creates daily partitions covering all configured jobAgeDays plus
// a buffer for live ingestion, plus a DEFAULT partition. It is idempotent:
// if the job table is already partitioned, only the partitions are ensured.
func (p *PostgresDatabase) applyPartitionMigration(ctx context.Context, jobAgeDays []int) error {
	// relkind 'p' in pg_class marks a partitioned parent table, so it tells
	// us whether the one-time conversion has already been performed.
	var relkind string
	err := p.pool.QueryRow(ctx,
		"SELECT relkind::text FROM pg_class WHERE relname = 'job'").Scan(&relkind)
	if err != nil {
		return fmt.Errorf("checking job table type: %w", err)
	}

	if relkind == "p" {
		// Already converted — just make sure partitions for any newly
		// configured dates exist, then we are done.
		if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil {
			return fmt.Errorf("creating daily partitions: %w", err)
		}
		logging.Info("Job table already partitioned, ensured partitions are up to date")
		return nil
	}

	// Not yet partitioned: run the one-time schema conversion first.
	if _, err := p.pool.Exec(ctx, partitionMigrationSQL); err != nil {
		return fmt.Errorf("applying partition migration: %w", err)
	}

	if err := p.createDailyPartitions(ctx, jobAgeDays); err != nil {
		return fmt.Errorf("creating daily partitions: %w", err)
	}

	// Existing rows can only be moved once the partitions exist — otherwise
	// Postgres has no partition to route them to. Afterwards the old
	// unpartitioned table is dropped.
	if _, err := p.pool.Exec(ctx, "INSERT INTO job SELECT * FROM job_unpartitioned"); err != nil {
		return fmt.Errorf("moving rows to partitioned table: %w", err)
	}
	if _, err := p.pool.Exec(ctx, "DROP TABLE job_unpartitioned"); err != nil {
		return fmt.Errorf("dropping old unpartitioned table: %w", err)
	}

	logging.Info("Converted job table to range-partitioned")
	return nil
}

// createDailyPartitions creates one partition per day covering all dates
// in jobAgeDays (relative to today) plus a 2-day forward buffer, and a
// DEFAULT partition as a safety net.
// createDailyPartitions creates one partition per day covering all dates
// in jobAgeDays (relative to today) plus a 2-day forward buffer, and a
// DEFAULT partition as a safety net for rows outside every explicit range.
// Creation is idempotent (CREATE TABLE IF NOT EXISTS) and deterministic in
// order, so repeated runs and log output are stable.
func (p *PostgresDatabase) createDailyPartitions(ctx context.Context, jobAgeDays []int) error {
	// BUG FIX: time.Now().Truncate(24*time.Hour) rounds down relative to the
	// Unix epoch, i.e. to UTC midnight, while Format("2006-01-02") below
	// renders in the local zone. In a non-UTC zone the truncated instant and
	// the formatted date can disagree, shifting partition bounds by one day
	// near midnight. Compute local midnight explicitly so the range bounds
	// and the formatted dates always refer to the same calendar day.
	now := time.Now()
	year, month, day := now.Date()
	today := time.Date(year, month, day, 0, 0, 0, 0, now.Location())

	// Collect the distinct partition start dates: one per configured age
	// bucket, plus today and a 2-day forward buffer for live ingestion.
	dates := make(map[time.Time]struct{}, len(jobAgeDays)+3)
	for _, days := range jobAgeDays {
		dates[today.AddDate(0, 0, -days)] = struct{}{}
	}
	for offset := 0; offset <= 2; offset++ {
		dates[today.AddDate(0, 0, offset)] = struct{}{}
	}

	// Sort so partitions are created (and logged) in chronological order
	// rather than random map-iteration order.
	sorted := make([]time.Time, 0, len(dates))
	for date := range dates {
		sorted = append(sorted, date)
	}
	slices.SortFunc(sorted, func(a, b time.Time) int { return a.Compare(b) })

	for _, date := range sorted {
		partitionName := fmt.Sprintf("job_p%s", date.Format("20060102"))
		nextDay := date.AddDate(0, 0, 1)
		// The bounds are generated internally from time values — never user
		// input — so building the DDL with Sprintf is safe here.
		sql := fmt.Sprintf(
			"CREATE TABLE IF NOT EXISTS %s PARTITION OF job FOR VALUES FROM ('%s') TO ('%s')",
			partitionName,
			date.Format("2006-01-02"),
			nextDay.Format("2006-01-02"),
		)
		if _, err := p.pool.Exec(ctx, sql); err != nil {
			return fmt.Errorf("creating partition %s: %w", partitionName, err)
		}
		logging.Infof("Created partition %s", partitionName)
	}

	// The DEFAULT partition catches any row whose submitted date falls
	// outside all explicit daily ranges.
	if _, err := p.pool.Exec(ctx, "CREATE TABLE IF NOT EXISTS job_default PARTITION OF job DEFAULT"); err != nil {
		return fmt.Errorf("creating default partition: %w", err)
	}
	logging.Info("Created default partition job_default")

	return nil
}
Loading
Loading