
[User Story] Add Repository Caching Layer for Performance #51


User Story

As a job execution system, I want to cache frequently used repositories to avoid repeated cloning, so that jobs start faster and network bandwidth usage is reduced.

Acceptance Criteria

  • Implement multi-tier caching (local disk, distributed cache)
  • Support incremental updates (fetch only new commits)
  • Maintain cache consistency across worker nodes
  • Implement cache eviction policies (LRU, size-based)
  • Support concurrent access with proper locking
  • Compress cached repositories for storage efficiency
  • Track cache metrics (hit rate, size, age)
  • Provide cache warming for critical repositories
  • Handle corrupted cache entries gracefully
  • Support cache invalidation on demand

Technical Implementation

Repository Cache Architecture

// pkg/git/cache.go
package git

import (
    "context"
    "fmt"
    "time"

    log "github.com/sirupsen/logrus" // assumed structured logger for Warnf/Errorf; swap in the project's logging package
)

type RepositoryCache struct {
    localStorage  *LocalStorage
    sharedStorage *SharedStorage
    index         *CacheIndex
    locker        *CacheLockManager
    compressor    *Compressor
    metrics       *CacheMetrics
    config        *CacheConfig
}

type CacheConfig struct {
    LocalCacheDir      string
    MaxLocalSize       int64
    MaxSharedSize      int64
    EvictionPolicy     EvictionPolicy
    CompressionEnabled bool
    WarmupEnabled      bool
    TTL                time.Duration
}

type CacheEntry struct {
    RepositoryID   string
    URL            string
    Branch         string
    CommitHash     string
    Size           int64
    CompressedSize int64
    LastAccessed   time.Time
    CreatedAt      time.Time
    AccessCount    int
    Location       CacheLocation
}

type CacheLocation struct {
    Type     StorageType // local, shared
    Path     string
    NodeID   string
}

func (c *RepositoryCache) Get(ctx context.Context, req CacheRequest) (*CachedRepository, error) {
    key := c.generateKey(req.URL, req.Branch)
    
    // Acquire read lock
    lock := c.locker.AcquireReadLock(ctx, key)
    defer lock.Release()
    
    // Check local cache first
    entry, err := c.localStorage.Get(key)
    if err == nil {
        c.metrics.RecordHit("local")
        return c.prepareRepository(ctx, entry)
    }
    
    // Check shared cache
    entry, err = c.sharedStorage.Get(key)
    if err == nil {
        c.metrics.RecordHit("shared")
        // Promote to local cache
        if err := c.promoteToLocal(ctx, entry); err != nil {
            log.Warnf("Failed to promote to local cache: %v", err)
        }
        return c.prepareRepository(ctx, entry)
    }
    
    c.metrics.RecordMiss()
    return nil, ErrCacheMiss
}

func (c *RepositoryCache) Put(ctx context.Context, repo *Repository, req CacheRequest) error {
    key := c.generateKey(req.URL, req.Branch)
    
    // Acquire write lock
    lock := c.locker.AcquireWriteLock(ctx, key)
    defer lock.Release()
    
    // Prepare cache entry
    entry, err := c.createCacheEntry(repo, req)
    if err != nil {
        return fmt.Errorf("failed to create cache entry: %w", err)
    }
    
    // Compress if enabled
    if c.config.CompressionEnabled {
        compressed, err := c.compressor.CompressRepository(repo.Path)
        if err != nil {
            return fmt.Errorf("compression failed: %w", err)
        }
        entry.CompressedSize = int64(len(compressed))
    }
    
    // Check cache size and evict if necessary
    if err := c.ensureSpace(ctx, entry.Size); err != nil {
        return fmt.Errorf("cache space error: %w", err)
    }
    
    // Store in appropriate tier
    if entry.Size < c.config.MaxLocalSize/10 { // Small repos in local
        err = c.localStorage.Put(key, entry)
    } else {
        err = c.sharedStorage.Put(key, entry)
    }
    
    if err != nil {
        return err
    }
    
    // Update index
    c.index.Add(entry)
    c.metrics.RecordPut(entry.Size)
    
    return nil
}
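
The generateKey helper used in Get and Put is referenced but never defined in this story. A minimal sketch, assuming the key is a SHA-256 digest over the normalized URL plus branch (the normalization rules here are an assumption):

// pkg/git/cache_key.go
// Hypothetical helper: generateKey is referenced above but not specified.
package git

import (
    "crypto/sha256"
    "encoding/hex"
    "strings"
)

func (c *RepositoryCache) generateKey(url, branch string) string {
    // Normalize so "repo.git" and "repo/" map to the same cache entry.
    normalized := strings.TrimSuffix(strings.TrimRight(url, "/"), ".git")
    sum := sha256.Sum256([]byte(normalized + "\n" + branch))
    return hex.EncodeToString(sum[:])
}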

Incremental Updates

// pkg/git/cache_updater.go
type CacheUpdater struct {
    cache     *RepositoryCache
    gitClient *GitClient
}

func (u *CacheUpdater) UpdateCache(ctx context.Context, entry *CacheEntry) error {
    // Open cached repository
    repo, err := u.openCachedRepo(entry)
    if err != nil {
        return fmt.Errorf("failed to open cached repo: %w", err)
    }
    
    // Fetch latest changes
    remote, err := repo.Remote("origin")
    if err != nil {
        return err
    }
    
    // Use shallow fetch for efficiency
    fetchOpts := &git.FetchOptions{
        RemoteName: "origin",
        Depth:      1,
        Auth:       u.gitClient.GetAuth(),
        Progress:   u.createProgressReporter(),
    }
    
    err = remote.Fetch(fetchOpts)
    if err != nil && err != git.NoErrAlreadyUpToDate {
        return fmt.Errorf("fetch failed: %w", err)
    }
    
    // Update cache metadata
    head, err := repo.Head()
    if err != nil {
        return err
    }
    
    entry.CommitHash = head.Hash().String()
    entry.LastAccessed = time.Now()
    entry.AccessCount++
    
    // Re-compress if needed
    if u.cache.config.CompressionEnabled {
        if err := u.recompress(entry); err != nil {
            log.Warnf("Recompression failed: %v", err)
        }
    }
    
    return u.cache.index.Update(entry)
}
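
How a worker ties lookup, incremental update, and population together is implied but not spelled out. A sketch of that flow; GitClient.Clone, the Entry field on CachedRepository, and repo.Path are assumed names, not part of this story:

// Worker-side flow sketch (same package). Clone, CachedRepository.Entry, and
// repo.Path are assumptions; the story does not define the GitClient surface.
func PrepareRepository(ctx context.Context, cache *RepositoryCache, updater *CacheUpdater,
    client *GitClient, req CacheRequest) (*CachedRepository, error) {

    cached, err := cache.Get(ctx, req)
    switch {
    case err == nil:
        // Hit: refresh with an incremental fetch before handing it to the job.
        if uerr := updater.UpdateCache(ctx, cached.Entry); uerr != nil {
            log.Warnf("incremental update failed: %v", uerr)
        }
        return cached, nil
    case err != ErrCacheMiss:
        return nil, err
    }

    // Miss: clone fresh, populate the cache, then serve from it.
    repo, err := client.Clone(ctx, req.URL, req.Branch)
    if err != nil {
        return nil, fmt.Errorf("clone failed: %w", err)
    }
    if perr := cache.Put(ctx, repo, req); perr != nil {
        log.Warnf("failed to populate cache: %v", perr)
    }
    return cache.Get(ctx, req)
}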

Cache Eviction

// pkg/git/cache_eviction.go
type EvictionPolicy interface {
    SelectVictims(entries []*CacheEntry, requiredSpace int64) []*CacheEntry
}

type LRUEvictionPolicy struct{}

func (p *LRUEvictionPolicy) SelectVictims(entries []*CacheEntry, requiredSpace int64) []*CacheEntry {
    // Sort by last accessed time
    sort.Slice(entries, func(i, j int) bool {
        return entries[i].LastAccessed.Before(entries[j].LastAccessed)
    })
    
    var victims []*CacheEntry
    var freedSpace int64
    
    for _, entry := range entries {
        if freedSpace >= requiredSpace {
            break
        }
        victims = append(victims, entry)
        freedSpace += entry.Size
    }
    
    return victims
}

type SizeBasedEvictionPolicy struct {
    maxAge time.Duration
}

func (p *SizeBasedEvictionPolicy) SelectVictims(entries []*CacheEntry, requiredSpace int64) []*CacheEntry {
    // First remove old entries
    cutoff := time.Now().Add(-p.maxAge)
    var candidates []*CacheEntry
    
    for _, entry := range entries {
        if entry.CreatedAt.Before(cutoff) {
            candidates = append(candidates, entry)
        }
    }
    
    // Then sort by size/access ratio
    sort.Slice(candidates, func(i, j int) bool {
        ratioI := float64(candidates[i].Size) / float64(candidates[i].AccessCount+1)
        ratioJ := float64(candidates[j].Size) / float64(candidates[j].AccessCount+1)
        return ratioI > ratioJ
    })
    
    return p.selectBySpace(candidates, requiredSpace)
}
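
ensureSpace, called from Put, is also left undefined. A minimal sketch that asks the configured policy for victims and evicts them from the local tier; UsedSize, LocalEntries, Delete, Remove, and RecordEviction are assumed names:

// Sketch of ensureSpace: evict local entries until the new entry fits.
// localStorage.UsedSize/Delete, index.LocalEntries/Remove, and
// metrics.RecordEviction are assumptions, not defined elsewhere in this story.
func (c *RepositoryCache) ensureSpace(ctx context.Context, required int64) error {
    free := c.config.MaxLocalSize - c.localStorage.UsedSize()
    if free >= required {
        return nil
    }

    victims := c.config.EvictionPolicy.SelectVictims(c.index.LocalEntries(), required-free)
    for _, victim := range victims {
        if err := c.localStorage.Delete(victim); err != nil {
            return fmt.Errorf("failed to evict %s: %w", victim.RepositoryID, err)
        }
        c.index.Remove(victim)
        c.metrics.RecordEviction(victim.Size)
    }
    return nil
}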

Distributed Cache Coordination

// pkg/git/cache_coordinator.go
type CacheCoordinator struct {
    nodeID       string
    registry     ServiceRegistry
    consistency  *ConsistencyManager
    broadcaster  *EventBroadcaster
}

func (c *CacheCoordinator) NotifyCacheUpdate(entry *CacheEntry) error {
    event := CacheEvent{
        Type:      CacheEventUpdate,
        NodeID:    c.nodeID,
        Entry:     entry,
        Timestamp: time.Now(),
    }
    
    return c.broadcaster.Broadcast(event)
}

func (c *CacheCoordinator) HandleCacheEvent(event CacheEvent) error {
    switch event.Type {
    case CacheEventUpdate:
        // Another node updated cache
        return c.consistency.MarkStale(event.Entry.RepositoryID)
        
    case CacheEventInvalidate:
        // Invalidate local copy
        return c.invalidateLocal(event.Entry.RepositoryID)
        
    case CacheEventWarmup:
        // Participate in cache warming
        return c.warmupCache(event.Entry)
    }
    
    return nil
}

type ConsistencyManager struct {
    versions map[string]CacheVersion
    mu       sync.RWMutex
}

type CacheVersion struct {
    CommitHash   string
    LastModified time.Time
    NodeID       string
}

func (m *ConsistencyManager) CheckConsistency(repoID string, localVersion string) bool {
    m.mu.RLock()
    defer m.mu.RUnlock()
    
    version, exists := m.versions[repoID]
    if !exists {
        return true // No known version, assume consistent
    }
    
    return version.CommitHash == localVersion
}
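
The EventBroadcaster used by the coordinator is not specified. Since Redis is the listed coordination dependency, a sketch over Redis pub/sub; the go-redis client and the channel name are assumptions:

// pkg/git/cache_broadcaster.go
// Sketch of an EventBroadcaster over Redis pub/sub. The go-redis client and
// the "cache-events" channel name are assumptions.
package git

import (
    "context"
    "encoding/json"

    "github.com/redis/go-redis/v9"
)

type EventBroadcaster struct {
    client  *redis.Client
    channel string // e.g. "cache-events"
}

func (b *EventBroadcaster) Broadcast(event CacheEvent) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }
    return b.client.Publish(context.Background(), b.channel, payload).Err()
}

// Listen delivers events to a handler such as CacheCoordinator.HandleCacheEvent.
func (b *EventBroadcaster) Listen(ctx context.Context, handler func(CacheEvent) error) {
    sub := b.client.Subscribe(ctx, b.channel)
    defer sub.Close()

    for msg := range sub.Channel() {
        var event CacheEvent
        if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
            continue // skip malformed events
        }
        _ = handler(event)
    }
}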

Cache Warming

// pkg/git/cache_warmer.go
type CacheWarmer struct {
    cache     *RepositoryCache
    gitClient *GitClient
    scheduler *WarmupScheduler
    config    *WarmupConfig
}

type WarmupConfig struct {
    Repositories []WarmupRepository
    Schedule     string // Cron expression
    Parallelism  int
}

type WarmupRepository struct {
    URL      string
    Branches []string
    Priority int
}

func (w *CacheWarmer) Start(ctx context.Context) error {
    return w.scheduler.Schedule(w.config.Schedule, func() {
        if err := w.warmupRepositories(ctx); err != nil {
            log.Errorf("Cache warmup failed: %v", err)
        }
    })
}

func (w *CacheWarmer) warmupRepositories(ctx context.Context) error {
    // Sort by priority
    repos := w.sortByPriority(w.config.Repositories)
    
    // Use worker pool bounded by Parallelism; buffer the error channel for one
    // error per branch so no goroutine blocks on send
    sem := make(chan struct{}, w.config.Parallelism)
    totalBranches := 0
    for _, r := range repos {
        totalBranches += len(r.Branches)
    }
    errCh := make(chan error, totalBranches)
    
    for _, repo := range repos {
        sem <- struct{}{}
        go func(r WarmupRepository) {
            defer func() { <-sem }()
            
            for _, branch := range r.Branches {
                if err := w.warmupBranch(ctx, r.URL, branch); err != nil {
                    errCh <- fmt.Errorf("warmup %s:%s failed: %w", r.URL, branch, err)
                }
            }
        }(repo)
    }
    
    // Wait for completion
    for i := 0; i < w.config.Parallelism; i++ {
        sem <- struct{}{}
    }
    
    close(errCh)
    
    // Collect errors
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }
    
    if len(errs) > 0 {
        return fmt.Errorf("warmup errors: %v", errs)
    }
    
    return nil
}
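
The WarmupScheduler behind Start is not specified either; the Schedule field is a cron expression, so a sketch assuming the robfig/cron library (any cron-capable scheduler would do):

// pkg/git/warmup_scheduler.go
// Sketch of the WarmupScheduler. robfig/cron is an assumption, not a stated dependency.
package git

import (
    "github.com/robfig/cron/v3"
)

type WarmupScheduler struct {
    cron *cron.Cron
}

func NewWarmupScheduler() *WarmupScheduler {
    return &WarmupScheduler{cron: cron.New()}
}

// Schedule registers fn against the cron expression and starts the scheduler.
func (s *WarmupScheduler) Schedule(spec string, fn func()) error {
    if _, err := s.cron.AddFunc(spec, fn); err != nil {
        return err
    }
    s.cron.Start()
    return nil
}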

Cache Metrics

// pkg/git/cache_metrics.go
type CacheMetrics struct {
    hits        *prometheus.CounterVec
    misses      prometheus.Counter
    evictions   prometheus.Counter
    size        prometheus.Gauge
    hitRate     prometheus.Gauge
    operations  *prometheus.HistogramVec
}

func (m *CacheMetrics) RecordHit(tier string) {
    m.hits.WithLabelValues(tier).Inc()
    m.updateHitRate()
}

func (m *CacheMetrics) RecordMiss() {
    m.misses.Inc()
    m.updateHitRate()
}

func (m *CacheMetrics) updateHitRate() {
    total := m.getTotalHits() + m.getMisses()
    if total > 0 {
        rate := float64(m.getTotalHits()) / float64(total)
        m.hitRate.Set(rate)
    }
}

// Prometheus collectors cannot be read back in-process, so the get* helpers
// (not shown) are assumed to return counts mirrored internally.
func (m *CacheMetrics) GetStats() CacheStats {
    total := m.getTotalHits() + m.getMisses()
    var hitRate float64
    if total > 0 {
        hitRate = float64(m.getTotalHits()) / float64(total)
    }
    return CacheStats{
        HitRate:       hitRate,
        TotalHits:     m.getTotalHits(),
        TotalMisses:   m.getMisses(),
        CacheSize:     m.getSize(),
        EvictionCount: m.getEvictions(),
    }
}
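
How the collectors are created and registered is not shown; a sketch of a constructor, with metric names chosen here as placeholders rather than an agreed naming scheme:

// Sketch of collector setup. Metric names are placeholders.
func NewCacheMetrics(reg prometheus.Registerer) *CacheMetrics {
    m := &CacheMetrics{
        hits: prometheus.NewCounterVec(prometheus.CounterOpts{
            Name: "repo_cache_hits_total",
            Help: "Cache hits by tier (local, shared).",
        }, []string{"tier"}),
        misses: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "repo_cache_misses_total",
            Help: "Cache misses.",
        }),
        evictions: prometheus.NewCounter(prometheus.CounterOpts{
            Name: "repo_cache_evictions_total",
            Help: "Evicted cache entries.",
        }),
        size: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "repo_cache_size_bytes",
            Help: "Current cache size in bytes.",
        }),
        hitRate: prometheus.NewGauge(prometheus.GaugeOpts{
            Name: "repo_cache_hit_rate",
            Help: "Lifetime hit rate of this process.",
        }),
        operations: prometheus.NewHistogramVec(prometheus.HistogramOpts{
            Name: "repo_cache_operation_duration_seconds",
            Help: "Duration of cache operations.",
        }, []string{"operation"}),
    }
    reg.MustRegister(m.hits, m.misses, m.evictions, m.size, m.hitRate, m.operations)
    return m
}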

Architecture References

Performance Optimization

Reference: /docs/01-architecture-overview.md:180-185

The architecture emphasizes performance through caching:

  • Container image caching
  • Job result caching
  • Lazy loading and pagination
  • Efficient database queries with indexes

Repository Caching in Git Service

Reference: /docs/02-system-components.md:122-134

Object Storage is used for repository caches:

#### Object Storage (S3-compatible)
- Job artifacts and logs
- Repository caches
- Backup storage

Cache Layers

Reference: /docs/03-data-flow.md:268-283

The system implements multiple cache layers:

graph TD
    A[Client Request] --> B{CDN Cache?}
    B -->|Hit| C[Return Cached]
    B -->|Miss| D{API Cache?}
    D -->|Hit| E[Return from API Cache]
    D -->|Miss| F{Redis Cache?}
    F -->|Hit| G[Return from Redis]
    F -->|Miss| H[Database Query]

Redis Data Structures

Reference: /docs/02-system-components.md:693-711

Redis is used for distributed caching:

# Locks
lock:repo:{repo_id} -> {worker_id}

# Queues
queue:high -> [job_ids...]
queue:normal -> [job_ids...]
queue:low -> [job_ids...]
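
The CacheLockManager in the architecture follows this lock:repo:{repo_id} pattern but is otherwise unspecified. A sketch of the write-lock path using the go-redis client (an assumption); the earlier snippets drop the error return for brevity, and read locks, renewal, and fencing are omitted:

// Sketch of a Redis-backed write lock on the lock:repo:{repo_id} key pattern.
// go-redis, the 5-minute TTL, and the retry interval are assumptions; a
// production version would verify ownership on release (e.g. via a Lua script).
type CacheLockManager struct {
    client *redis.Client
    nodeID string
}

type CacheLock struct {
    client *redis.Client
    key    string
}

func (m *CacheLockManager) AcquireWriteLock(ctx context.Context, repoID string) (*CacheLock, error) {
    key := fmt.Sprintf("lock:repo:%s", repoID)
    for {
        ok, err := m.client.SetNX(ctx, key, m.nodeID, 5*time.Minute).Result()
        if err != nil {
            return nil, err
        }
        if ok {
            return &CacheLock{client: m.client, key: key}, nil
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(500 * time.Millisecond):
        }
    }
}

func (l *CacheLock) Release() {
    // Best-effort release; does not guard against releasing another node's lock.
    l.client.Del(context.Background(), l.key)
}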

Dependencies

  • go-git/v5: Repository operations
  • MinIO/S3: Shared cache storage
  • Redis: Cache index and coordination
  • zstd: Compression library (see the sketch after this list)
  • Prometheus: Metrics collection
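
The zstd dependency backs the Compressor referenced in the cache architecture. A sketch that tars a repository directory and compresses the stream, assuming the klauspost/compress/zstd package (symlink handling and streaming to object storage are omitted):

// pkg/git/compressor.go
// Sketch of the Compressor: tar the repository directory, zstd-compress the
// stream. klauspost/compress/zstd is an assumption for the zstd dependency.
package git

import (
    "archive/tar"
    "bytes"
    "io"
    "os"
    "path/filepath"

    "github.com/klauspost/compress/zstd"
)

type Compressor struct{}

func (c *Compressor) CompressRepository(repoPath string) ([]byte, error) {
    var buf bytes.Buffer
    zw, err := zstd.NewWriter(&buf)
    if err != nil {
        return nil, err
    }
    tw := tar.NewWriter(zw)

    walkErr := filepath.Walk(repoPath, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        hdr, err := tar.FileInfoHeader(info, "")
        if err != nil {
            return err
        }
        rel, err := filepath.Rel(repoPath, path)
        if err != nil {
            return err
        }
        hdr.Name = rel
        if err := tw.WriteHeader(hdr); err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil // directories and symlinks carry no payload here
        }
        f, err := os.Open(path)
        if err != nil {
            return err
        }
        defer f.Close()
        _, err = io.Copy(tw, f)
        return err
    })
    if walkErr != nil {
        return nil, walkErr
    }
    if err := tw.Close(); err != nil {
        return nil, err
    }
    if err := zw.Close(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}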

Definition of Done

  • Unit tests cover caching operations with 85%+ coverage
  • Integration tests verify distributed cache consistency
  • Cache hit rate >70% for frequently used repositories
  • Compression reduces storage by >50%
  • Cache warming completes in <10 minutes
  • Metrics dashboard shows cache performance
  • Documentation includes cache tuning guide

Effort Estimate

13 Story Points - Complex distributed caching system

Labels

  • backend
  • performance
  • caching
  • epic-5
