Skip to content

feat: add checkpointing into the backfill so we can resume from the l…#519

Open
knrc wants to merge 2 commits intosecuresign:mainfrom
knrc:checkpointing
Open

feat: add checkpointing into the backfill so we can resume from the l…#519
knrc wants to merge 2 commits intosecuresign:mainfrom
knrc:checkpointing

Conversation

@knrc
Copy link

@knrc knrc commented Jan 12, 2026

User description

…ast completed, contiguous index


PR Type

Enhancement


Description

  • Add checkpointing support for resuming interrupted backfills

  • Implement checkpoint persistence for both Redis and MySQL backends

  • Track last completed contiguous index to enable safe resumption

  • Add CLI flags for checkpoint control and interval configuration

  • Refactor error handling to fail fast on critical errors

  • Add comprehensive test scenarios for checkpoint functionality


Diagram Walkthrough

flowchart LR
  A["Backfill Start"] --> B{"Checkpoint<br/>Exists?"}
  B -->|Yes| C["Resume from<br/>Last Index"]
  B -->|No| D["Start from<br/>--start"]
  C --> E["Process Entries"]
  D --> E
  E --> F["Track Completed<br/>Indices"]
  F --> G{"Interval<br/>Reached?"}
  G -->|Yes| H["Save Checkpoint"]
  G -->|No| F
  H --> I{"More<br/>Entries?"}
  I -->|Yes| E
  I -->|No| J["Final Checkpoint<br/>Save"]
  J --> K["Complete"]
Loading

File Walkthrough

Relevant files
Enhancement
main.go
Implement checkpointing with resume capability                     

cmd/backfill-index/main.go

  • Add checkpoint state structures and database schema for MySQL
    checkpoint table
  • Implement checkpoint interface methods for both Redis and MySQL
    clients
  • Add CLI flags for checkpoint-interval, reset-checkpoint, and
    checkpoint-key
  • Refactor populate function to manage checkpoint loading, saving, and
    resumption logic
  • Replace error accumulation pattern with fail-fast error handling
  • Track contiguous completed indices to ensure safe resumption from last
    completed entry
  • Add checkpoint goroutine to asynchronously save progress at configured
    intervals
+265/-65
Tests
backfill-test.sh
Add checkpoint testing scenarios and helpers                         

tests/backfill-test.sh

  • Add clear_checkpoint helper function to reset checkpoint state between
    test scenarios
  • Update check_all_entries to accept expected size parameters for Redis
    and MySQL
  • Add Scenario 5 comprehensive test covering checkpoint save, resume,
    and reset functionality
  • Verify checkpoint persistence across backfill runs with new log
    entries
  • Test --reset-checkpoint flag to clear saved progress and start fresh
  • Validate final checkpoint state matches expected end index
+150/-2 

@qodo-code-review
Copy link

qodo-code-review bot commented Jan 12, 2026

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Insecure temp file

Description: The test script writes logs to predictable paths under /tmp (e.g.,
/tmp/backfill-resume.log and /tmp/backfill-reset.log) using tee, which can be exploited
via symlink/hardlink attacks to clobber or disclose arbitrary files if the script is ever
run with elevated privileges or in shared environments.
backfill-test.sh [287-336]

Referred Code
    go run cmd/backfill-index/main.go --rekor-address $REKOR_ADDRESS \
        --redis-hostname $REDIS_HOST --redis-port $REDIS_PORT --redis-password $REDIS_PASSWORD \
        --concurrency 5 --start 0 --end $new_end_index \
        --checkpoint-interval 2 2>&1 | tee /tmp/backfill-resume.log
else
    go run cmd/backfill-index/main.go --rekor-address $REKOR_ADDRESS \
        --mysql-dsn "${MYSQL_USER}:${MYSQL_PASSWORD}@tcp(${MYSQL_HOST}:${MYSQL_PORT})/${MYSQL_DB}" \
        --concurrency 5 --start 0 --end $new_end_index \
        --checkpoint-interval 2 2>&1 | tee /tmp/backfill-resume.log
fi

if ! grep -q "Resuming from checkpoint: last completed index $initial_checkpoint" /tmp/backfill-resume.log ; then
    echo "Scenario 5: FAILED - Did not resume from checkpoint"
    cat /tmp/backfill-resume.log
    exit 1
fi
echo "Verified: Backfill resumed from checkpoint index $initial_checkpoint"

# Verify all entries are indexed and checkpoint is at the end
# After adding 8 new entries (13-20), total is 21 entries:
#   Redis: 20 index + 16 new index + 1 checkpoint = 37 keys


 ... (clipped 29 lines)
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Unstructured sensitive logs: New logging/printing emits unstructured text and includes operational identifiers (UUIDs
and index keys) which may be sensitive and violates the requirement for structured logs
without sensitive data.

Referred Code
				fmt.Printf("Uploaded entry %s, index %d, key %s\n", uuid, index, key)
			}
		}

		if useCheckpointing {
			select {
			case checkpointChan <- checkpointUpdate{index: index}:
			case <-ctx.Done():
				return nil
			}
		}

		fmt.Printf("Completed log index %d\n", index)
		return nil
	})
}
err = group.Wait()

if useCheckpointing {
	close(checkpointChan)
	<-checkpointDone


 ... (clipped 13 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Missing user context: Newly added checkpoint write/delete actions are logged but do not include any user
identity context (and it is unclear if this tool runs in a user-attributed context), so
audit-trail completeness cannot be verified from the diff.

Referred Code
	if err := indexClient.deleteCheckpoint(ctx, checkpointKeyName); err != nil {
		log.Printf("Warning: failed to delete checkpoint: %v", err)
	} else {
		log.Println("Checkpoint reset - starting fresh")
	}
} else {
	checkpoint, err := indexClient.loadCheckpoint(ctx, checkpointKeyName)
	if err != nil {
		log.Printf("Warning: failed to load checkpoint: %v", err)
	} else if checkpoint != nil {
		log.Printf("Resuming from checkpoint: last completed index %d", checkpoint.LastCompletedIndex)
		*startIndex = checkpoint.LastCompletedIndex + 1
		if *startIndex > *endIndex {
			log.Printf("Checkpoint at %d is already past end index %d - nothing to do", checkpoint.LastCompletedIndex, *endIndex)
			return nil
		}
		log.Printf("Processing entries from index %d to %d", *startIndex, *endIndex)
	} else {
		log.Printf("No checkpoint found - starting fresh from index %d to %d", *startIndex, *endIndex)
	}
}


 ... (clipped 35 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Potentially detailed errors: New error paths wrap and return underlying backend errors (e.g., Redis/MySQL/Rekor errors)
which may be user-visible in CLI output and could expose internal system details depending
on how errors are surfaced by the caller.

Referred Code
	return fmt.Errorf("retrieving log uuid by index %d: %w", index, err)
}

for uuid, entry := range resp.Payload {
	// uuid is the global UUID - tree ID and entry UUID
	e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
	if err != nil {
		return fmt.Errorf("error unmarshalling entry at index %d for %s: %w", index, uuid, err)
	}
	keys, err := e.IndexKeys()
	if err != nil {
		return fmt.Errorf("error building index keys at index %d for %s: %w", index, uuid, err)
	}
	for _, key := range keys {
		if err := indexClient.idempotentAddToIndex(ctx, key, uuid); err != nil {
			if errors.Is(err, context.Canceled) {
				return nil
			}
			return fmt.Errorf("error inserting UUID %s with key %s at index %d: %w", uuid, key, index, err)
		}

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Checkpoint key validation: The new external input --checkpoint-key is used to construct Redis keys and persisted as a
MySQL primary key without visible validation/normalization, so it is unclear whether
constraints (length/charset) and abuse cases are handled.

Referred Code
checkpointKeyName = *checkpointKey
if checkpointKeyName == "" {
	checkpointKeyName = "default"
}

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@knrc knrc requested a review from osmman January 12, 2026 16:20
@qodo-code-review
Copy link

qodo-code-review bot commented Jan 12, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent unbounded memory usage

To prevent potential unbounded memory usage in the checkpointing goroutine,
modify the logic to prune the completedIndices map whenever possible, rather
than only when the next sequential index arrives.

cmd/backfill-index/main.go [337-342]

-				if update.index == highestCompleted+1 {
-					for i := update.index; completedIndices[i]; i++ {
-						highestCompleted = i
-						delete(completedIndices, i)
-					}
+				// Try to advance the highest contiguous completed index
+				for completedIndices[highestCompleted+1] {
+					highestCompleted++
+					delete(completedIndices, highestCompleted)
 				}
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: This suggestion correctly identifies a potential memory leak due to unbounded growth of the completedIndices map in out-of-order processing scenarios, proposing a fix that prevents high memory usage on long runs.

Medium
General
Use parent context for timeout

In the checkpointing goroutine, use the parent ctx instead of
context.Background() when creating the timeout context for saving checkpoints to
allow for cancellation propagation.

cmd/backfill-index/main.go [351-358]

-saveCtx, saveCancel := context.WithTimeout(context.Background(), 5*time.Second)
+saveCtx, saveCancel := context.WithTimeout(ctx, 5*time.Second)
 if err := indexClient.saveCheckpoint(saveCtx, checkpointKeyName, state); err != nil {
     log.Printf("Warning: failed to save checkpoint: %v", err)
 } else {
     log.Printf("Checkpoint saved: last completed index %d", highestCompleted)
 }
 saveCancel()
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly points out that using context.Background() detaches the checkpoint-saving operation from the main application context, preventing it from respecting cancellation signals like SIGTERM.

Medium
Simplify timestamp handling in MySQL

Simplify the loadCheckpoint function for MySQL by scanning the LastUpdated
timestamp directly into a time.Time field, removing the need for manual
byte-to-string-to-time parsing.

cmd/backfill-index/main.go [556-579]

 func (c *mysqlClient) loadCheckpoint(ctx context.Context, checkpointKey string) (*checkpointState, error) {
 	var state checkpointState
-	var lastUpdatedBytes []byte
 	err := c.client.QueryRowContext(ctx, mysqlCheckpointLoadStmt, checkpointKey).Scan(
 		&state.LastCompletedIndex,
-		&lastUpdatedBytes,
+		&state.LastUpdated,
 	)
 	if err == sql.ErrNoRows {
 		return nil, nil
 	}
 	if err != nil {
 		return nil, fmt.Errorf("loading checkpoint from MySQL: %w", err)
 	}
 
-	if len(lastUpdatedBytes) > 0 {
-		parsedTime, err := time.Parse("2006-01-02 15:04:05", string(lastUpdatedBytes))
-		if err != nil {
-			return nil, fmt.Errorf("parsing LastUpdated timestamp: %w", err)
-		}
-		state.LastUpdated = parsedTime
-	}
-
 	return &state, nil
 }
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that scanning a MySQL TIMESTAMP to []byte and parsing it is brittle; scanning directly to time.Time is more robust and simplifies the code.

Low
  • Update

@codecov-commenter
Copy link

codecov-commenter commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 0% with 158 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@da7bae8). Learn more about missing BASE report.

Files with missing lines Patch % Lines
cmd/backfill-index/main.go 0.00% 158 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #519   +/-   ##
=======================================
  Coverage        ?   25.66%           
=======================================
  Files           ?      191           
  Lines           ?    19783           
  Branches        ?        0           
=======================================
  Hits            ?     5078           
  Misses          ?    13938           
  Partials        ?      767           
Flag Coverage Δ
e2etests 50.06% <ø> (?)
unittests 16.47% <0.00%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

…ast completed, contiguous index

Signed-off-by: Kevin Conner <kev.conner@gmail.com>
@knrc knrc force-pushed the checkpointing branch 2 times, most recently from 8dd2a54 to 293d061 Compare January 12, 2026 18:49
… and prune docker volumes between the harness tests.

Signed-off-by: Kevin Conner <kev.conner@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants