User Story
As a developer, I want to automatically collect and process job outputs and artifacts so that I can access Claude's generated code, logs, and analysis results after job completion.
Architecture Reference
- Data Flow: docs/03-data-flow.md:74 (Upload artifacts)
- Container Layer: docs/02-system-components.md:871-903 (Volume configuration for artifacts)
- Artifact Storage: docs/02-system-components.md (S3 integration mentioned)
Key excerpt from docs/03-data-flow.md:74:
Container->>Storage: Upload artifacts
Acceptance Criteria
- Automatic collection of Claude-generated files and changes
- Log aggregation and packaging for download
- Git diff capture and storage
- Structured output format (JSON/YAML) for programmatic access
- Compression and archival of large outputs
- Metadata tagging for searchability
- Expiration policies for artifact retention
- Direct download URLs with temporary access tokens
Technical Implementation
1. Artifact Collector Service
package artifacts
import (
"archive/tar"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
)
type ArtifactCollector struct {
storageClient StorageClient
gitClient GitClient
compression CompressionService
metadata MetadataService
}
type CollectionRequest struct {
JobID string
WorkspaceDir string
GitRepoPath string
LogPaths []string
OutputFormat OutputFormat
}
type CollectedArtifacts struct {
JobID string
CollectionTime time.Time
GitChanges *GitChangeSet
GeneratedFiles []FileArtifact
Logs []LogArtifact
Metadata map[string]interface{}
StorageURLs map[string]string
}
type GitChangeSet struct {
DiffSummary string
FilesChanged []string
Additions int
Deletions int
CommitHash string
UnifiedDiff string
BinaryFiles []string
}
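// FileArtifact and LogArtifact are referenced throughout this issue but not
// defined in the excerpt; a minimal sketch with fields inferred from how they
// are used below (field names here are assumptions, not part of the spec):
type FileArtifact struct {
	Path        string    // path relative to the workspace root
	Size        int64     // file size in bytes
	ModTime     time.Time // last modification time
	SHA256      string    // hex-encoded content checksum
	IsGenerated bool      // true if the file was added or modified during the job
	IsImportant bool      // true if the base name matches an important pattern
	ContentType string    // detected MIME type
	Preview     string    // first 1KB of content for text files
}
type LogArtifact struct {
	Name      string    // file name used inside the archive (logs/<Name>)
	Content   string    // raw log content
	Timestamp time.Time // log collection time
}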
func (ac *ArtifactCollector) CollectJobArtifacts(ctx context.Context, req CollectionRequest) (*CollectedArtifacts, error) {
artifacts := &CollectedArtifacts{
JobID: req.JobID,
CollectionTime: time.Now(),
Metadata: make(map[string]interface{}),
StorageURLs: make(map[string]string),
}
// Phase 1: Collect Git changes
if req.GitRepoPath != "" {
changes, err := ac.collectGitChanges(ctx, req.GitRepoPath)
if err != nil {
return nil, fmt.Errorf("failed to collect git changes: %w", err)
}
artifacts.GitChanges = changes
}
// Phase 2: Collect generated files
generatedFiles, err := ac.collectGeneratedFiles(ctx, req.WorkspaceDir, req.GitRepoPath)
if err != nil {
return nil, fmt.Errorf("failed to collect generated files: %w", err)
}
artifacts.GeneratedFiles = generatedFiles
// Phase 3: Collect and process logs
logs, err := ac.collectLogs(ctx, req.LogPaths)
if err != nil {
return nil, fmt.Errorf("failed to collect logs: %w", err)
}
artifacts.Logs = logs
// Phase 4: Create archive
archivePath, err := ac.createArtifactArchive(ctx, artifacts)
if err != nil {
return nil, fmt.Errorf("failed to create archive: %w", err)
}
defer os.Remove(archivePath)
// Phase 5: Upload to storage
storageURLs, err := ac.uploadArtifacts(ctx, req.JobID, archivePath, artifacts)
if err != nil {
return nil, fmt.Errorf("failed to upload artifacts: %w", err)
}
artifacts.StorageURLs = storageURLs
return artifacts, nil
}
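// Usage sketch (not part of the original spec): how a job runner might invoke
// the collector after a container exits. The paths and log location below are
// illustrative assumptions.
func collectAfterJob(ctx context.Context, collector *ArtifactCollector, jobID string) error {
	artifacts, err := collector.CollectJobArtifacts(ctx, CollectionRequest{
		JobID:        jobID,
		WorkspaceDir: "/workspace",
		GitRepoPath:  "/workspace/repo",
		LogPaths:     []string{"/workspace/logs/claude.log"},
	})
	if err != nil {
		return fmt.Errorf("artifact collection failed: %w", err)
	}
	// Storage URLs are keyed by artifact name and can be returned to the API layer.
	for name, url := range artifacts.StorageURLs {
		fmt.Printf("artifact %s available at %s\n", name, url)
	}
	return nil
}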
func (ac *ArtifactCollector) collectGitChanges(ctx context.Context, repoPath string) (*GitChangeSet, error) {
changes := &GitChangeSet{}
// Get current HEAD commit
headCommit, err := ac.gitClient.GetHeadCommit(repoPath)
if err != nil {
return nil, err
}
changes.CommitHash = headCommit
// Get diff statistics
stats, err := ac.gitClient.GetDiffStats(repoPath, "HEAD~1", "HEAD")
if err != nil {
// If no previous commit, get diff against empty tree
stats, err = ac.gitClient.GetDiffStats(repoPath, "4b825dc642cb6eb9a060e54bf8d69288fbee4904", "HEAD")
if err != nil {
return nil, err
}
}
changes.FilesChanged = stats.Files
changes.Additions = stats.Additions
changes.Deletions = stats.Deletions
changes.DiffSummary = fmt.Sprintf("%d files changed, %d insertions(+), %d deletions(-)",
len(stats.Files), stats.Additions, stats.Deletions)
// Get unified diff (with size limit)
unifiedDiff, err := ac.gitClient.GetUnifiedDiff(repoPath, "HEAD~1", "HEAD", 1000000) // 1MB limit
if err == nil {
changes.UnifiedDiff = unifiedDiff
}
// Identify binary files
for _, file := range stats.Files {
if isBinaryFile(filepath.Join(repoPath, file)) {
changes.BinaryFiles = append(changes.BinaryFiles, file)
}
}
return changes, nil
}
func (ac *ArtifactCollector) collectGeneratedFiles(ctx context.Context, workspaceDir, gitRepoPath string) ([]FileArtifact, error) {
var artifacts []FileArtifact
// Define patterns for important generated files
// (filepath.Match does not support "**", so these are matched against the base name below)
importantPatterns := []string{
"*.md",        // Documentation
"*.json",      // Configuration files
"*.yaml",      // Configuration files
"*.yml",       // Configuration files
"Dockerfile*", // Docker files
"*.sh",        // Scripts
"Makefile*",   // Build files
}
err := filepath.Walk(workspaceDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil // Skip files we can't access
}
// Skip .git and node_modules directories entirely; SkipDir must only be
// returned for directories, or the rest of the parent directory is skipped
if info.IsDir() && (info.Name() == ".git" || info.Name() == "node_modules") {
return filepath.SkipDir
}
if info.IsDir() {
return nil
}
// Check if the file's base name matches an important pattern
relPath, _ := filepath.Rel(workspaceDir, path)
isImportant := false
for _, pattern := range importantPatterns {
if matched, _ := filepath.Match(pattern, info.Name()); matched {
isImportant = true
break
}
}
// Check if file was modified by Claude
wasModified := false
if gitRepoPath != "" {
gitRelPath, _ := filepath.Rel(gitRepoPath, path)
status, _ := ac.gitClient.GetFileStatus(gitRepoPath, gitRelPath)
wasModified = status == "modified" || status == "added"
}
if isImportant || wasModified {
content, err := os.ReadFile(path)
if err != nil {
return nil
}
// Calculate checksum
hash := sha256.Sum256(content)
artifact := FileArtifact{
Path: relPath,
Size: info.Size(),
ModTime: info.ModTime(),
SHA256: fmt.Sprintf("%x", hash),
IsGenerated: wasModified,
IsImportant: isImportant,
ContentType: detectContentType(path),
}
// Store first 1KB preview for text files
if strings.HasPrefix(artifact.ContentType, "text/") && len(content) > 0 {
preview := content
if len(preview) > 1024 {
preview = preview[:1024]
}
artifact.Preview = string(preview)
}
artifacts = append(artifacts, artifact)
}
return nil
})
return artifacts, err
}
func (ac *ArtifactCollector) createArtifactArchive(ctx context.Context, artifacts *CollectedArtifacts) (string, error) {
// Create temporary file for archive
tmpFile, err := os.CreateTemp("", fmt.Sprintf("artifacts-%s-*.tar.gz", artifacts.JobID))
if err != nil {
return "", err
}
defer tmpFile.Close()
// Create gzip writer
gzWriter := gzip.NewWriter(tmpFile)
defer gzWriter.Close()
// Create tar writer
tarWriter := tar.NewWriter(gzWriter)
defer tarWriter.Close()
// Write metadata file
metadataJSON, _ := json.MarshalIndent(artifacts, "", " ")
metadataHeader := &tar.Header{
Name: "metadata.json",
Mode: 0644,
Size: int64(len(metadataJSON)),
ModTime: time.Now(),
Typeflag: tar.TypeReg,
}
if err := tarWriter.WriteHeader(metadataHeader); err != nil {
return "", err
}
if _, err := tarWriter.Write(metadataJSON); err != nil {
return "", err
}
// Write git diff if available
if artifacts.GitChanges != nil && artifacts.GitChanges.UnifiedDiff != "" {
diffHeader := &tar.Header{
Name: "changes.diff",
Mode: 0644,
Size: int64(len(artifacts.GitChanges.UnifiedDiff)),
ModTime: time.Now(),
Typeflag: tar.TypeReg,
}
if err := tarWriter.WriteHeader(diffHeader); err != nil {
return "", err
}
if _, err := tarWriter.Write([]byte(artifacts.GitChanges.UnifiedDiff)); err != nil {
return "", err
}
}
// Write logs
for _, log := range artifacts.Logs {
logHeader := &tar.Header{
Name: fmt.Sprintf("logs/%s", log.Name),
Mode: 0644,
Size: int64(len(log.Content)),
ModTime: log.Timestamp,
Typeflag: tar.TypeReg,
}
if err := tarWriter.WriteHeader(logHeader); err != nil {
return "", err
}
if _, err := tarWriter.Write([]byte(log.Content)); err != nil {
return "", err
}
}
return tmpFile.Name(), nil
}
2. Structured Output Formatter
package output
import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"gopkg.in/yaml.v3"
)
type OutputFormatter struct {
formats map[OutputFormat]Formatter
}
type OutputFormat string
const (
FormatJSON OutputFormat = "json"
FormatYAML OutputFormat = "yaml"
FormatXML OutputFormat = "xml"
FormatMarkdown OutputFormat = "markdown"
FormatStructured OutputFormat = "structured"
)
type JobOutput struct {
JobID string `json:"job_id" yaml:"job_id"`
Status string `json:"status" yaml:"status"`
StartTime time.Time `json:"start_time" yaml:"start_time"`
EndTime time.Time `json:"end_time" yaml:"end_time"`
Duration time.Duration `json:"duration" yaml:"duration"`
Summary ExecutionSummary `json:"summary" yaml:"summary"`
Changes []Change `json:"changes" yaml:"changes"`
Outputs map[string]interface{} `json:"outputs" yaml:"outputs"`
Errors []Error `json:"errors,omitempty" yaml:"errors,omitempty"`
Metrics ExecutionMetrics `json:"metrics" yaml:"metrics"`
}
type ExecutionSummary struct {
Task string `json:"task" yaml:"task"`
Repository string `json:"repository" yaml:"repository"`
Branch string `json:"branch" yaml:"branch"`
FilesModified int `json:"files_modified" yaml:"files_modified"`
FilesAdded int `json:"files_added" yaml:"files_added"`
FilesDeleted int `json:"files_deleted" yaml:"files_deleted"`
LinesAdded int `json:"lines_added" yaml:"lines_added"`
LinesDeleted int `json:"lines_deleted" yaml:"lines_deleted"`
ClaudeModel string `json:"claude_model" yaml:"claude_model"`
TokensUsed int `json:"tokens_used" yaml:"tokens_used"`
KeyHighlights []string `json:"key_highlights" yaml:"key_highlights"`
}
func (of *OutputFormatter) FormatJobOutput(output *JobOutput, format OutputFormat) ([]byte, error) {
switch format {
case FormatJSON:
return of.formatJSON(output)
case FormatYAML:
return of.formatYAML(output)
case FormatMarkdown:
return of.formatMarkdown(output)
case FormatStructured:
return of.formatStructured(output)
default:
return nil, fmt.Errorf("unsupported format: %s", format)
}
}
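// formatJSON and formatYAML are called above but not shown in this excerpt;
// minimal sketches of what they could look like:
func (of *OutputFormatter) formatJSON(output *JobOutput) ([]byte, error) {
	// Indented JSON so the downloaded artifact is human-readable as well
	return json.MarshalIndent(output, "", "  ")
}
func (of *OutputFormatter) formatYAML(output *JobOutput) ([]byte, error) {
	return yaml.Marshal(output)
}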
func (of *OutputFormatter) formatMarkdown(output *JobOutput) ([]byte, error) {
md := &strings.Builder{}
fmt.Fprintf(md, "# Job Execution Report\n\n")
fmt.Fprintf(md, "**Job ID:** `%s`\n", output.JobID)
fmt.Fprintf(md, "**Status:** %s\n", output.Status)
fmt.Fprintf(md, "**Duration:** %s\n\n", output.Duration)
fmt.Fprintf(md, "## Summary\n\n")
fmt.Fprintf(md, "- **Task:** %s\n", output.Summary.Task)
fmt.Fprintf(md, "- **Repository:** %s\n", output.Summary.Repository)
fmt.Fprintf(md, "- **Branch:** %s\n", output.Summary.Branch)
fmt.Fprintf(md, "- **Files Modified:** %d (added: %d, deleted: %d)\n",
output.Summary.FilesModified, output.Summary.FilesAdded, output.Summary.FilesDeleted)
fmt.Fprintf(md, "- **Lines Changed:** +%d -%d\n",
output.Summary.LinesAdded, output.Summary.LinesDeleted)
fmt.Fprintf(md, "- **Claude Model:** %s (tokens: %d)\n\n",
output.Summary.ClaudeModel, output.Summary.TokensUsed)
if len(output.Summary.KeyHighlights) > 0 {
fmt.Fprintf(md, "### Key Highlights\n\n")
for _, highlight := range output.Summary.KeyHighlights {
fmt.Fprintf(md, "- %s\n", highlight)
}
fmt.Fprintf(md, "\n")
}
if len(output.Changes) > 0 {
fmt.Fprintf(md, "## Changes\n\n")
for _, change := range output.Changes {
fmt.Fprintf(md, "### %s\n\n", change.File)
fmt.Fprintf(md, "- **Action:** %s\n", change.Action)
fmt.Fprintf(md, "- **Type:** %s\n", change.Type)
if change.Description != "" {
fmt.Fprintf(md, "- **Description:** %s\n", change.Description)
}
fmt.Fprintf(md, "\n")
}
}
if len(output.Errors) > 0 {
fmt.Fprintf(md, "## Errors\n\n")
for _, err := range output.Errors {
fmt.Fprintf(md, "- **%s:** %s\n", err.Type, err.Message)
if err.Context != "" {
fmt.Fprintf(md, " - Context: %s\n", err.Context)
}
}
}
return []byte(md.String()), nil
}
3. Streaming Artifact Processor
package streaming
import (
	"bufio"
	"context"
	"io"
	"log"
	"regexp"
	"time"
)
type StreamingProcessor struct {
bufferSize int
maxLineLen int
outputBuffer *CircularBuffer
patterns []PatternMatcher
}
type PatternMatcher struct {
Name string
Pattern *regexp.Regexp
Handler func(match []string, context string)
}
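// ProcessedOutput and Highlight are used by ProcessJobOutput below but not
// defined in this excerpt; a minimal sketch inferred from that usage:
type ProcessedOutput struct {
	Sections   map[string][]string // log lines grouped by detected section
	Highlights []Highlight         // lines that matched a registered pattern
	Statistics map[string]int      // simple counters (lines, bytes, ...)
	StartTime  time.Time
	EndTime    time.Time
	TotalLines int
}
type Highlight struct {
	LineNumber int
	Text       string
	Type       string // name of the pattern that matched
	Timestamp  time.Time
}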
func (sp *StreamingProcessor) ProcessJobOutput(ctx context.Context, reader io.Reader) (*ProcessedOutput, error) {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, sp.maxLineLen), sp.maxLineLen)
output := &ProcessedOutput{
Sections: make(map[string][]string),
Highlights: make([]Highlight, 0),
Statistics: make(map[string]int),
StartTime: time.Now(),
}
lineNum := 0
var currentSection string
sectionContent := make([]string, 0)
for scanner.Scan() {
lineNum++
line := scanner.Text()
// Detect section changes
if newSection := sp.detectSection(line); newSection != "" {
if currentSection != "" {
output.Sections[currentSection] = sectionContent
sectionContent = make([]string, 0)
}
currentSection = newSection
}
// Apply pattern matching
for _, matcher := range sp.patterns {
if matches := matcher.Pattern.FindStringSubmatch(line); matches != nil {
highlight := Highlight{
LineNumber: lineNum,
Text: line,
Type: matcher.Name,
Timestamp: time.Now(),
}
output.Highlights = append(output.Highlights, highlight)
if matcher.Handler != nil {
matcher.Handler(matches, sp.getContext(lineNum))
}
}
}
// Add to current section
sectionContent = append(sectionContent, line)
// Update statistics
sp.updateStatistics(output.Statistics, line)
}
// Save final section
if currentSection != "" {
output.Sections[currentSection] = sectionContent
}
output.EndTime = time.Now()
output.TotalLines = lineNum
return output, scanner.Err()
}
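// detectSection and updateStatistics are referenced above but not defined in
// this excerpt. Minimal sketches; the "=== Section ===" marker format and the
// counter names are assumptions, not part of the spec:
var sectionHeader = regexp.MustCompile(`^===\s*(.+?)\s*===$`)

func (sp *StreamingProcessor) detectSection(line string) string {
	if m := sectionHeader.FindStringSubmatch(line); m != nil {
		return m[1]
	}
	return ""
}
func (sp *StreamingProcessor) updateStatistics(stats map[string]int, line string) {
	stats["lines"]++
	stats["bytes"] += len(line)
}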
// Pattern definitions for important events
func (sp *StreamingProcessor) initializePatterns() {
sp.patterns = []PatternMatcher{
{
Name: "error",
Pattern: regexp.MustCompile(`(?i)(error|exception|failed|failure):\s*(.+)`),
Handler: func(match []string, context string) {
// Log error for alerting
log.Printf("job error detected: %s", match[2])
},
},
{
Name: "file_created",
Pattern: regexp.MustCompile(`(?i)created?\s+file:\s*(.+)`),
},
{
Name: "file_modified",
Pattern: regexp.MustCompile(`(?i)modified?\s+file:\s*(.+)`),
},
{
Name: "claude_response",
Pattern: regexp.MustCompile(`Claude response:\s*(.+)`),
},
{
Name: "git_commit",
Pattern: regexp.MustCompile(`\[(\w+)\s+([0-9a-f]{7,})\]\s+(.+)`),
},
}
}
4. Artifact Storage Manager
package storage
import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)
type ArtifactStorageManager struct {
s3Client *s3.S3
bucketName string
cdnBaseURL string
signatureTTL time.Duration
retentionDays map[string]int
}
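// StorageResult is returned by StoreArtifacts below but not defined in this
// excerpt; a minimal sketch inferred from its usage:
type StorageResult struct {
	Key         string    // S3 object key
	ETag        string    // ETag returned by PutObject
	DownloadURL string    // presigned, time-limited download URL
	CDNUrl      string    // stable CDN-fronted URL
	ExpiresAt   time.Time // when the presigned URL expires
}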
// PutObjectInput.Body must be seekable for request signing, so callers supply io.ReadSeeker values
func (asm *ArtifactStorageManager) StoreArtifacts(ctx context.Context, jobID string, artifacts map[string]io.ReadSeeker) (map[string]*StorageResult, error) {
results := make(map[string]*StorageResult)
for name, reader := range artifacts {
key := asm.generateStorageKey(jobID, name)
// Upload with appropriate storage class
storageClass := asm.determineStorageClass(name)
putInput := &s3.PutObjectInput{
Bucket: aws.String(asm.bucketName),
Key: aws.String(key),
Body: reader,
StorageClass: aws.String(storageClass),
Metadata: aws.StringMap(map[string]string{
"job-id": jobID,
"artifact": name,
"uploaded-at": time.Now().Format(time.RFC3339),
}),
}
// Record the intended expiry as the object's Expires header; actual deletion is enforced by the lifecycle rules below
if days, ok := asm.retentionDays[name]; ok {
putInput.Expires = aws.Time(time.Now().AddDate(0, 0, days))
}
result, err := asm.s3Client.PutObjectWithContext(ctx, putInput)
if err != nil {
return nil, fmt.Errorf("failed to upload %s: %w", name, err)
}
// Generate signed URL for download
downloadURL, err := asm.generateDownloadURL(key)
if err != nil {
return nil, err
}
results[name] = &StorageResult{
Key: key,
ETag: *result.ETag,
DownloadURL: downloadURL,
CDNUrl: fmt.Sprintf("%s/%s", asm.cdnBaseURL, key),
ExpiresAt: time.Now().Add(asm.signatureTTL),
}
}
return results, nil
}
func (asm *ArtifactStorageManager) generateDownloadURL(key string) (string, error) {
req, _ := asm.s3Client.GetObjectRequest(&s3.GetObjectInput{
Bucket: aws.String(asm.bucketName),
Key: aws.String(key),
})
urlStr, err := req.Presign(asm.signatureTTL)
if err != nil {
return "", fmt.Errorf("failed to generate presigned URL: %w", err)
}
return urlStr, nil
}
// Lifecycle policy configuration
func (asm *ArtifactStorageManager) SetupLifecyclePolicy() error {
rules := []*s3.LifecycleRule{
{
ID: aws.String("transition-logs-to-glacier"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("logs/"),
},
Transitions: []*s3.Transition{
{
Days: aws.Int64(30),
StorageClass: aws.String("GLACIER"),
},
},
},
{
ID: aws.String("expire-temp-artifacts"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("temp/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(7),
},
},
}
_, err := asm.s3Client.PutBucketLifecycleConfiguration(&s3.PutBucketLifecycleConfigurationInput{
Bucket: aws.String(asm.bucketName),
LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
Rules: rules,
},
})
return err
}
5. Real-time Artifact Streaming
package realtime
import (
	"time"

	"github.com/gorilla/websocket"
)
type ArtifactStreamer struct {
hub *StreamHub
collectors map[string]*StreamCollector
bufferSize int
}
type StreamHub struct {
clients map[string]*StreamClient
broadcast chan StreamMessage
register chan *StreamClient
unregister chan *StreamClient
}
type StreamMessage struct {
JobID string `json:"job_id"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data interface{} `json:"data"`
}
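// StreamClient is used by the hub and by StreamJobArtifacts below but not
// defined in this excerpt; a minimal sketch:
type StreamClient struct {
	JobID string             // job whose artifacts this client subscribes to
	Conn  *websocket.Conn    // underlying WebSocket connection
	Send  chan StreamMessage // buffered outbound message queue
}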
func (as *ArtifactStreamer) StreamJobArtifacts(jobID string, conn *websocket.Conn) error {
client := &StreamClient{
JobID: jobID,
Conn: conn,
Send: make(chan StreamMessage, 256),
}
as.hub.register <- client
defer func() {
as.hub.unregister <- client
}()
// Start collector for this job
collector := as.startCollector(jobID)
// Stream artifacts as they're generated
go func() {
for artifact := range collector.Artifacts {
message := StreamMessage{
JobID: jobID,
Type: "artifact",
Timestamp: time.Now(),
Data: map[string]interface{}{
"name": artifact.Name,
"type": artifact.Type,
"size": artifact.Size,
"url": artifact.URL,
},
}
client.Send <- message
}
}()
// Handle client messages
for {
select {
case message, ok := <-client.Send:
if !ok {
return nil
}
if err := conn.WriteJSON(message); err != nil {
return err
}
case <-time.After(60 * time.Second):
// Send ping to keep connection alive
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return err
}
}
}
}
6. Artifact Search and Indexing
type ArtifactIndexer struct {
elasticClient *elastic.Client
indexName string
}
func (ai *ArtifactIndexer) IndexArtifact(artifact *CollectedArtifacts) error {
doc := map[string]interface{}{
"job_id": artifact.JobID,
"timestamp": artifact.CollectionTime,
"repository": artifact.Metadata["repository"],
"branch": artifact.Metadata["branch"],
"task": artifact.Metadata["task"],
"files_changed": len(artifact.GeneratedFiles),
"total_size": calculateTotalSize(artifact),
"git_commit": artifact.GitChanges.CommitHash,
"status": artifact.Metadata["status"],
"tags": extractTags(artifact),
}
_, err := ai.elasticClient.Index().
Index(ai.indexName).
Id(artifact.JobID).
BodyJson(doc).
Do(context.Background())
return err
}
Dependencies
- S3-compatible object storage (AWS S3, MinIO; see the client configuration sketch after this list)
- Compression libraries (gzip, tar)
- Git command-line tools
- Optional: Elasticsearch for artifact search
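The storage manager above uses the AWS SDK for Go, which can also target MinIO or another S3-compatible store by overriding the endpoint and forcing path-style addressing. A minimal sketch, where the endpoint, credentials, bucket name, and constructor name are placeholders rather than values from this spec:
package storage
import (
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)
func newMinIOStorageManager() (*ArtifactStorageManager, error) {
	sess, err := session.NewSession(&aws.Config{
		Region:           aws.String("us-east-1"),
		Endpoint:         aws.String("http://minio.internal:9000"), // placeholder endpoint
		S3ForcePathStyle: aws.Bool(true),                           // MinIO requires path-style addressing
		Credentials:      credentials.NewStaticCredentials("ACCESS_KEY", "SECRET_KEY", ""),
	})
	if err != nil {
		return nil, err
	}
	return &ArtifactStorageManager{
		s3Client:     s3.New(sess),
		bucketName:   "job-artifacts",
		signatureTTL: 15 * time.Minute,
	}, nil
}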
Definition of Done
- Artifact collection runs after every job completion
- All generated files are captured and stored
- Git diffs are properly formatted and stored
- Multiple output formats supported (JSON, YAML, Markdown)
- Download URLs generated with proper expiration
- Large artifacts compressed efficiently
- Retention policies enforced automatically
- Performance impact on job execution < 5%
Effort Estimate
Story Points: 8 (Complex file processing and storage requirements)
Labels
- epic/container-execution
- priority/high
- component/artifacts
- size/l
- storage