
Broadside: support partitioning by submitted time feature#4774

Open
mauriceyap wants to merge 6 commits into master from broadside-partition-by-submitted

Conversation

@mauriceyap
Collaborator

This adds a feature toggle to Broadside which range-partitions the Postgres job table by the submitted timestamp with daily partitions. This is mutually exclusive with the existing hotColdSplit toggle.

When enabled:

  • The job table PK becomes (job_id, submitted)
  • Child tables (job_run, job_spec, job_error) gain a submitted column
  • Ingestion bypasses LookoutDb (used in production) and instead uses direct SQL with submitted in WHERE clauses for partition pruning (this avoids the risk of breaking anything running in production, at the cost of duplicating functionality - I think this is a good trade-off)
  • Historical bulk inserts split per age bucket so each INSERT targets a single partition

This also fixes a bug where the generate_series-based historical insert hardcoded submitted to NOW() - 24 hours, ignoring the configured jobAgeDays. It now uses a LATERAL subquery to vary the base time per age bucket.

@greptile-apps
Contributor

greptile-apps bot commented Mar 18, 2026

Greptile Summary

This PR adds a partitionBySubmitted feature toggle to Broadside that range-partitions the Postgres job table by the submitted timestamp with daily partitions, including a dedicated ingestion path (executePartitionedBatch) that bypasses LookoutDb and uses the submitted column for partition pruning. It also fixes a pre-existing bug where generate_series historical inserts hardcoded NOW() - 24 hours regardless of jobAgeDays, replacing it with a LATERAL subquery that varies the base time per age bucket.

Key changes and issues found:

  • Phase 1 sequencing corrected: createJobsPartitioned now runs and commits before createJobSpecsPartitioned, avoiding the FK constraint race previously flagged.
  • Fragile strings.Replace replaced: targetsJobTable + replaceJobTable use a comment-stripping helper and a case-insensitive regex, with unit tests covering all the previously problematic edge cases.
  • Empty JobAgeDays guard added: The non-partitioned insertHistoricalJobChunk now returns early when JobAgeDays is empty, preventing division-by-zero in the LATERAL modulo expression.
  • strings.Title replaced with a local capitalize() helper, and flushCancel is now properly deferred in runBatchExecutor.
  • Direct-partition insert near midnight (postgres_partitioned.go:311): insertHistoricalJobChunkForBucket computes partitionName from an independent time.Now() call, separate from the one used by createDailyPartitions at startup. If historical population starts after a UTC midnight crossing, the computed partition name for a past age bucket will be one day newer than any partition that was actually created, producing a "relation does not exist" error. The forward buffer in createDailyPartitions only covers today+2, not the lookback range.
  • Unwrapped errors in three COPY helpers (postgres_partitioned.go:160/180/198): createJobSpecsPartitioned, createJobRunsPartitioned, and createJobErrorsPartitioned return bare errors, unlike the rest of the file.
  • Silent no-op on missing submittedMap entry (postgres_partitioned.go:252): A zero time.Time from an absent map key causes updateJobsPartitioned's WHERE clause to match no rows — the state transition is silently dropped with no error.
  • Non-atomic data migration in partition.go (partition.go:42): The row-copy and DROP TABLE job_unpartitioned steps run outside the partition_up.sql transaction. If the process is interrupted mid-migration and restarts, the relkind == 'p' check skips the migration entirely, leaving job_unpartitioned orphaned. Low risk since the migration is documented to run against an empty database.
  • No integration tests for the partitioned ingestion path: Existing integration tests use FeatureToggles{}. The partitioned batch execution, update, and error-insertion flows are not exercised in postgres_historical_test.go.
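The empty-JobAgeDays guard mentioned above can be sketched as follows. The review notes that the LATERAL expression uses a modulo over the number of age buckets, so an empty slice must be rejected before the SQL is built; the function name here is hypothetical:

```go
package main

import "fmt"

// bucketIndex assigns a generated row i to an age bucket via i % len(jobAgeDays).
// An empty jobAgeDays would make the modulo divide by zero, so it is rejected
// up front rather than surfacing as a SQL error.
func bucketIndex(i int, jobAgeDays []int) (int, error) {
	if len(jobAgeDays) == 0 {
		return 0, fmt.Errorf("jobAgeDays is empty: cannot assign rows to age buckets")
	}
	return i % len(jobAgeDays), nil
}

func main() {
	idx, err := bucketIndex(10, []int{1, 7, 30})
	fmt.Println(idx, err)

	_, err = bucketIndex(10, nil)
	fmt.Println(err)
}
```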

Confidence Score: 3/5

  • Several previously flagged issues are resolved, but two new logic concerns (midnight-boundary partition mismatch and silent no-op on missing submittedMap entries) remain and could cause undetected test failures.
  • The architecture is sound and the previously flagged FK race, strings.Replace fragility, empty-JobAgeDays division-by-zero, and flushCancel issues are all addressed. However, the direct-partition insert in insertHistoricalJobChunkForBucket uses an independent time.Now() call that can diverge from the partitions created at startup if execution crosses midnight, producing a hard runtime error (not a logic error). The silent-no-op in updateJobsPartitioned when submittedMap is missing an entry is a maintenance trap. Three CopyFrom helpers return bare errors making operational debugging harder. No integration tests cover the new partitioned execution paths.
  • internal/broadside/db/postgres_partitioned.go (midnight boundary + silent no-op + unwrapped errors) and internal/broadside/db/partition.go (non-atomic migration)

Important Files Changed

Filename Overview
internal/broadside/db/postgres_partitioned.go New file implementing partitioned batch execution — correctly sequences Phase 1 (jobs then specs), parallelises Phase 2, delegates Phase 3 to LookoutDb; issues: errors from three CopyFrom helpers are returned unwrapped, silent no-op risk when submittedMap is incomplete, and the direct-partition insert can fail near midnight boundaries.
internal/broadside/db/partition.go New file handling partition migration — correctly detects already-partitioned tables and creates daily + DEFAULT partitions; data-migration steps (row copy and old-table drop) run outside the SQL transaction, leaving job_unpartitioned orphaned if the process dies mid-migration.
internal/broadside/db/postgres.go Adds PartitionBySubmitted feature toggle dispatch, replaces deprecated strings.Title with capitalize(), fixes the strings.Replace fragility with a regex-based replaceJobTable + targetsJobTable pair with comment-stripping, adds the guard for empty JobAgeDays, and fixes the LATERAL subquery to vary base_time per age bucket.
internal/broadside/db/sql/partition_up.sql New SQL migration that converts the job table to a range-partitioned table on submitted with composite PK (job_id, submitted), adds submitted to child tables, and recreates indexes on the partitioned parent — schema matches the Lookout baseline.
internal/broadside/db/sql/partition_down.sql Revert migration: drops the partitioned job table, recreates the plain unpartitioned schema with all indexes, and removes submitted from child tables — schema faithfully reproduces the Lookout baseline including storage settings and all indexes.
internal/broadside/ingester/ingester.go Propagates the Submitted field through all state transitions via trans.Submitted(), fixes the final-flush context with a fresh Background context and defer flushCancel(), and adds the submitted timestamp to all query structs consistently.
internal/broadside/orchestrator/runner.go Collects the union of all unique JobAgeDays values across all queue/job-set configs and passes them to NewPostgresDatabase, enabling partition creation covering all configured historical age buckets.

Sequence Diagram

sequenceDiagram
    participant I as Ingester
    participant PD as PostgresDatabase
    participant PG as PostgreSQL

    rect rgb(220, 240, 255)
        Note over PD,PG: InitialiseSchema with PartitionBySubmitted
        PD->>PG: "partition_up.sql: rename job to job_unpartitioned, create partitioned job"
        PD->>PG: "CREATE TABLE job_pYYYYMMDD per jobAgeDays + today+2 buffer"
        PD->>PG: "CREATE TABLE job_default PARTITION OF job DEFAULT"
        PD->>PG: "INSERT INTO job SELECT * FROM job_unpartitioned"
        PD->>PG: "DROP TABLE job_unpartitioned"
    end

    rect rgb(255, 240, 220)
        Note over I,PG: ExecuteIngestionQueryBatch partitioned path
        I->>PD: "ExecuteIngestionQueryBatch(queries)"
        PD->>PD: "buildJobSubmittedMap + queriesToInstructionSet"
        Note over PD: "Phase 1 sequential"
        PD->>PG: "CopyFrom job (createJobsPartitioned)"
        PD->>PG: "CopyFrom job_spec (createJobSpecsPartitioned)"
        Note over PD: "Phase 2 parallel"
        par
            PD->>PG: "UPDATE job via temp table (updateJobsPartitioned)"
        and
            PD->>PG: "CopyFrom job_run (createJobRunsPartitioned)"
        and
            PD->>PG: "CopyFrom job_error (createJobErrorsPartitioned)"
        end
        Note over PD: "Phase 3 via LookoutDb"
        PD->>PG: "UPDATE job_run (LookoutDb.UpdateJobRuns)"
    end

    rect rgb(220, 255, 220)
        Note over PD,PG: PopulateHistoricalJobs partitioned path
        loop "per age bucket (ageDays in JobAgeDays)"
            PD->>PD: "baseTime = Now().Truncate(24h) minus ageDays"
            PD->>PD: "partitionName = job_pYYYYMMDD"
            PD->>PG: "INSERT INTO job_pYYYYMMDD WHERE i%nBuckets=bucketIdx"
            PD->>PG: "INSERT INTO job_spec WHERE i%nBuckets=bucketIdx"
            PD->>PG: "INSERT INTO job_run WHERE i%nBuckets=bucketIdx"
            PD->>PG: "INSERT INTO job_error WHERE i%nBuckets=bucketIdx"
        end
    end

    rect rgb(255, 220, 220)
        Note over PD,PG: TearDown with PartitionBySubmitted
        PD->>PG: "TRUNCATE all tables"
        PD->>PG: "partition_down.sql: DROP TABLE job CASCADE, recreate unpartitioned job"
        PD->>PG: "DROP COLUMN submitted from job_run, job_spec, job_error"
    end

Comments Outside Diff (4)

  1. internal/broadside/db/postgres_partitioned.go, line 160-161 (link)

    Errors returned without context wrapping

    createJobSpecsPartitioned, createJobRunsPartitioned, and createJobErrorsPartitioned all return the raw error from p.pool.CopyFrom without wrapping it in fmt.Errorf. Every other function in this file (including createJobsPartitioned) wraps errors with contextual information. A bare return err makes it difficult to distinguish which COPY operation failed when debugging.

    Same pattern applies to createJobRunsPartitioned (line 180) and createJobErrorsPartitioned (line 198).

  2. internal/broadside/db/postgres_partitioned.go, line 311-313 (link)

    Direct partition insert may fail near midnight boundary

    partitionName is computed from an independent time.Now() call on line 311, while createDailyPartitions (called earlier during InitialiseSchema) also calls time.Now() independently. For historical age buckets, if those two calls straddle a UTC midnight boundary, the partition name computed here will be one day later than the partition that was actually created.

    For example: createDailyPartitions runs at 23:59:59 on day D and creates job_p(D-7) for ageDays=7. If insertHistoricalJobChunkForBucket runs after midnight on day D+1, it computes partitionName = job_p(D+1-7) = job_p(D-6). Since job_p(D-6) was never created, PostgreSQL returns ERROR: relation "job_p..." does not exist, which is not caught by ON CONFLICT DO NOTHING.

    The forward-buffer in createDailyPartitions (+2 days) covers live-ingestion jobs, but not the lookback age buckets. Consider one of:

    1. Inserting into the parent job table instead of the named partition (sacrifices the direct-routing optimisation)
    2. Re-running createDailyPartitions before historical population
    3. Using a single time.Now() computed once in applyPartitionMigration and passed through
  3. internal/broadside/db/partition.go, line 42-47 (link)

    Data migration is non-atomic — job_unpartitioned orphaned on partial failure

    Steps 2–4 (creating partitions, migrating rows, dropping job_unpartitioned) execute outside the partition_up.sql transaction. If the process dies after partitionMigrationSQL completes but before DROP TABLE job_unpartitioned, subsequent restarts detect relkind == 'p' (line 30) and skip into the else branch, calling only createDailyPartitions. The data-migration step (INSERT INTO job SELECT * FROM job_unpartitioned) is never retried, and job_unpartitioned is silently orphaned with its rows un-migrated.

    This is low-risk in practice because partition_up.sql comments state it runs against an empty database, so there are no rows to lose. However, a defensive check would eliminate any ambiguity:

    // After checking relkind == "p", verify job_unpartitioned no longer exists.
    var exists bool
    _ = p.pool.QueryRow(ctx,
        "SELECT EXISTS(SELECT 1 FROM pg_class WHERE relname = 'job_unpartitioned')").Scan(&exists)
    if exists {
        return fmt.Errorf("job_unpartitioned still exists; previous migration may have been interrupted")
    }
  4. internal/broadside/db/postgres_partitioned.go, line 252-265 (link)

    Silent no-op updates when submittedMap entry is absent

    submittedMap[instr.JobId] returns the zero time.Time (0001-01-01 00:00:00) when the job ID is not in the map. This value is written into the temp table, and the final UPDATE filters with:

    WHERE tmp.job_id = job.job_id AND tmp.submitted = job.submitted

    Since no real job row has submitted = '0001-01-01 00:00:00', the UPDATE silently matches zero rows — the state transition is dropped with no error returned. This is particularly subtle because the batch returns nil even though no rows were updated.

    While the current code always populates submittedMap for every query type that contributes to JobsToUpdate, the silent-failure mode is a maintenance hazard: adding a new query type to queriesToInstructionSet without a corresponding buildJobSubmittedMap case would cause undetected data loss. Consider logging a warning or returning an error when a job ID is missing from the map before issuing the UPDATE.

Reviews (9): last reviewed commit "fix"

@mauriceyap mauriceyap force-pushed the broadside-partition-by-submitted branch from cf023e3 to 9850377 Compare March 18, 2026 10:26
@mauriceyap mauriceyap force-pushed the broadside-partition-by-submitted branch from 323fb9b to fd8bbdd Compare March 20, 2026 11:18
@mauriceyap mauriceyap closed this Mar 20, 2026
@mauriceyap mauriceyap reopened this Mar 20, 2026
@mauriceyap mauriceyap enabled auto-merge (squash) March 23, 2026 10:12
@mauriceyap mauriceyap disabled auto-merge March 23, 2026 10:13
@mauriceyap
Collaborator Author

@Mergifyio refresh

@mergify
Contributor

mergify bot commented Mar 23, 2026

refresh

✅ Pull request refreshed

@mauriceyap
Collaborator Author

@Mergifyio update

@mergify
Contributor

mergify bot commented Mar 23, 2026

update

☑️ Nothing to do, the required conditions are not met

Details
  • #commits-behind > 0 [📌 update requirement]
  • -closed [📌 update requirement]
  • -conflict [📌 update requirement]
  • queue-position = -1 [📌 update requirement]

@mauriceyap
Collaborator Author

@Mergifyio refresh

@mergify
Contributor

mergify bot commented Mar 23, 2026

refresh

✅ Pull request refreshed

@mauriceyap
Collaborator Author

@Mergifyio rebase

@mergify
Contributor

mergify bot commented Mar 23, 2026

rebase

❌ Unable to rebase: Mergify can't impersonate mauriceyap

Details

User mauriceyap used as bot_account is unknown. Please make sure {login} exists and has logged into the Mergify dashboard.

@mauriceyap mauriceyap force-pushed the broadside-partition-by-submitted branch from 6729360 to c6c8bc4 Compare March 23, 2026 10:31