From bcdc1e8fe4d5c8119910c94ab334f891a8817391 Mon Sep 17 00:00:00 2001 From: Maximiliano Churichi Date: Fri, 6 Mar 2026 01:51:24 +0000 Subject: [PATCH] test: address review feedback with table-driven coverage improvements --- cmd/peek/main_test.go | 447 ++++++++++++++++++++++++++++ pkg/parser/parser_test.go | 66 +++- pkg/query/lucene_behavior_test.go | 163 ++++++++++ pkg/server/server_test.go | 376 +++++++++++++++++++++++ pkg/storage/badger_behavior_test.go | 195 ++++++++++++ 5 files changed, 1239 insertions(+), 8 deletions(-) create mode 100644 cmd/peek/main_test.go create mode 100644 pkg/query/lucene_behavior_test.go create mode 100644 pkg/server/server_test.go create mode 100644 pkg/storage/badger_behavior_test.go diff --git a/cmd/peek/main_test.go b/cmd/peek/main_test.go new file mode 100644 index 0000000..1b52dd1 --- /dev/null +++ b/cmd/peek/main_test.go @@ -0,0 +1,447 @@ +package main + +import ( + "io" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/mchurichi/peek/internal/config" + "github.com/mchurichi/peek/pkg/storage" +) + +func TestValidateNoPositionalArgs(t *testing.T) { + if err := validateNoPositionalArgs(nil); err != nil { + t.Fatalf("expected nil error, got %v", err) + } + if err := validateNoPositionalArgs([]string{"unexpected"}); err == nil { + t.Fatalf("expected error for positional args") + } +} + +func TestParseDuration(t *testing.T) { + tests := []struct { + in string + want time.Duration + err bool + }{ + {in: "24h", want: 24 * time.Hour}, + {in: "7d", want: 168 * time.Hour}, + {in: "2w", want: 336 * time.Hour}, + {in: "bad", err: true}, + } + for _, tt := range tests { + got, err := parseDuration(tt.in) + if tt.err { + if err == nil { + t.Fatalf("parseDuration(%q) expected error", tt.in) + } + continue + } + if err != nil { + t.Fatalf("parseDuration(%q) error = %v", tt.in, err) + } + if got != tt.want { + t.Fatalf("parseDuration(%q) = %v, want %v", tt.in, got, tt.want) + } + } +} + +func TestExpandPath(t *testing.T) { + home := t.TempDir() + t.Setenv("HOME", home) + got := expandPath("~/peek") + want := filepath.Join(home, "peek") + if got != want { + t.Fatalf("expandPath mismatch: got %q want %q", got, want) + } + + if plain := expandPath("/tmp/peek"); plain != "/tmp/peek" { + t.Fatalf("plain path changed: %q", plain) + } +} + +func TestRunDbCommandValidation(t *testing.T) { + tests := []struct { + name string + args []string + }{ + {name: "missing", args: nil}, + {name: "unknown", args: []string{"unknown"}}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := runDbCommand(tt.args); err == nil { + t.Fatalf("expected error for args=%v", tt.args) + } + }) + } +} + +func TestRunDbSubcommandsWithTempDB(t *testing.T) { + dbPath := t.TempDir() + + tests := []struct { + name string + run func() error + }{ + {name: "stats", run: func() error { return runDbStats([]string{"--db-path", dbPath}) }}, + {name: "clean level", run: func() error { return runDbClean([]string{"--db-path", dbPath, "--level", "DEBUG", "--force"}) }}, + {name: "clean older than", run: func() error { return runDbClean([]string{"--db-path", dbPath, "--older-than", "1h", "--force"}) }}, + {name: "clean all", run: func() error { return runDbClean([]string{"--db-path", dbPath, "--force"}) }}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.run(); err != nil { + t.Fatalf("%s error = %v", tt.name, err) + } + }) + } +} + +func TestRunServerModeGracefulShutdown(t *testing.T) { + cfg := &config.Config{} + *cfg = *config.DefaultConfig() + cfg.Storage.DBPath = t.TempDir() + cfg.Server.AutoOpenBrowser = false + cfg.Server.Port = 0 + + go func() { + time.Sleep(300 * time.Millisecond) + p, _ := os.FindProcess(os.Getpid()) + _ = p.Signal(os.Interrupt) + }() + + if err := runServerMode(cfg); err != nil { + t.Fatalf("runServerMode() error = %v", err) + } +} + +func TestRunCollectModeProcessesInputAndShutsDown(t *testing.T) { + cfg := &config.Config{} + *cfg = *config.DefaultConfig() + cfg.Storage.DBPath = t.TempDir() + cfg.Server.AutoOpenBrowser = false + cfg.Server.Port = 0 + cfg.Parsing.Format = "json" + + orig := os.Stdin + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdin = r + defer func() { os.Stdin = orig }() + + go func() { + _, _ = w.WriteString(`{"timestamp":"2025-01-01T00:00:00Z","level":"INFO","message":"ok"}` + "\n") + _ = w.Close() + }() + + go func() { + time.Sleep(500 * time.Millisecond) + p, _ := os.FindProcess(os.Getpid()) + _ = p.Signal(os.Interrupt) + }() + + if err := runCollectMode(cfg, true); err != nil { + t.Fatalf("runCollectMode() error = %v", err) + } +} + +func captureStdout(t *testing.T, fn func()) string { + t.Helper() + orig := os.Stdout + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdout = w + fn() + _ = w.Close() + os.Stdout = orig + out, _ := io.ReadAll(r) + return string(out) +} + +func TestPrintHelpersAndMainSubcommands(t *testing.T) { + tests := []struct { + name string + run func(*testing.T) + }{ + {name: "printVersion", run: func(t *testing.T) { + if out := captureStdout(t, printVersion); out == "" { + t.Fatalf("printVersion produced no output") + } + }}, + {name: "printHelp", run: func(t *testing.T) { + if out := captureStdout(t, printHelp); out == "" { + t.Fatalf("printHelp produced no output") + } + }}, + {name: "main version", run: func(t *testing.T) { + origArgs := os.Args + defer func() { os.Args = origArgs }() + os.Args = []string{"peek", "version"} + if out := captureStdout(t, main); out == "" { + t.Fatalf("main version output empty") + } + }}, + {name: "main help", run: func(t *testing.T) { + origArgs := os.Args + defer func() { os.Args = origArgs }() + os.Args = []string{"peek", "help"} + if out := captureStdout(t, main); out == "" { + t.Fatalf("main help output empty") + } + }}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { tt.run(t) }) + } +} + +func TestIsStdinPipedAndOpenBrowser(t *testing.T) { + orig := os.Stdin + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + defer func() { + os.Stdin = orig + _ = r.Close() + _ = w.Close() + }() + + os.Stdin = r + if !isStdinPiped() { + t.Fatalf("expected stdin to look piped") + } + + openBrowser("http://localhost:0") +} + +func TestMainDbSubcommands(t *testing.T) { + dbPath := t.TempDir() + origArgs := os.Args + defer func() { os.Args = origArgs }() + + tests := [][]string{ + {"peek", "db", "stats", "--db-path", dbPath}, + {"peek", "db", "clean", "--db-path", dbPath, "--force"}, + } + + for _, args := range tests { + os.Args = args + _ = captureStdout(t, main) + } +} + +func TestMainCollectModeFlow(t *testing.T) { + dbPath := t.TempDir() + origArgs := os.Args + origStdin := os.Stdin + defer func() { + os.Args = origArgs + os.Stdin = origStdin + }() + + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdin = r + + os.Args = []string{"peek", "--db-path", dbPath, "--format", "json", "--no-browser", "--port", "0", "--all"} + + go func() { + _, _ = w.WriteString(`{"timestamp":"2025-01-01T00:00:00Z","level":"INFO","message":"from-main"}` + "\n") + _ = w.Close() + }() + go func() { + time.Sleep(700 * time.Millisecond) + p, _ := os.FindProcess(os.Getpid()) + _ = p.Signal(os.Interrupt) + }() + + main() +} + +func TestRunDbCleanValidationAndAbortPaths(t *testing.T) { + dbPath := t.TempDir() + + if err := runDbClean([]string{"--db-path", dbPath, "--older-than", "nonsense", "--force"}); err == nil { + t.Fatalf("expected invalid duration error") + } + + if err := runDbClean([]string{"--db-path", dbPath, "--level", "MISSING", "--force"}); err != nil { + t.Fatalf("expected no-op clean for missing level, got %v", err) + } + + origStdin := os.Stdin + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdin = r + defer func() { + os.Stdin = origStdin + _ = r.Close() + _ = w.Close() + }() + + _, _ = w.WriteString("n\n") + _ = w.Close() + if err := runDbClean([]string{"--db-path", dbPath}); err != nil { + t.Fatalf("runDbClean abort path error = %v", err) + } +} + +func TestRunDbStatsConfigError(t *testing.T) { + cfg := filepath.Join(t.TempDir(), "bad.toml") + if err := os.WriteFile(cfg, []byte("[storage\n"), 0o644); err != nil { + t.Fatalf("WriteFile() error = %v", err) + } + if err := runDbStats([]string{"--config", cfg}); err == nil { + t.Fatalf("expected config parse error") + } +} + +func TestRunServerModePortInUseReturnsError(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + port := ln.Addr().(*net.TCPAddr).Port + + cfg := config.DefaultConfig() + cfg.Storage.DBPath = t.TempDir() + cfg.Server.AutoOpenBrowser = false + cfg.Server.Port = port + + err = runServerMode(cfg) + if err == nil { + t.Fatalf("expected server start error for occupied port") + } +} + +func TestRunDbStatsAndCleanWithExistingLogs(t *testing.T) { + dbPath := t.TempDir() + db, err := storage.NewBadgerStorage(storage.Config{DBPath: dbPath, RetentionSize: 1024 * 1024 * 100, RetentionDays: 30}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + if err := db.Store(&storage.LogEntry{ID: "1", Timestamp: time.Now().UTC().Add(-time.Hour), Level: "INFO", Message: "m1", Raw: "m1"}); err != nil { + t.Fatalf("Store() error = %v", err) + } + if err := db.Store(&storage.LogEntry{ID: "2", Timestamp: time.Now().UTC(), Level: "ERROR", Message: "m2", Raw: "m2"}); err != nil { + t.Fatalf("Store() error = %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + if err := runDbStats([]string{"--db-path", dbPath}); err != nil { + t.Fatalf("runDbStats() error = %v", err) + } + + origStdin := os.Stdin + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdin = r + defer func() { + os.Stdin = origStdin + _ = r.Close() + _ = w.Close() + }() + _, _ = w.WriteString("y\n") + _ = w.Close() + + if err := runDbClean([]string{"--db-path", dbPath, "--level", "ERROR"}); err != nil { + t.Fatalf("runDbClean(confirm yes) error = %v", err) + } +} + +func TestMainFatalPaths(t *testing.T) { + tests := []struct { + name string + args string + }{ + {name: "unknown command", args: "peek unknowncmd"}, + {name: "positional arg", args: "peek --no-browser extra"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(os.Args[0], "-test.run", "TestMainFatalHelper") + cmd.Env = append(os.Environ(), "PEEK_FATAL_HELPER_ARGS="+tt.args) + err := cmd.Run() + if err == nil { + t.Fatalf("expected non-zero exit for %s", tt.name) + } + }) + } +} + +func TestMainFatalHelper(t *testing.T) { + args := os.Getenv("PEEK_FATAL_HELPER_ARGS") + if args == "" { + t.Skip("helper") + } + os.Args = strings.Split(args, " ") + main() +} + +func TestRunCollectModeFreshModeWithExistingLogs(t *testing.T) { + dbPath := t.TempDir() + db, err := storage.NewBadgerStorage(storage.Config{DBPath: dbPath, RetentionSize: 1024 * 1024 * 100, RetentionDays: 30}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + if err := db.Store(&storage.LogEntry{ID: "seed", Timestamp: time.Now().UTC().Add(-time.Minute), Level: "INFO", Message: "seed", Raw: "seed"}); err != nil { + t.Fatalf("Store() error = %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + cfg := config.DefaultConfig() + cfg.Storage.DBPath = dbPath + cfg.Server.AutoOpenBrowser = false + cfg.Server.Port = 0 + cfg.Parsing.Format = "json" + + orig := os.Stdin + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe() error = %v", err) + } + os.Stdin = r + defer func() { os.Stdin = orig }() + + go func() { + _, _ = w.WriteString("not-json\n") + _, _ = w.WriteString(`{"timestamp":"2025-01-01T00:00:00Z","level":"INFO","message":"ok"}` + "\n") + _ = w.Close() + }() + + go func() { + time.Sleep(500 * time.Millisecond) + p, _ := os.FindProcess(os.Getpid()) + _ = p.Signal(os.Interrupt) + }() + + if err := runCollectMode(cfg, false); err != nil { + t.Fatalf("runCollectMode(fresh mode) error = %v", err) + } +} diff --git a/pkg/parser/parser_test.go b/pkg/parser/parser_test.go index bc325de..773344a 100644 --- a/pkg/parser/parser_test.go +++ b/pkg/parser/parser_test.go @@ -6,9 +6,9 @@ import ( func TestJSONParser_CanParse(t *testing.T) { tests := []struct { - name string - line string - want bool + name string + line string + want bool }{ { name: "valid JSON object", @@ -57,12 +57,12 @@ func TestJSONParser_Parse(t *testing.T) { wantErr bool }{ { - name: "complete JSON log", - line: `{"timestamp":"2024-01-15T10:30:00Z","level":"ERROR","message":"test error","service":"api"}`, - wantLevel: "ERROR", - wantMessage: "test error", + name: "complete JSON log", + line: `{"timestamp":"2024-01-15T10:30:00Z","level":"ERROR","message":"test error","service":"api"}`, + wantLevel: "ERROR", + wantMessage: "test error", wantFieldKey: "service", - wantErr: false, + wantErr: false, }, { name: "JSON with msg field", @@ -350,3 +350,53 @@ func TestGenerateID(t *testing.T) { t.Errorf("generateID() length = %d, want 16", len(id1)) } } + +func TestParseLogfmtEdgeCases(t *testing.T) { + tests := []struct { + name string + line string + assertion func(*testing.T, map[string]string) + }{ + { + name: "key without value", + line: `level=INFO empty=`, + assertion: func(t *testing.T, got map[string]string) { + t.Helper() + if got["empty"] != "" { + t.Fatalf("expected empty value, got %q", got["empty"]) + } + }, + }, + { + name: "ignores token without equals", + line: `level=INFO baretoken msg="ok"`, + assertion: func(t *testing.T, got map[string]string) { + t.Helper() + if _, ok := got["baretoken"]; ok { + t.Fatalf("unexpected baretoken key") + } + if got["msg"] != "ok" { + t.Fatalf("expected msg key") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseLogfmt(tt.line) + tt.assertion(t, got) + }) + } +} + +func TestLogfmtParser_ParsesTimestampField(t *testing.T) { + p := NewLogfmtParser() + entry, err := p.Parse(`timestamp=2024-01-15T10:30:00Z level=INFO msg="ok"`) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if entry.Timestamp.IsZero() { + t.Fatalf("expected parsed timestamp") + } +} diff --git a/pkg/query/lucene_behavior_test.go b/pkg/query/lucene_behavior_test.go new file mode 100644 index 0000000..bcdd19b --- /dev/null +++ b/pkg/query/lucene_behavior_test.go @@ -0,0 +1,163 @@ +package query + +import ( + "testing" + "time" + + "github.com/mchurichi/peek/pkg/storage" +) + +func TestParseDurationExtended(t *testing.T) { + tests := []struct { + name string + input string + want time.Duration + wantErr bool + }{ + {name: "weeks", input: "2w", want: 14 * 24 * time.Hour}, + {name: "days", input: "3d", want: 3 * 24 * time.Hour}, + {name: "go duration", input: "30m", want: 30 * time.Minute}, + {name: "invalid", input: "abc", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseDurationExtended(tt.input) + if tt.wantErr { + if err == nil { + t.Fatalf("parseDurationExtended(%q) expected error", tt.input) + } + return + } + if err != nil { + t.Fatalf("parseDurationExtended(%q) error = %v", tt.input, err) + } + if got != tt.want { + t.Fatalf("parseDurationExtended(%q) = %v, want %v", tt.input, got, tt.want) + } + }) + } +} + +func TestParseTimeValueFormats(t *testing.T) { + p := &parser{} + tests := []struct { + name string + input string + }{ + {name: "date", input: "2025-01-01"}, + {name: "datetime", input: "2025-01-01T10:11:12"}, + {name: "epoch milliseconds", input: "1735689600000"}, + {name: "relative now", input: "now-1h"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := p.parseTimeValue(tt.input); got.IsZero() { + t.Fatalf("parseTimeValue(%q) returned zero time", tt.input) + } + }) + } +} + +func TestQueryMatchingBehavior(t *testing.T) { + entry := &storage.LogEntry{Level: "ERROR", Message: "db failure", Fields: map[string]interface{}{"service": "api", "status": 503}} + + tests := []struct { + name string + query string + want bool + wantErr bool + }{ + {name: "boolean query", query: `(level:ERROR AND service:api) OR message:timeout`, want: true}, + {name: "not and keyword", query: `NOT level:INFO AND message:failure`, want: true}, + {name: "parse error", query: `(level:ERROR`, wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q, err := Parse(tt.query) + if tt.wantErr { + if err == nil { + t.Fatalf("Parse(%q) expected error", tt.query) + } + return + } + if err != nil { + t.Fatalf("Parse(%q) error = %v", tt.query, err) + } + if got := q.Match(entry); got != tt.want { + t.Fatalf("query match = %v, want %v", got, tt.want) + } + }) + } +} + +func TestParseRangeAndTokenReader(t *testing.T) { + entry := &storage.LogEntry{Fields: map[string]interface{}{"status": 503}} + + rf, err := (&parser{}).parseRange("status", "[500 TO 599]") + if err != nil { + t.Fatalf("parseRange() error = %v", err) + } + if !rf.Match(entry) { + t.Fatalf("expected numeric range filter to match") + } + + p := &parser{input: `"hello world" [1 TO 2] bare`, pos: 0} + tokens := []string{`"hello world"`, `[1 TO 2]`, `bare`} + for i, want := range tokens { + if i > 0 { + p.skipWhitespace() + } + if got := p.readToken(); got != want { + t.Fatalf("token %d = %q, want %q", i, got, want) + } + } +} + +func TestFilterMatchingEdgeCases(t *testing.T) { + entry := &storage.LogEntry{ + Timestamp: time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC), + Level: "INFO", + Message: "service started", + Fields: map[string]interface{}{ + "service": "api-server", + "latency": "45.5", + "status": 201, + }, + } + + tests := []struct { + name string + filter Filter + want bool + }{ + {name: "exact field match", filter: &FieldFilter{Field: "message", Value: "service started", Exact: true}, want: true}, + {name: "wildcard message", filter: &WildcardFilter{Field: "message", Pattern: "service*"}, want: true}, + {name: "wildcard missing field", filter: &WildcardFilter{Field: "missing", Pattern: "*"}, want: false}, + {name: "timestamp before range", filter: &TimestampRangeFilter{Start: time.Date(2025, 1, 1, 11, 0, 0, 0, time.UTC)}, want: false}, + {name: "numeric from string", filter: &NumericRangeFilter{Field: "latency", Start: 40, End: 50}, want: true}, + {name: "numeric missing field", filter: &NumericRangeFilter{Field: "missing", Start: 1, End: 2}, want: false}, + {name: "numeric parse failure", filter: &NumericRangeFilter{Field: "service", Start: 1, End: 2}, want: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.filter.Match(entry); got != tt.want { + t.Fatalf("match = %v, want %v", got, tt.want) + } + }) + } +} + +func TestParseTimestampRangeQuery(t *testing.T) { + rf, err := (&parser{}).parseRange("timestamp", "[2025-01-01T00:00:00Z TO 2025-01-02T00:00:00Z]") + if err != nil { + t.Fatalf("parseRange() error = %v", err) + } + entry := &storage.LogEntry{Timestamp: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)} + if !rf.Match(entry) { + t.Fatalf("expected timestamp range filter to match entry") + } +} diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go new file mode 100644 index 0000000..7e087c7 --- /dev/null +++ b/pkg/server/server_test.go @@ -0,0 +1,376 @@ +package server + +import ( + "bytes" + "encoding/json" + "fmt" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/mchurichi/peek/pkg/storage" +) + +func newTestStorage(t *testing.T) *storage.BadgerStorage { + t.Helper() + db, err := storage.NewBadgerStorage(storage.Config{DBPath: t.TempDir(), RetentionSize: 1024 * 1024 * 100, RetentionDays: 7}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + return db +} + +func storeLog(t *testing.T, db *storage.BadgerStorage, id, level, msg string, ts time.Time, fields map[string]interface{}) { + t.Helper() + if err := db.Store(&storage.LogEntry{ID: id, Timestamp: ts, Level: level, Message: msg, Fields: fields, Raw: msg}); err != nil { + t.Fatalf("Store() error = %v", err) + } +} + +func TestHTTPHandlers(t *testing.T) { + db := newTestStorage(t) + now := time.Now().UTC() + storeLog(t, db, "1", "INFO", "started", now.Add(-2*time.Minute), map[string]interface{}{"service": "api"}) + storeLog(t, db, "2", "ERROR", "failed", now.Add(-time.Minute), map[string]interface{}{"service": "worker"}) + + s := NewServer(db, nil) + + tests := []struct { + name string + method string + target string + body string + handler func(http.ResponseWriter, *http.Request) + wantStatus int + check func(*testing.T, *httptest.ResponseRecorder) + }{ + { + name: "health", + method: http.MethodGet, + target: "/health", + handler: s.handleHealth, + wantStatus: http.StatusOK, + check: func(t *testing.T, rr *httptest.ResponseRecorder) { + t.Helper() + if ct := rr.Header().Get("Content-Type"); !strings.Contains(ct, "application/json") { + t.Fatalf("content-type = %q", ct) + } + }, + }, + {name: "stats", method: http.MethodGet, target: "/stats", handler: s.handleStats, wantStatus: http.StatusOK}, + {name: "query method check", method: http.MethodGet, target: "/query", handler: s.handleQuery, wantStatus: http.StatusMethodNotAllowed}, + {name: "query invalid json", method: http.MethodPost, target: "/query", body: "{", handler: s.handleQuery, wantStatus: http.StatusBadRequest}, + {name: "query invalid lucene", method: http.MethodPost, target: "/query", body: `{"query":"level:[bad"}`, handler: s.handleQuery, wantStatus: http.StatusBadRequest}, + { + name: "query defaults and time range", + method: http.MethodPost, + target: "/query", + wantStatus: http.StatusOK, + handler: s.handleQuery, + body: mustJSON(t, map[string]interface{}{"query": "*", "start": now.Add(-90 * time.Second).Format(time.RFC3339), "end": now.Add(5 * time.Second).Format(time.RFC3339)}), + check: func(t *testing.T, rr *httptest.ResponseRecorder) { + t.Helper() + var resp struct { + Logs []storage.LogEntry `json:"logs"` + Total int `json:"total"` + } + if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Total != 1 || len(resp.Logs) != 1 { + t.Fatalf("unexpected query result: total=%d len=%d", resp.Total, len(resp.Logs)) + } + }, + }, + {name: "fields", method: http.MethodGet, target: "/fields?start=invalid&end=invalid", handler: s.handleFields, wantStatus: http.StatusOK}, + {name: "fields method not allowed", method: http.MethodPost, target: "/fields", handler: s.handleFields, wantStatus: http.StatusMethodNotAllowed}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(tt.method, tt.target, bytes.NewBufferString(tt.body)) + rr := httptest.NewRecorder() + tt.handler(rr, req) + if rr.Code != tt.wantStatus { + t.Fatalf("status = %d body=%s", rr.Code, rr.Body.String()) + } + if tt.check != nil { + tt.check(t, rr) + } + }) + } +} + +func mustJSON(t *testing.T, v any) string { + t.Helper() + b, err := json.Marshal(v) + if err != nil { + t.Fatalf("json marshal: %v", err) + } + return string(b) +} + +func TestWebSocketSubscribeAndBroadcast(t *testing.T) { + db := newTestStorage(t) + now := time.Now().UTC() + storeLog(t, db, "1", "ERROR", "boom", now.Add(-time.Second), map[string]interface{}{"service": "api"}) + storeLog(t, db, "2", "INFO", "ok", now, map[string]interface{}{"service": "api"}) + + s := NewServer(db, nil) + mux := http.NewServeMux() + mux.HandleFunc("/logs", s.handleWebSocket) + ts := httptest.NewServer(mux) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/logs" + conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial() error = %v", err) + } + defer conn.Close() + + if err := conn.WriteJSON(map[string]string{"action": "subscribe", "query": "level:ERROR"}); err != nil { + t.Fatalf("WriteJSON subscribe: %v", err) + } + + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + var first map[string]interface{} + if err := conn.ReadJSON(&first); err != nil { + t.Fatalf("ReadJSON initial: %v", err) + } + if first["type"] != "results" { + t.Fatalf("expected results message, got %#v", first) + } + + s.BroadcastLog(&storage.LogEntry{ID: "3", Timestamp: time.Now(), Level: "INFO", Message: "skip", Fields: map[string]interface{}{}, Raw: "skip"}) + s.BroadcastLog(&storage.LogEntry{ID: "4", Timestamp: time.Now(), Level: "ERROR", Message: "pass", Fields: map[string]interface{}{}, Raw: "pass"}) + + var msg map[string]interface{} + if err := conn.ReadJSON(&msg); err != nil { + t.Fatalf("ReadJSON broadcast: %v", err) + } + if msg["type"] != "log" { + t.Fatalf("expected log message, got %#v", msg) + } + + if err := conn.WriteJSON(map[string]string{"action": "unsubscribe"}); err != nil { + t.Fatalf("WriteJSON unsubscribe: %v", err) + } +} + +func TestStaticHandlers(t *testing.T) { + db := newTestStorage(t) + s := NewServer(db, nil) + + tests := []struct { + name string + target string + handler func(http.ResponseWriter, *http.Request) + wantStatus int + check func(*testing.T, *httptest.ResponseRecorder) + }{ + { + name: "index", + target: "/", + handler: s.handleIndex, + wantStatus: http.StatusOK, + check: func(t *testing.T, rr *httptest.ResponseRecorder) { + t.Helper() + if !strings.Contains(rr.Body.String(), "<") { + t.Fatalf("unexpected index response") + } + }, + }, + { + name: "vanjs", + target: "/van.min.js", + handler: s.handleVanJS, + wantStatus: http.StatusOK, + check: func(t *testing.T, rr *httptest.ResponseRecorder) { + t.Helper() + if rr.Header().Get("Cache-Control") == "" { + t.Fatalf("expected cache-control header") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, tt.target, nil) + rr := httptest.NewRecorder() + tt.handler(rr, req) + if rr.Code != tt.wantStatus { + t.Fatalf("status = %d", rr.Code) + } + tt.check(t, rr) + }) + } +} + +func TestStartBroadcastWorkerSendsNewEntries(t *testing.T) { + db := newTestStorage(t) + s := NewServer(db, nil) + + c := &client{ + send: make(chan interface{}, 10), + done: make(chan struct{}), + filter: &storage.AllFilter{}, + } + + s.mu.Lock() + s.clients[nil] = c + s.mu.Unlock() + + s.StartBroadcastWorker() + time.Sleep(120 * time.Millisecond) + storeLog(t, db, "late", "INFO", "late log", time.Now().UTC(), map[string]interface{}{}) + + select { + case <-c.send: + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for broadcast worker message") + } + + close(c.done) +} + +func TestStartServesHTTP(t *testing.T) { + db := newTestStorage(t) + s := NewServer(db, nil) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + port := ln.Addr().(*net.TCPAddr).Port + _ = ln.Close() + + go func() { + _ = s.Start(port) + }() + + url := fmt.Sprintf("http://127.0.0.1:%d/health", port) + deadline := time.Now().Add(2 * time.Second) + for { + resp, err := http.Get(url) + if err == nil { + _ = resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return + } + } + if time.Now().After(deadline) { + t.Fatalf("server did not become ready") + } + time.Sleep(50 * time.Millisecond) + } +} + +type failingWriter struct { + header http.Header +} + +func (w *failingWriter) Header() http.Header { + if w.header == nil { + w.header = make(http.Header) + } + return w.header +} +func (w *failingWriter) WriteHeader(statusCode int) {} +func (w *failingWriter) Write([]byte) (int, error) { return 0, fmt.Errorf("write failed") } + +func TestHandlerErrorsWhenStorageUnavailable(t *testing.T) { + db, err := storage.NewBadgerStorage(storage.Config{DBPath: t.TempDir(), RetentionSize: 1024 * 1024 * 100, RetentionDays: 7}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + s := NewServer(db, nil) + if err := db.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + tests := []struct { + name string + handler func(http.ResponseWriter, *http.Request) + method string + target string + body string + wantStatus int + }{ + {name: "health", handler: s.handleHealth, method: http.MethodGet, target: "/health", wantStatus: http.StatusInternalServerError}, + {name: "stats", handler: s.handleStats, method: http.MethodGet, target: "/stats", wantStatus: http.StatusInternalServerError}, + {name: "query", handler: s.handleQuery, method: http.MethodPost, target: "/query", body: `{"query":"*"}`, wantStatus: http.StatusInternalServerError}, + {name: "fields", handler: s.handleFields, method: http.MethodGet, target: "/fields", wantStatus: http.StatusInternalServerError}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rr := httptest.NewRecorder() + req := httptest.NewRequest(tt.method, tt.target, bytes.NewBufferString(tt.body)) + tt.handler(rr, req) + if rr.Code != tt.wantStatus { + t.Fatalf("status = %d", rr.Code) + } + }) + } +} + +func TestNewServerWithStartTimeAndVanJSWriteFailure(t *testing.T) { + db := newTestStorage(t) + start := time.Now().Add(-time.Minute) + s := NewServer(db, &start) + if s.defaultFilter == nil { + t.Fatalf("expected default filter in fresh mode") + } + + fw := &failingWriter{} + s.handleVanJS(fw, httptest.NewRequest(http.MethodGet, "/van.min.js", nil)) +} + +func TestHandleWebSocketUpgradeFailureAndInitialResultsError(t *testing.T) { + db, err := storage.NewBadgerStorage(storage.Config{DBPath: t.TempDir(), RetentionSize: 1024 * 1024 * 100, RetentionDays: 7}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + s := NewServer(db, nil) + + // Regular HTTP request without websocket headers should fail upgrade path gracefully. + s.handleWebSocket(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/logs", nil)) + + if err := db.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + c := &client{send: make(chan interface{}, 1), done: make(chan struct{})} + s.sendInitialResults(c, &storage.AllFilter{}) + select { + case <-c.send: + t.Fatalf("did not expect initial results when storage query fails") + default: + } + + close(c.done) + s.sendInitialResults(c, &storage.AllFilter{}) +} + +func TestWritePumpExitsWhenDoneClosed(t *testing.T) { + s := &Server{} + c := &client{send: make(chan interface{}, 1), done: make(chan struct{})} + close(c.done) + s.writePump(c) +} + +func TestSendInitialResultsHonorsDoneChannel(t *testing.T) { + db := newTestStorage(t) + storeLog(t, db, "seed", "INFO", "seed", time.Now().UTC(), map[string]interface{}{}) + s := NewServer(db, nil) + c := &client{send: make(chan interface{}, 1), done: make(chan struct{})} + close(c.done) + s.sendInitialResults(c, &storage.AllFilter{}) +} diff --git a/pkg/storage/badger_behavior_test.go b/pkg/storage/badger_behavior_test.go new file mode 100644 index 0000000..5598d6d --- /dev/null +++ b/pkg/storage/badger_behavior_test.go @@ -0,0 +1,195 @@ +package storage + +import ( + "errors" + "strings" + "testing" + "time" +) + +func newBehaviorStorage(t *testing.T) *BadgerStorage { + t.Helper() + s, err := NewBadgerStorage(Config{DBPath: t.TempDir(), RetentionSize: 1024 * 1024 * 100, RetentionDays: 30}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + t.Cleanup(func() { _ = s.Close() }) + return s +} + +func addEntry(t *testing.T, s *BadgerStorage, id string, ts time.Time, level string, fields map[string]interface{}) { + t.Helper() + if err := s.Store(&LogEntry{ID: id, Timestamp: ts, Level: level, Message: id, Fields: fields, Raw: id}); err != nil { + t.Fatalf("Store(%s) error = %v", id, err) + } +} + +func TestQueryWithTimeRangeAndFieldDiscovery(t *testing.T) { + s := newBehaviorStorage(t) + base := time.Now().UTC().Add(-3 * time.Hour) + addEntry(t, s, "old", base, "INFO", map[string]interface{}{"service": "api", "status": 200}) + addEntry(t, s, "mid", base.Add(time.Hour), "ERROR", map[string]interface{}{"service": "worker", "status": 500}) + addEntry(t, s, "new", base.Add(2*time.Hour), "ERROR", map[string]interface{}{"service": "api", "status": 500}) + + tr := &TimeRange{Start: base.Add(30 * time.Minute), End: base.Add(90 * time.Minute)} + results, total, err := s.QueryWithTimeRange(AllFilter{}, tr, 10, 0) + if err != nil { + t.Fatalf("QueryWithTimeRange() error = %v", err) + } + if total != 1 || len(results) != 1 || results[0].ID != "mid" { + t.Fatalf("unexpected range query result: total=%d len=%d", total, len(results)) + } + + fields, err := s.GetFields(base.Add(30*time.Minute), base.Add(3*time.Hour)) + if err != nil { + t.Fatalf("GetFields() error = %v", err) + } + if len(fields) == 0 { + t.Fatalf("expected fields to be discovered") + } +} + +func TestDeletionAndOldestNewest(t *testing.T) { + s := newBehaviorStorage(t) + now := time.Now().UTC() + addEntry(t, s, "1", now.Add(-2*time.Hour), "INFO", nil) + addEntry(t, s, "2", now.Add(-time.Hour), "DEBUG", nil) + addEntry(t, s, "3", now, "DEBUG", nil) + + oldest, newest, err := s.GetOldestNewest() + if err != nil { + t.Fatalf("GetOldestNewest() error = %v", err) + } + if !oldest.Before(newest) { + t.Fatalf("expected oldest < newest") + } + + deleted, err := s.DeleteByLevel("DEBUG") + if err != nil || deleted != 2 { + t.Fatalf("DeleteByLevel() deleted=%d err=%v", deleted, err) + } + + addEntry(t, s, "4", now.Add(-3*time.Hour), "INFO", nil) + cutDeleted, err := s.DeleteOlderThan(now.Add(-90 * time.Minute)) + if err != nil || cutDeleted == 0 { + t.Fatalf("DeleteOlderThan() deleted=%d err=%v", cutDeleted, err) + } + + addEntry(t, s, "5", now.Add(-10*time.Minute), "WARN", nil) + + allDeleted, err := s.DeleteAll() + if err != nil { + t.Fatalf("DeleteAll() error = %v", err) + } + if allDeleted == 0 { + t.Fatalf("expected DeleteAll() to remove remaining entries") + } +} + +func TestSubscribeAndMaintenanceAPIs(t *testing.T) { + s := newBehaviorStorage(t) + ch, cancel := s.Subscribe(AllFilter{}) + defer cancel() + + now := time.Now().UTC() + addEntry(t, s, "sub-1", now.Add(150*time.Millisecond), "INFO", map[string]interface{}{"service": "api"}) + + select { + case <-ch: + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for subscription update") + } + + if err := s.Sync(); err != nil { + t.Fatalf("Sync() error = %v", err) + } + if s.GetDBPath() == "" { + t.Fatalf("GetDBPath() returned empty path") + } + + // Compaction may or may not reclaim space, but should not fail on a healthy DB. + if _, err := s.CompactDatabaseFully(); err != nil { + t.Fatalf("CompactDatabaseFully() error = %v", err) + } +} + +func TestInternalRetentionHelpersAndCompactDatabase(t *testing.T) { + s := newBehaviorStorage(t) + now := time.Now().UTC() + addEntry(t, s, "a", now.Add(-3*time.Hour), "INFO", nil) + addEntry(t, s, "b", now.Add(-2*time.Hour), "INFO", nil) + addEntry(t, s, "c", now.Add(-time.Hour), "INFO", nil) + + if err := s.deleteOldestEntries(1); err != nil { + t.Fatalf("deleteOldestEntries() error = %v", err) + } + + // CompactDatabase may report no rewrite depending on DB state; both are acceptable. + _ = s.CompactDatabase() + + s.retentionSize = 1 + if err := s.enforceRetention(); err != nil { + t.Fatalf("enforceRetention(size) error = %v", err) + } + + s.retentionSize = 0 + s.retentionDays = 1 + if err := s.enforceRetention(); err != nil { + t.Fatalf("enforceRetention(days) error = %v", err) + } +} + +func TestScanPropagatesCallbackError(t *testing.T) { + s := newBehaviorStorage(t) + addEntry(t, s, "scan", time.Now().UTC(), "INFO", nil) + + expected := "stop" + err := s.Scan(func(*LogEntry) error { return errors.New(expected) }) + if err == nil || !strings.Contains(err.Error(), expected) { + t.Fatalf("Scan() error = %v", err) + } +} + +func TestCompactDatabaseFullyAfterCloseReturnsError(t *testing.T) { + s, err := NewBadgerStorage(Config{DBPath: t.TempDir(), RetentionSize: 1024 * 1024 * 100, RetentionDays: 30}) + if err != nil { + t.Fatalf("NewBadgerStorage() error = %v", err) + } + if err := s.Store(&LogEntry{ID: "closed", Timestamp: time.Now().UTC(), Level: "INFO", Message: "closed", Raw: "closed"}); err != nil { + t.Fatalf("Store() error = %v", err) + } + if err := s.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + if _, err := s.CompactDatabaseFully(); err == nil { + t.Fatalf("expected compaction error on closed database") + } +} + +func TestGetStatsTracksUnknownLevelAndExpandPath(t *testing.T) { + s := newBehaviorStorage(t) + addEntry(t, s, "u", time.Now().UTC(), "", nil) + + stats, err := s.GetStats() + if err != nil { + t.Fatalf("GetStats() error = %v", err) + } + if stats.Levels["Unknown"] == 0 { + t.Fatalf("expected Unknown level bucket") + } + + home := t.TempDir() + t.Setenv("HOME", home) + if got := expandPath("~/peek-db"); !strings.Contains(got, home) { + t.Fatalf("expandPath did not use HOME: %q", got) + } +} + +func TestEnforceRetentionNoopWhenDisabled(t *testing.T) { + s := newBehaviorStorage(t) + s.retentionDays = 0 + s.retentionSize = 0 + if err := s.enforceRetention(); err != nil { + t.Fatalf("enforceRetention() error = %v", err) + } +}