-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
User Story
As a job execution system, I want to reliably commit and push validated changes to remote repositories, so that resolved conflicts are successfully integrated even with transient network issues.
Acceptance Criteria
- Create commits with proper attribution (user + Claude co-author)
- Sign commits with GPG when configured
- Implement exponential backoff retry for push operations
- Handle push rejections (non-fast-forward, protected branch)
- Support force push with lease for safety
- Atomic operations with rollback on failure
- Preserve commit message templates per repository
- Track push metrics (success rate, retry count, duration)
- Handle large commits with progress reporting
- Support different Git providers (GitHub, GitLab, Bitbucket)
Technical Implementation
Commit Service
// pkg/git/commit_service.go
package git
import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/go-git/go-git/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
)
// CommitService creates commits for a job: it stages files, renders the
// commit message and records commit metrics.
type CommitService struct {
// gitClient, signer and attributor are not referenced by the CommitService
// methods shown in this section.
gitClient *GitClient
signer *CommitSigner
attributor *CommitAttributor
// config supplies the message template, co-author identity, signing switch
// and message length limit.
config *CommitConfig
// metrics receives one RecordCommit call per successfully created commit.
metrics *MetricsCollector
}
// CommitConfig holds the settings CommitService reads when building a commit.
type CommitConfig struct {
// DefaultAuthor is not read by the code shown here; presumably the fallback
// used by getAuthor when a request carries no author — TODO confirm.
DefaultAuthor Author
// ClaudeCoAuthor's Name and Email are written into the Co-authored-by line
// appended to every commit message.
ClaudeCoAuthor Author
// SignCommits enables signing; a key is only attached when the request also
// provides one.
SignCommits bool
// MessageTemplate may contain the placeholders {message}, {job_id} and
// {timestamp}.
MessageTemplate string
// MaxMessageLength is the byte length above which buildCommitMessage
// shortens the message.
MaxMessageLength int
}
// Author is a name/email identity. It is used for the request author and for
// the co-author written into the commit message.
type Author struct {
Name string
Email string
}
// CommitRequest describes one commit to be created by CommitService.CreateCommit.
type CommitRequest struct {
// Repository is the repository whose worktree is staged and committed.
Repository *git.Repository
// JobID is substituted into the message template, appended as the
// FlowForge-Job-ID line and passed to the commit metrics.
JobID string
// Message replaces the {message} placeholder of the template.
Message string
// Files are the paths passed one by one to Worktree.Add.
Files []string
// Author is handed to getAuthor to produce the commit author.
Author *Author
// SigningKey's Entity is used as the signing key when signing is enabled;
// nil means the commit is created unsigned.
SigningKey *SigningKey
}
// CreateCommit stages req.Files in the repository's worktree, commits them
// with the templated and attributed message, and returns the resulting commit
// object.
//
// The commit is signed only when signing is enabled in the service config and
// the request carries a signing key. One RecordCommit metric is emitted per
// successful commit.
//
// An error is returned when the request has no repository, the worktree cannot
// be opened, a file cannot be staged, the status cannot be read, nothing is
// staged after adding the files, the commit itself fails, or the new commit
// object cannot be loaded.
//
// ctx is accepted for interface symmetry; the go-git worktree calls used here
// take no context, so it is not consulted.
func (s *CommitService) CreateCommit(ctx context.Context, req CommitRequest) (*object.Commit, error) {
	if req.Repository == nil {
		return nil, fmt.Errorf("commit request has no repository")
	}

	w, err := req.Repository.Worktree()
	if err != nil {
		return nil, fmt.Errorf("failed to get worktree: %w", err)
	}

	// Stage files. Worktree.Add returns (hash, error); the hash is not needed.
	for _, file := range req.Files {
		if _, err := w.Add(file); err != nil {
			return nil, fmt.Errorf("failed to stage %s: %w", file, err)
		}
	}

	// Verify that something is actually staged. The status map also lists
	// untracked and worktree-only modifications, so its length alone does not
	// prove that the index differs from HEAD.
	status, err := w.Status()
	if err != nil {
		return nil, fmt.Errorf("failed to read worktree status: %w", err)
	}
	if !hasStagedChanges(status) {
		return nil, fmt.Errorf("no changes to commit")
	}

	// Build commit message with attribution.
	message := s.buildCommitMessage(req.Message, req.JobID)

	commitOpts := &git.CommitOptions{
		Author: s.getAuthor(req.Author),
	}
	// Sign commit if configured and a key was supplied.
	if s.config.SignCommits && req.SigningKey != nil {
		commitOpts.SignKey = req.SigningKey.Entity
	}

	hash, err := w.Commit(message, commitOpts)
	if err != nil {
		return nil, fmt.Errorf("commit failed: %w", err)
	}

	commit, err := req.Repository.CommitObject(hash)
	if err != nil {
		return nil, fmt.Errorf("failed to load commit %s: %w", hash, err)
	}

	s.metrics.RecordCommit(req.JobID, len(req.Files), len(message))
	return commit, nil
}

// hasStagedChanges reports whether any path in status has a change recorded in
// the index (staging area), ignoring untracked and unmodified entries.
func hasStagedChanges(status git.Status) bool {
	for _, fs := range status {
		if fs.Staging != git.Unmodified && fs.Staging != git.Untracked {
			return true
		}
	}
	return false
}
func (s *CommitService) buildCommitMessage(userMessage, jobID string) string {
// Apply template
message := s.config.MessageTemplate
message = strings.ReplaceAll(message, "{message}", userMessage)
message = strings.ReplaceAll(message, "{job_id}", jobID)
message = strings.ReplaceAll(message, "{timestamp}", time.Now().Format(time.RFC3339))
// Add Claude co-author attribution
attribution := fmt.Sprintf("\n\nCo-authored-by: %s <%s>",
s.config.ClaudeCoAuthor.Name,
s.config.ClaudeCoAuthor.Email,
)
message += attribution
// Add job metadata
metadata := fmt.Sprintf("\n\nFlowForge-Job-ID: %s", jobID)
message += metadata
// Truncate if too long
if len(message) > s.config.MaxMessageLength {
message = message[:s.config.MaxMessageLength-3] + "..."
}
return message
}Push Service with Retry
// pkg/git/push_service.go
// PushService pushes a repository to a remote, retrying failed attempts with
// exponential backoff.
type PushService struct {
// gitClient is asked whether credentials can be refreshed when deciding if
// an authentication error is worth retrying.
gitClient *GitClient
// retryConfig bounds the number of attempts and shapes the backoff.
retryConfig *RetryConfig
// metrics records push successes and failures.
metrics *MetricsCollector
}
// RetryConfig controls the retry loop used for pushes.
type RetryConfig struct {
// MaxAttempts is the upper bound on push attempts, including the first.
MaxAttempts int
// InitialBackoff is the wait before the second attempt.
InitialBackoff time.Duration
// MaxBackoff is the ceiling applied to the wait between attempts.
MaxBackoff time.Duration
// BackoffMultiplier is the factor applied to the wait after each retry.
BackoffMultiplier float64
}
// PushRequest describes one push handled by PushService.Push.
type PushRequest struct {
// Repository is the local repository that is pushed.
Repository *git.Repository
// RemoteName is copied into the push options as the target remote.
RemoteName string
// Branch is not read directly by Push; presumably consumed by buildRefSpec
// — TODO confirm.
Branch string
// Force enables an unconditional force push.
Force bool
// ForceWithLease attaches an empty git.ForceWithLease to the push options.
ForceWithLease bool
// Credentials provides the auth method via ToAuth.
Credentials Credentials
}
// Push translates req into go-git push options and hands them to the retry
// loop. Force is copied straight from the request; when ForceWithLease is
// requested an empty lease is attached as well. The returned error is whatever
// the retry loop reports.
func (s *PushService) Push(ctx context.Context, req PushRequest) error {
	options := git.PushOptions{
		RemoteName: req.RemoteName,
		RefSpecs:   []config.RefSpec{s.buildRefSpec(req)},
		Auth:       req.Credentials.ToAuth(),
		Progress:   s.createProgressReporter(ctx),
		Force:      req.Force,
	}
	if req.ForceWithLease {
		options.ForceWithLease = &git.ForceWithLease{}
	}

	// Retry with exponential backoff.
	return s.retryPush(ctx, req.Repository, &options)
}
// retryPush attempts the push until it succeeds, fails with a non-retryable
// error, exhausts the configured attempts, or ctx is cancelled while waiting.
//
// The wait between attempts starts at retryConfig.InitialBackoff and is
// multiplied by retryConfig.BackoffMultiplier after every retry, capped at
// retryConfig.MaxBackoff. Degenerate configuration is normalised so that a
// zero-valued RetryConfig still behaves sensibly:
//   - MaxAttempts < 1 is treated as a single attempt (the push is always tried),
//   - BackoffMultiplier < 1 is treated as 1 (the wait never shrinks),
//   - MaxBackoff <= 0 means the wait is not capped.
//
// Success and failure are reported to the metrics collector. A cancellation
// during the wait returns ctx.Err() unchanged.
func (s *PushService) retryPush(ctx context.Context, repo *git.Repository, opts *git.PushOptions) error {
	maxAttempts := s.retryConfig.MaxAttempts
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	multiplier := s.retryConfig.BackoffMultiplier
	if multiplier < 1 {
		multiplier = 1
	}
	maxBackoff := s.retryConfig.MaxBackoff
	backoff := s.retryConfig.InitialBackoff

	for attempt := 1; ; attempt++ {
		err := s.attemptPush(ctx, repo, opts)
		if err == nil {
			s.metrics.RecordPushSuccess(attempt)
			return nil
		}

		// Check if error is retryable.
		if !s.isRetryable(err) {
			s.metrics.RecordPushFailure(err.Error())
			return fmt.Errorf("non-retryable push error: %w", err)
		}
		if attempt >= maxAttempts {
			s.metrics.RecordPushFailure("max_retries_exceeded")
			return fmt.Errorf("push failed after %d attempts: %w", attempt, err)
		}

		log.Warnf("Push attempt %d failed: %v. Retrying in %v", attempt, err, backoff)

		// Wait with backoff, giving up immediately if the caller cancels.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}

		// Grow the wait for the next attempt.
		next := time.Duration(float64(backoff) * multiplier)
		if next < backoff {
			// The multiplication overflowed; keep the previous wait.
			next = backoff
		}
		if maxBackoff > 0 && next > maxBackoff {
			next = maxBackoff
		}
		backoff = next
	}
}
// attemptPush performs a single push bounded by a 30 second timeout derived
// from ctx.
//
// It returns nil when the push succeeds or the remote is already up to date.
// A non-fast-forward rejection is delegated to handleNonFastForward and its
// result returned. Any other error is returned unchanged so the caller can
// classify it. Both sentinels are matched with errors.Is so wrapped errors are
// recognised as well.
func (s *PushService) attemptPush(ctx context.Context, repo *git.Repository, opts *git.PushOptions) error {
	const attemptTimeout = 30 * time.Second

	// Create timeout context for this individual attempt.
	pushCtx, cancel := context.WithTimeout(ctx, attemptTimeout)
	defer cancel()

	err := repo.PushContext(pushCtx, opts)
	switch {
	case err == nil, errors.Is(err, git.NoErrAlreadyUpToDate):
		return nil
	case errors.Is(err, git.ErrNonFastForwardUpdate):
		return s.handleNonFastForward(repo, opts)
	default:
		return err
	}
}
func (s *PushService) isRetryable(err error) bool {
// Network errors are retryable
if isNetworkError(err) {
return true
}
// Authentication errors might be retryable (token refresh)
if isAuthError(err) && s.gitClient.CanRefreshAuth() {
return true
}
// Rate limiting is retryable
if isRateLimitError(err) {
return true
}
// Non-fast-forward is not retryable without intervention
if errors.Is(err, git.ErrNonFastForwardUpdate) {
return false
}
return false
}Progress Reporting
// pkg/git/progress.go
// ProgressReporter turns git progress output written to it into
// ProgressUpdate values delivered on a channel.
type ProgressReporter struct {
// ctx is checked on every Write; once it is done Write reports ctx.Err().
ctx context.Context
// updates is the send-only channel updates are delivered on. Sends never
// block: an update is dropped when the channel cannot accept it.
updates chan<- ProgressUpdate
// metrics is not referenced by the ProgressReporter methods shown here.
metrics *MetricsCollector
}
// ProgressUpdate is one parsed progress line from git.
type ProgressUpdate struct {
// Phase is "counting_objects" or "writing_objects".
Phase string
// Current and Total are the two numbers of the "(current/total)" pair.
Current int64
Total int64
// Message is the portion of the output that matched the progress pattern.
Message string
// Timestamp is the time at which the line was parsed.
Timestamp time.Time
}
// Write accepts a chunk of git progress output. When the chunk contains a
// recognised progress line, the parsed update is offered to the updates
// channel without blocking and dropped if the channel cannot take it.
//
// The full length of p is always reported as written. The error is non-nil
// only when an update was parsed and the select observed the reporter's
// context as done, in which case it is the context's error.
func (r *ProgressReporter) Write(p []byte) (n int, err error) {
	update := r.parseGitProgress(string(p))
	if update == nil {
		// Nothing recognisable in this chunk.
		return len(p), nil
	}

	select {
	case <-r.ctx.Done():
		return len(p), r.ctx.Err()
	case r.updates <- *update:
		// Delivered.
	default:
		// Receiver is not keeping up; skip this update.
	}
	return len(p), nil
}
func (r *ProgressReporter) parseGitProgress(line string) *ProgressUpdate {
// Parse various git progress formats
patterns := []struct {
regex *regexp.Regexp
handler func([]string) *ProgressUpdate
}{
{
// Counting objects: 100% (10/10), done.
regex: regexp.MustCompile(`Counting objects:\s+(\d+)%\s+\((\d+)/(\d+)\)`),
handler: func(m []string) *ProgressUpdate {
current, _ := strconv.ParseInt(m[2], 10, 64)
total, _ := strconv.ParseInt(m[3], 10, 64)
return &ProgressUpdate{
Phase: "counting_objects",
Current: current,
Total: total,
Message: m[0],
}
},
},
{
// Writing objects: 100% (10/10), 1.50 KiB | 1.50 MiB/s, done.
regex: regexp.MustCompile(`Writing objects:\s+(\d+)%\s+\((\d+)/(\d+)\)`),
handler: func(m []string) *ProgressUpdate {
current, _ := strconv.ParseInt(m[2], 10, 64)
total, _ := strconv.ParseInt(m[3], 10, 64)
return &ProgressUpdate{
Phase: "writing_objects",
Current: current,
Total: total,
Message: m[0],
}
},
},
}
for _, pattern := range patterns {
if matches := pattern.regex.FindStringSubmatch(line); matches != nil {
update := pattern.handler(matches)
update.Timestamp = time.Now()
return update
}
}
return nil
}Atomic Operations
// pkg/git/atomic.go
// AtomicGitOperation collects undo steps for operations performed on a
// repository so they can be reverted together.
type AtomicGitOperation struct {
// repo is the repository the operations act on.
repo *git.Repository
// rollback holds the undo functions in registration order; Rollback runs
// them last-to-first.
rollback []func() error
// completed holds the description of each undo step at the same index as
// its function in rollback.
completed []string
}
// AddRollback registers fn as an undo step and records description under the
// same index, so a failing step can be named when Rollback reports errors.
func (a *AtomicGitOperation) AddRollback(fn func() error, description string) {
	a.completed = append(a.completed, description)
	a.rollback = append(a.rollback, fn)
}
// Commit creates a commit with the given message using default commit options
// and, once it has succeeded, registers an undo step that hard-resets the
// worktree to the HEAD that was current before the commit.
//
// The undo step is registered only after the commit succeeds: a failed commit
// leaves HEAD where it was, and a hard reset at that point would only discard
// the uncommitted work. Errors from opening the worktree, resolving HEAD or
// committing are returned wrapped with context.
func (a *AtomicGitOperation) Commit(message string) error {
	w, err := a.repo.Worktree()
	if err != nil {
		return fmt.Errorf("failed to get worktree: %w", err)
	}

	// Remember the current HEAD so the commit can be undone later.
	head, err := a.repo.Head()
	if err != nil {
		return fmt.Errorf("failed to resolve HEAD: %w", err)
	}
	previous := head.Hash()

	if _, err := w.Commit(message, &git.CommitOptions{}); err != nil {
		return fmt.Errorf("commit failed: %w", err)
	}

	a.AddRollback(func() error {
		return w.Reset(&git.ResetOptions{
			Commit: previous,
			Mode:   git.HardReset,
		})
	}, "reset to previous HEAD")
	return nil
}
func (a *AtomicGitOperation) Rollback() error {
var errs []error
// Execute rollback functions in reverse order
for i := len(a.rollback) - 1; i >= 0; i-- {
if err := a.rollback[i](); err != nil {
errs = append(errs, fmt.Errorf("rollback '%s' failed: %w", a.completed[i], err))
}
}
if len(errs) > 0 {
return fmt.Errorf("rollback errors: %v", errs)
}
return nil
}Provider-Specific Handling
// pkg/git/providers.go
// GitProvider abstracts hosting-provider-specific behaviour around pushes.
type GitProvider interface {
// Name returns the provider's name.
Name() string
// ValidateURL reports whether url is acceptable for this provider; the
// exact criteria are not shown in this section.
ValidateURL(url string) bool
// HandlePushError maps a push error to a provider-specific error.
HandlePushError(err error) error
// GetPushOptions returns the push options to use for this provider, derived
// from base.
GetPushOptions(base *git.PushOptions) *git.PushOptions
}
// GitHubProvider carries the GitHub-specific push handling.
type GitHubProvider struct{}

// HandlePushError translates well-known GitHub push failures into actionable
// errors.
//
// A nil err yields nil. An error whose text mentions "protected branch" or
// "Repository not found" is replaced by an explanatory error that wraps the
// original, so the cause stays reachable through errors.Is / errors.As. Any
// other error is returned unchanged.
func (p *GitHubProvider) HandlePushError(err error) error {
	if err == nil {
		return nil
	}

	msg := err.Error()
	switch {
	case strings.Contains(msg, "protected branch"):
		return fmt.Errorf("cannot push to protected branch: create a pull request instead: %w", err)
	case strings.Contains(msg, "Repository not found"):
		return fmt.Errorf("repository not found or insufficient permissions: %w", err)
	}
	return err
}
type GitLabProvider struct{}
func (p *GitLabProvider) GetPushOptions(base *git.PushOptions) *git.PushOptions {
// Add GitLab-specific push options
base.PushOptions = map[string]string{
"merge_request.create": "true",
"merge_request.target": "main",
}
return base
}Architecture References
Git Operations Flow
Reference: /docs/03-data-flow.md:60-78
The commit and push flow in the job execution:
sequenceDiagram
Container->>Git: Commit changes
Container->>Git: Rebase on target
alt Rebase conflict
Git-->>Container: Conflict detected
Container->>Claude: Resolve conflicts
Claude-->>Container: Resolution
Container->>Git: Apply resolution
end
Container->>Git: Push changes
Commit Attribution
Reference: /docs/02-system-components.md:195-219
The job schema includes metadata for tracking:
/** Job record as referenced from the system components document. */
interface Job {
  /** Identifier of the job. */
  id: string;
  /** Identifier of the user the job belongs to. */
  userId: string;
  /** Identifier of the repository the job operates on. */
  repositoryId: string;
  /** Bookkeeping data; the optional fields may be absent. */
  metadata: {
    createdAt: Date;
    startedAt?: Date;
    completedAt?: Date;
    workerId?: string;
    attempts: number;
  };
}
// Git Operations Audit
Reference: /docs/02-system-components.md:599-611
-- Audit table with one row per git operation.
CREATE TABLE git_operations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id UUID REFERENCES jobs(id),
    operation_type VARCHAR(50) NOT NULL, -- clone, pull, push, rebase, etc.
    repository_url TEXT,
    branch VARCHAR(255),
    commit_sha VARCHAR(40),
    status VARCHAR(50) NOT NULL,
    duration_ms INTEGER,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Dependencies
- go-git/v5: Git operations
- golang.org/x/crypto/openpgp: Commit signing
- PostgreSQL: Operation tracking
- Prometheus: Metrics collection
Definition of Done
- Unit tests cover commit and push operations with 90%+ coverage
- Integration tests verify push retry logic
- Commits are properly attributed with co-author
- GPG signing works when configured
- Push retry handles network failures gracefully
- Progress reporting works for large pushes
- Documentation includes commit message template examples
Effort Estimate
8 Story Points - Moderate complexity with retry logic
Labels
- backend
- git
- epic-5