Skip to content

[User Story] Create Comprehensive Audit Logging System #52

@prodigy

Description

@prodigy

User Story

As a system administrator, I want to track all Git operations with detailed audit logs, so that I can ensure compliance, debug issues, and analyze system usage patterns.

Acceptance Criteria

  • Log all Git operations (clone, fetch, commit, push, rebase)
  • Capture operation metadata (user, timestamp, duration, status)
  • Record conflict resolution details and Claude interactions
  • Implement structured logging with correlation IDs
  • Support log aggregation and search capabilities
  • Enable compliance reporting (SOC2, GDPR)
  • Provide audit trail for security investigations
  • Implement log retention policies
  • Support real-time log streaming
  • Generate audit reports and analytics

Technical Implementation

Audit Logger Architecture

// pkg/audit/logger.go
package audit

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
)

// AuditLogger coordinates the audit pipeline: entries are enriched with
// operation-specific details, sanitized, stored, and (optionally) shipped
// to a real-time aggregation system. See LogOperation for the flow.
type AuditLogger struct {
    storage      AuditStorage   // durable audit-entry store
    enricher     *LogEnricher   // adds operation-specific details to entries
    sanitizer    *LogSanitizer  // redacts sensitive data before storage
    shipper      *LogShipper    // forwards entries for real-time aggregation
    config       *AuditConfig
}

// AuditConfig controls retention, compliance behavior, and real-time
// shipping for the audit pipeline.
type AuditConfig struct {
    EnableRealtime    bool // when true, LogOperation also ships each entry asynchronously
    RetentionDays     int  // default retention window — NOTE(review): relation to per-policy RetentionDays unclear; confirm
    ComplianceMode    ComplianceMode
    SensitiveFields   []string // presumably field names the sanitizer should redact — verify against LogSanitizer
    AggregationRules  []AggregationRule
}

// AuditEntry is a single audit record describing one audited operation,
// who performed it, and from where. Fields are realigned to be
// gofmt-clean (OrganizationID previously broke the column alignment).
type AuditEntry struct {
    ID             string                 // unique entry identifier (UUID)
    Timestamp      time.Time              // creation time, UTC
    CorrelationID  string                 // ties together entries from one logical request (from ctx)
    UserID         string                 // acting user; may be hashed by the sanitizer
    OrganizationID string                 // owning organization (from ctx)
    Operation      OperationType          // what happened, e.g. "git.clone"
    Resource       ResourceInfo           // the object acted upon
    Details        map[string]interface{} // operation-specific payload (sanitized before storage)
    Result         OperationResult        // outcome of the operation
    Duration       time.Duration          // how long the operation took
    IPAddress      string                 // client IP, taken from ctx
    UserAgent      string                 // client user agent, taken from ctx
    SessionID      string                 // client session, taken from ctx
}

// OperationType identifies the kind of operation an audit entry records,
// namespaced as "<subsystem>.<action>".
type OperationType string

// Audited operation identifiers.
const (
    OpGitClone          OperationType = "git.clone"
    OpGitFetch          OperationType = "git.fetch"
    OpGitCommit         OperationType = "git.commit"
    OpGitPush           OperationType = "git.push"
    OpGitRebase         OperationType = "git.rebase"
    OpConflictDetected  OperationType = "conflict.detected"
    OpConflictResolved  OperationType = "conflict.resolved"
    OpClaudeRequest     OperationType = "claude.request"
    OpValidationFailed  OperationType = "validation.failed"
)

// LogOperation builds an AuditEntry for op, enriches it with the given
// operation-specific details, sanitizes it, and stores it. When
// real-time shipping is enabled, the sanitized entry is additionally
// forwarded asynchronously to the aggregation system.
//
// Identity and request metadata (correlation ID, user, org, client IP,
// user agent, session) are extracted from ctx via the Get* helpers.
// Returns an error only when durable storage fails.
func (l *AuditLogger) LogOperation(ctx context.Context, op OperationType, details interface{}) error {
    entry := &AuditEntry{
        ID:            uuid.New().String(),
        Timestamp:     time.Now().UTC(),
        CorrelationID: GetCorrelationID(ctx),
        UserID:        GetUserID(ctx),
        OrganizationID: GetOrgID(ctx),
        Operation:     op,
        IPAddress:     GetClientIP(ctx),
        UserAgent:     GetUserAgent(ctx),
        SessionID:     GetSessionID(ctx),
    }

    // Enrich with operation-specific details.
    enriched := l.enricher.Enrich(entry, details)

    // Sanitize sensitive data before it can reach any sink.
    sanitized := l.sanitizer.Sanitize(enriched)

    // Store audit entry; storage failure is the only error surfaced.
    if err := l.storage.Store(ctx, sanitized); err != nil {
        return fmt.Errorf("failed to store audit entry: %w", err)
    }

    // Ship to aggregation system if enabled.
    // NOTE(review): fire-and-forget goroutine — Ship errors are dropped and
    // nothing waits for it on shutdown; confirm LogShipper handles both.
    if l.config.EnableRealtime {
        go l.shipper.Ship(sanitized)
    }

    return nil
}

Git Operations Auditing

// pkg/audit/git_auditor.go
// GitAuditor emits audit entries for Git operations by delegating to the
// shared AuditLogger.
type GitAuditor struct {
    logger *AuditLogger
}

// AuditClone records the outcome of a git clone operation. The
// repository URL and any error message are passed through the auditor's
// sanitizers so embedded credentials never reach the audit store.
func (a *GitAuditor) AuditClone(ctx context.Context, req CloneRequest, result CloneResult) {
    details := GitCloneDetails{
        RepositoryURL: a.sanitizeURL(req.URL),
        Branch:        req.Branch,
        Depth:         req.Depth,
        Size:          result.Size,
        Duration:      result.Duration,
        Success:       result.Error == nil,
        ErrorMessage:  a.sanitizeError(result.Error),
        CacheHit:      result.CacheHit,
    }

    // Best-effort: this method exposes no error path, so the storage error
    // is discarded explicitly (visible to vet/linters) instead of silently.
    // TODO(review): consider surfacing audit-write failures to callers.
    _ = a.logger.LogOperation(ctx, OpGitClone, details)
}

// AuditPush records the outcome of a git push operation, including retry
// behavior and whether the target branch is protected.
func (a *GitAuditor) AuditPush(ctx context.Context, req PushRequest, result PushResult) {
    details := GitPushDetails{
        RepositoryID:    req.RepositoryID,
        Branch:          req.Branch,
        CommitCount:     result.CommitCount,
        BytesUploaded:   result.BytesUploaded,
        Duration:        result.Duration,
        Success:         result.Error == nil,
        RetryCount:      result.RetryCount,
        ProtectedBranch: req.IsProtected,
    }

    // Best-effort: this method exposes no error path, so the storage error
    // is discarded explicitly rather than silently.
    // TODO(review): consider surfacing audit-write failures to callers.
    _ = a.logger.LogOperation(ctx, OpGitPush, details)
}

// AuditConflictResolution records how a merge conflict was resolved,
// including Claude usage (model, tokens, cost) and the validation outcome.
func (a *GitAuditor) AuditConflictResolution(ctx context.Context, conflict Conflict, resolution Resolution) {
    details := ConflictResolutionDetails{
        JobID:            GetJobID(ctx),
        FilePath:         conflict.FilePath,
        ConflictType:     conflict.Type,
        Complexity:       conflict.Complexity,
        ResolutionMethod: resolution.Method,
        ClaudeModel:      resolution.ClaudeModel,
        TokensUsed:       resolution.TokensUsed,
        Cost:             resolution.Cost,
        ValidationPassed: resolution.ValidationResult.Valid,
        Duration:         resolution.Duration,
        Success:          resolution.Success,
    }

    // Best-effort: this method exposes no error path, so the storage error
    // is discarded explicitly rather than silently.
    // TODO(review): consider surfacing audit-write failures to callers.
    _ = a.logger.LogOperation(ctx, OpConflictResolved, details)
}

Compliance and Security Auditing

// pkg/audit/compliance.go
// ComplianceAuditor layers regulation-specific validation and data
// classification on top of the base AuditLogger.
type ComplianceAuditor struct {
    logger      *AuditLogger
    classifier  *DataClassifier
    regulations []Regulation
}

// Regulation describes a compliance regime (e.g. GDPR, SOC2) that audit
// entries can be validated against.
type Regulation interface {
    // Name returns the regulation's identifier.
    Name() string
    // RequiredFields lists the audit-entry fields the regulation mandates.
    RequiredFields() []string
    // ValidateEntry reports whether entry satisfies the regulation.
    ValidateEntry(entry *AuditEntry) error
}

type GDPRAuditor struct{}

func (g *GDPRAuditor) AuditDataAccess(ctx context.Context, access DataAccess) {
    // Log data access for GDPR compliance
    details := GDPRAccessDetails{
        DataSubjectID:   access.SubjectID,
        DataCategory:    access.Category,
        Purpose:         access.Purpose,
        LegalBasis:      access.LegalBasis,
        RetentionPeriod: access.RetentionDays,
        ThirdPartyShare: access.SharedWith,
    }
    
    // Must be stored for 3 years minimum
    ctx = context.WithValue(ctx, "retention_override", 1095)
    
    g.logger.LogOperationWithCompliance(ctx, OpDataAccess, details, ComplianceGDPR)
}

type SOC2Auditor struct{}

func (s *SOC2Auditor) AuditSecurityEvent(ctx context.Context, event SecurityEvent) {
    details := SOC2SecurityDetails{
        EventType:       event.Type,
        Severity:        event.Severity,
        AffectedSystems: event.Systems,
        RemediationSteps: event.Remediation,
        DetectionMethod: event.Detection,
        ResponseTime:    event.ResponseDuration,
    }
    
    s.logger.LogOperationWithCompliance(ctx, OpSecurityEvent, details, ComplianceSOC2)
}

Log Sanitization

// pkg/audit/sanitizer.go
// LogSanitizer redacts sensitive values from audit entries before they
// are stored or shipped.
type LogSanitizer struct {
    patterns []SanitizationPattern // regex-based redaction rules
    config   *SanitizerConfig      // e.g. whether to hash user IDs
}

// SanitizationPattern pairs a compiled regular expression with the
// replacement text used to redact its matches.
type SanitizationPattern struct {
    Name    string         // human-readable pattern identifier
    Pattern *regexp.Regexp // what to match
    Replace string         // replacement; may reference capture groups (e.g. $1)
}

// defaultSanitizationPatterns is compiled once at package init. The
// previous implementation recompiled all four regexps on every call to
// DefaultSanitizationPatterns, which is wasteful on a hot logging path.
var defaultSanitizationPatterns = []SanitizationPattern{
    {
        Name:    "ssh_key",
        Pattern: regexp.MustCompile(`-----BEGIN.*PRIVATE KEY-----[\s\S]*?-----END.*PRIVATE KEY-----`),
        Replace: "[REDACTED_SSH_KEY]",
    },
    {
        Name:    "api_key",
        Pattern: regexp.MustCompile(`(?i)(api[_-]?key|token)["']?\s*[:=]\s*["']?([a-zA-Z0-9_\-\.]+)`),
        Replace: "$1=[REDACTED]",
    },
    {
        Name:    "password",
        Pattern: regexp.MustCompile(`(?i)(password|passwd|pwd)["']?\s*[:=]\s*["']?([^"'\s]+)`),
        Replace: "$1=[REDACTED]",
    },
    {
        Name:    "credit_card",
        Pattern: regexp.MustCompile(`\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b`),
        Replace: "[REDACTED_CC]",
    },
}

// DefaultSanitizationPatterns returns the built-in redaction rules for
// SSH private keys, API keys/tokens, passwords, and credit-card numbers.
// A fresh slice is returned so callers may append or reorder without
// affecting each other; the compiled *regexp.Regexp values are shared,
// which is safe because Regexp is safe for concurrent use.
func DefaultSanitizationPatterns() []SanitizationPattern {
    return append([]SanitizationPattern(nil), defaultSanitizationPatterns...)
}

// Sanitize returns a copy of entry with its Details map scrubbed of
// sensitive values and, when configured, its UserID replaced by a hash.
// The original entry is not modified.
func (s *LogSanitizer) Sanitize(entry *AuditEntry) *AuditEntry {
    clean := *entry // shallow copy; the nested Details map is rebuilt below

    // Scrub operation details recursively.
    clean.Details = s.sanitizeMap(entry.Details)

    // Optionally pseudonymize the user identifier.
    if s.config.HashUserIDs {
        clean.UserID = s.hashID(entry.UserID)
    }

    return &clean
}

// sanitizeMap returns a scrubbed copy of data: string values pass
// through sanitizeString, nested maps are handled recursively, and all
// other value types are copied through unchanged.
func (s *LogSanitizer) sanitizeMap(data map[string]interface{}) map[string]interface{} {
    out := make(map[string]interface{}, len(data))

    for k, v := range data {
        switch typed := v.(type) {
        case string:
            out[k] = s.sanitizeString(typed)
        case map[string]interface{}:
            out[k] = s.sanitizeMap(typed)
        default:
            out[k] = v
        }
    }

    return out
}

Log Aggregation and Search

// pkg/audit/aggregator.go
// LogAggregator combines durable storage with indexing and search over
// audit entries.
type LogAggregator struct {
    storage     AuditStorage
    indexer     *LogIndexer
    searcher    *LogSearcher // executes backend (Elasticsearch) queries
}

// SearchQuery is a backend-agnostic audit-log query; LogAggregator.Search
// translates it into the backend's native form.
type SearchQuery struct {
    TimeRange    TimeRange       // time window to search
    Operations   []OperationType // restrict to these operation types
    Users        []string        // restrict to these user IDs
    Resources    []string
    FullText     string // free-text search term
    Filters      map[string]interface{}
    Aggregations []Aggregation // optional aggregations over the matched entries
}

// Search executes query against the audit index and, when requested,
// attaches aggregation results computed over the matched entries.
func (a *LogAggregator) Search(ctx context.Context, query SearchQuery) (*SearchResult, error) {
    // Translate the portable query into the backend's native form.
    backendQuery := a.buildElasticsearchQuery(query)

    results, err := a.searcher.Search(ctx, backendQuery)
    if err != nil {
        return nil, err
    }

    // Compute aggregations only when the caller asked for them.
    if len(query.Aggregations) != 0 {
        results.Aggregations = a.applyAggregations(results.Entries, query.Aggregations)
    }

    return results, nil
}

// GenerateReport dispatches to the generator matching reportType.
// An unrecognized report type yields an error.
func (a *LogAggregator) GenerateReport(ctx context.Context, reportType ReportType, params ReportParams) (*Report, error) {
    var generate func(context.Context, ReportParams) (*Report, error)

    switch reportType {
    case ReportTypeCompliance:
        generate = a.generateComplianceReport
    case ReportTypeSecurity:
        generate = a.generateSecurityReport
    case ReportTypeUsage:
        generate = a.generateUsageReport
    default:
        return nil, fmt.Errorf("unknown report type: %s", reportType)
    }

    return generate(ctx, params)
}

Real-time Log Streaming

// pkg/audit/streaming.go
// LogStreamer fans audit entries out to filtered real-time subscribers.
// Both maps are keyed by subscription ID and guarded by mu.
type LogStreamer struct {
    subscribers map[string]chan *AuditEntry
    filters     map[string]StreamFilter
    mu          sync.RWMutex
}

// StreamFilter restricts which entries a subscriber receives; the exact
// matching semantics live in matchesFilter.
type StreamFilter struct {
    Operations []OperationType
    Users      []string
    Resources  []string
}

// Subscribe registers a filtered listener and returns its entry channel.
// The channel is buffered (100 entries) and the subscription is removed
// automatically when ctx is cancelled.
func (s *LogStreamer) Subscribe(ctx context.Context, filter StreamFilter) (<-chan *AuditEntry, error) {
    subID := uuid.New().String()
    entries := make(chan *AuditEntry, 100)

    s.mu.Lock()
    s.subscribers[subID] = entries
    s.filters[subID] = filter
    s.mu.Unlock()

    // Tie the subscription's lifetime to the caller's context.
    go func() {
        <-ctx.Done()
        s.Unsubscribe(subID)
    }()

    return entries, nil
}

// Publish fans entry out to every subscriber whose filter matches.
// Sends are non-blocking: a subscriber whose buffer is full simply
// misses this entry rather than stalling the publisher.
func (s *LogStreamer) Publish(entry *AuditEntry) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    for subID, out := range s.subscribers {
        if !s.matchesFilter(entry, s.filters[subID]) {
            continue
        }
        select {
        case out <- entry:
        default:
            // Subscriber is backed up; drop rather than block.
        }
    }
}

Retention and Archival

// pkg/audit/retention.go
// RetentionManager enforces retention policies by archiving and then
// deleting audit entries past their retention window.
type RetentionManager struct {
    storage  AuditStorage // source of expired entries and deletion target
    archiver *LogArchiver // writes archives before deletion
    config   *RetentionConfig
}

// RetentionPolicy describes how long entries for a set of operations are
// kept and whether they are archived before deletion.
type RetentionPolicy struct {
    Name            string
    RetentionDays   int             // entries older than this many days are processed
    ArchiveEnabled  bool            // archive entries before deleting them
    CompressArchive bool            // compress the archive payload
    Operations      []OperationType // operation types this policy covers
}

// ApplyRetentionPolicies runs every configured retention policy.
// Policies are independent, so a failure in one does not stop the rest;
// every failure is logged and the first one encountered is returned so
// callers can detect a partially failed run. (The original always
// returned nil, silently reporting success even when policies failed.)
func (m *RetentionManager) ApplyRetentionPolicies(ctx context.Context) error {
    var firstErr error
    for _, policy := range m.config.Policies {
        if err := m.applyPolicy(ctx, policy); err != nil {
            log.Errorf("Failed to apply retention policy %s: %v", policy.Name, err)
            if firstErr == nil {
                firstErr = fmt.Errorf("applying retention policy %s: %w", policy.Name, err)
            }
        }
    }
    return firstErr
}

// applyPolicy archives (when the policy requires it) and then deletes
// every covered entry older than the policy's retention window.
// Deletion happens only after a successful archive, so data is never
// lost mid-policy.
func (m *RetentionManager) applyPolicy(ctx context.Context, policy RetentionPolicy) error {
    cutoff := time.Now().AddDate(0, 0, -policy.RetentionDays)

    // Find entries past the retention window for the covered operations.
    entries, err := m.storage.FindOlderThan(ctx, cutoff, policy.Operations)
    if err != nil {
        return err
    }

    // Nothing expired: skip creating an empty archive and a no-op delete.
    if len(entries) == 0 {
        return nil
    }

    if policy.ArchiveEnabled {
        // Archive before deletion. Archive names are date-stamped, so at
        // most one archive per policy per day is produced.
        archive := &Archive{
            Name:       fmt.Sprintf("%s_%s", policy.Name, time.Now().Format("20060102")),
            Entries:    entries,
            Compressed: policy.CompressArchive,
        }

        if err := m.archiver.Archive(ctx, archive); err != nil {
            return fmt.Errorf("archival failed: %w", err)
        }
    }

    // Delete old entries only after the archive succeeded.
    return m.storage.DeleteBatch(ctx, entries)
}

Analytics and Reporting

// pkg/audit/analytics.go
// AuditAnalytics derives usage metrics and reports from aggregated
// audit data.
type AuditAnalytics struct {
    aggregator *LogAggregator // source of counts and top-N queries
    visualizer *DataVisualizer
}

// GenerateUsageMetrics assembles usage analytics for timeRange:
// per-operation counts, trends derived from those counts, the ten most
// active users, conflict-resolution statistics, and Claude usage.
func (a *AuditAnalytics) GenerateUsageMetrics(ctx context.Context, timeRange TimeRange) (*UsageMetrics, error) {
    // Per-operation counts are the basis for trend computation.
    operationCounts, err := a.aggregator.CountByOperation(ctx, timeRange)
    if err != nil {
        return nil, err
    }
    usageTrends := a.calculateTrends(operationCounts, timeRange)

    // Ten most active users in the window.
    mostActiveUsers, err := a.aggregator.TopUsers(ctx, timeRange, 10)
    if err != nil {
        return nil, err
    }

    // Conflict-resolution statistics.
    resolutionStats, err := a.analyzeConflictResolutions(ctx, timeRange)
    if err != nil {
        return nil, err
    }

    // NOTE(review): analyzeClaudeUsage is called inline with no error
    // path visible here — confirm it cannot fail.
    return &UsageMetrics{
        TimeRange:       timeRange,
        OperationCounts: operationCounts,
        Trends:          usageTrends,
        TopUsers:        mostActiveUsers,
        ConflictStats:   resolutionStats,
        ClaudeUsage:     a.analyzeClaudeUsage(ctx, timeRange),
    }, nil
}

Architecture References

Audit Logging Requirements

Reference: /docs/02-system-components.md:170

The architecture emphasizes audit logging for compliance:

  • Audit logging for compliance

Database Schema for Audit Logs

Reference: /docs/02-system-components.md:660-683

-- Enhanced audit logs with partitioning
-- Range-partitioned by created_at; the partition key must be part of the
-- primary key, hence PRIMARY KEY (id, created_at).
CREATE TABLE audit_logs (
    id UUID DEFAULT gen_random_uuid(),
    organization_id UUID REFERENCES organizations(id),
    user_id UUID REFERENCES users(id),
    action VARCHAR(255) NOT NULL,      -- operation identifier, e.g. "git.clone"
    resource_type VARCHAR(100),
    resource_id UUID,
    details JSONB,                     -- sanitized operation-specific payload
    ip_address INET,
    user_agent TEXT,
    request_id UUID,                   -- correlation/request identifier
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

Git Operations Tracking

Reference: /docs/02-system-components.md:599-611

-- Per-job Git operation tracking: one row per clone/fetch/commit/push/rebase.
CREATE TABLE git_operations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id UUID REFERENCES jobs(id),
    operation_type VARCHAR(50) NOT NULL, -- e.g. "git.clone"
    repository_url TEXT,                 -- NOTE(review): store sanitized (credential-free) URLs only
    branch VARCHAR(255),
    status VARCHAR(50) NOT NULL,
    duration_ms INTEGER,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Event-Driven Logging

Reference: /docs/03-data-flow.md:231-264

The system publishes events for audit logging:

graph LR
    subgraph "Event Producers"
        A[Job Manager]
        B[Git Service]
        C[Worker Container]
        D[Claude Service]
    end
    
    subgraph "Event Consumers"
        H[Audit Logger]
    end
Loading

Dependencies

  • PostgreSQL: Audit log storage with partitioning
  • Elasticsearch: Log indexing and search
  • Kafka: Real-time log streaming
  • S3: Log archival
  • Grafana: Analytics dashboards

Definition of Done

  • Unit tests cover audit operations with 90%+ coverage
  • Integration tests verify end-to-end audit trail
  • All Git operations produce audit logs
  • Sensitive data is properly sanitized
  • Compliance reports meet SOC2/GDPR requirements
  • Log search returns results in <2s
  • Documentation includes audit log schema

Effort Estimate

13 Story Points - Complex with compliance requirements

Labels

  • backend
  • security
  • compliance
  • monitoring
  • epic-5

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions