diff --git a/README.md b/README.md index aa9ac1f..fae2f24 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,11 @@ Browse and render Codex session `.jsonl` files with a web UI, and optionally sha ## Features - Indexes `~/.codex/sessions/{year}/{month}/{day}` and lists sessions by date. +- Adds `/active` for day-by-day active threads across all directories, with manual end/reopen controls. +- Accepts Codex CLI notification webhooks on `/hook` and lets you inspect them on `/notifications`. - Renders conversations as HTML with markdown support and dark theme. -- Shows only user/agent messages and reasoning; tool calls and other events are omitted. -- Consecutive messages are merged; for user groups, only the last message is kept. +- Shows user/agent messages, non-empty reasoning summaries, and response items such as tool calls, tool outputs, web searches, custom tool activity, and ghost snapshots. +- Consecutive messages and reasoning items are merged; visible tool/system items stay separate. For user groups, only the last message is kept. - User messages can be trimmed to content after `## My request for Codex:` (default on). - Share button saves a hard‑to‑guess HTML file to `~/.codex/shares` and copies its URL. - Separate share server serves only exact filenames (no directory listing). @@ -57,6 +59,11 @@ Visit: - UI: http://localhost:8080/ - Share server: http://localhost:8081/ +Key pages: +- `/` date/directory browser +- `/active` active thread list (default: today in your browser time zone) +- `/notifications` received webhook notifications + ## Autostart (systemd --user) If your environment supports user services (WSL with systemd, Linux desktops), you can keep it running. @@ -107,6 +114,27 @@ systemctl --user restart codex-manager - `-full` disable trimming to `## My request for Codex:` - `-h` / `--help` +## Repository overrides +- Default path: `~/.codex/session_repository_overrides.json` +- Purpose: override the GitHub repository used for branch links for specific `cwd` prefixes +- Create this file only when you need overrides; if the file is missing, Codex Manager falls back to `session_meta.git.repository_url` +- Matching: longest `cwd_prefix` match wins +- Changes are loaded at startup, so restart Codex Manager after editing the file +- Format: + ```json + { + "version": 1, + "rules": [ + { + "cwd_prefix": "/home/makoto/codex-manager", + "repository_url": "https://github.com/makoto-soracom/codex-manager.git" + } + ] + } + ``` +- Resolution order: repository override, then `session_meta.git.repository_url` +- Scope: this only changes the repository used for branch links; branch names and resume commands still come from session metadata + ## HTMLBucket notes - Auth file path: `~/.hb/auth.json` - Auth file format: @@ -140,6 +168,36 @@ Clicking “Share”: - Copies the share URL to your clipboard - Displays a banner showing the copied URL +## Active thread state +- New top-level threads are treated as active by default. +- `/active` uses your browser time zone (stored in a cookie) to decide which day a thread belongs to. +- A thread is shown as: + - `Waiting for user` + - `Waiting for agent` + - `Ended` +- `Ended` is a manual flag stored in `~/.codex/session_state.json`. +- If a thread receives new JSONL activity after being marked ended, it automatically returns to active. +- `/active` refreshes while the page is open without changing the global `--rescan-interval`. + +## Codex CLI notifications +You can point Codex CLI `notify` to the local server: + +```toml +notify = [ + "curl", + "-sS", + "-X", "POST", + "http://localhost:8080/hook", + "-H", "Content-Type: application/json", + "--data-binary" +] +``` + +- Incoming requests are accepted on `/hook`. +- Received notifications are appended to `~/.codex/notifications.jsonl`. +- `/notifications` shows the newest notifications first, with headers and body. +- The page auto-refreshes while open so you can inspect payloads as they arrive. + ## Development ```bash go test ./... diff --git a/cmd/codex-manager/main.go b/cmd/codex-manager/main.go index 1d4e65e..4fe7865 100644 --- a/cmd/codex-manager/main.go +++ b/cmd/codex-manager/main.go @@ -14,9 +14,12 @@ import ( "strings" "time" + "codex-manager/internal/active" "codex-manager/internal/config" "codex-manager/internal/htmlbucket" + "codex-manager/internal/notifications" "codex-manager/internal/render" + "codex-manager/internal/repooverride" "codex-manager/internal/search" "codex-manager/internal/sessions" "codex-manager/internal/web" @@ -42,6 +45,32 @@ func main() { log.Printf("initial scan failed: %v", err) } + activeIdx := active.NewIndex() + if err := activeIdx.RefreshFrom(idx); err != nil { + log.Printf("initial active index build failed: %v", err) + } + + activeState, err := active.LoadStateStore(active.DefaultStatePath(cfg.SessionsDir)) + if err != nil { + log.Printf("active state load failed: %v", err) + activeState, _ = active.LoadStateStore("") + } + if err := activeState.Reconcile(activeIdx.Summaries()); err != nil { + log.Printf("initial active state reconcile failed: %v", err) + } + + notificationStore, err := notifications.LoadStore(notifications.DefaultPath(cfg.SessionsDir)) + if err != nil { + log.Printf("notification store load failed: %v", err) + notificationStore, _ = notifications.LoadStore("") + } + + repositoryOverrideStore, err := repooverride.LoadStore(repooverride.DefaultPath(cfg.SessionsDir)) + if err != nil { + log.Printf("repository override load failed: %v", err) + repositoryOverrideStore, _ = repooverride.LoadStore("") + } + searchIdx := search.NewIndex() if err := searchIdx.RefreshFrom(idx); err != nil { log.Printf("initial search index build failed: %v", err) @@ -55,6 +84,12 @@ func main() { log.Printf("rescan failed: %v", err) continue } + if err := activeIdx.RefreshFrom(idx); err != nil { + log.Printf("active reindex failed: %v", err) + } + if err := activeState.Reconcile(activeIdx.Summaries()); err != nil { + log.Printf("active state reconcile failed: %v", err) + } if err := searchIdx.RefreshFrom(idx); err != nil { log.Printf("search reindex failed: %v", err) } @@ -67,6 +102,9 @@ func main() { } server := web.NewServer(idx, searchIdx, renderer, cfg.SessionsDir, cfg.ShareDir, cfg.ShareAddr, cfg.Theme) + server.EnableActive(activeIdx, activeState, 15*time.Second) + server.EnableNotifications(notificationStore) + server.EnableRepoOverrides(repositoryOverrideStore) if htmlBucketClient != nil { server.EnableHTMLBucket(htmlBucketClient) log.Printf("Using htmlbucket share backend (%s)", htmlBucketAuthPath) diff --git a/internal/active/index.go b/internal/active/index.go new file mode 100644 index 0000000..1c9f34b --- /dev/null +++ b/internal/active/index.go @@ -0,0 +1,424 @@ +package active + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path" + "sort" + "strings" + "sync" + "time" + + "codex-manager/internal/sessions" +) + +// WaitState represents what the active thread is waiting on. +type WaitState string + +const ( + WaitStateUser WaitState = "user_waiting" + WaitStateAgent WaitState = "agent_waiting" +) + +// Snippet is a short user/agent preview shown in the active list. +type Snippet struct { + Text string + Title string + SpeakerClass string +} + +// Summary is the active-thread summary for a single top-level session file. +type Summary struct { + Key string + SessionID string + Date sessions.DateKey + Name string + Path string + DisplayName string + ThreadName string + Cwd string + Branch string + ResumeCommand string + Size int64 + ModTime time.Time + LastActivityAt time.Time + ActivityToken string + WaitState WaitState + LastUserSnippet Snippet + LastAssistantSnippet Snippet + HasUserMessage bool +} + +const thinkingPlaceholder = "Thinking..." + +type fileIndex struct { + size int64 + modTime time.Time + threadName string + summary Summary +} + +// Index caches active summaries derived from session files. +type Index struct { + mu sync.RWMutex + files map[string]fileIndex + byKey map[string]Summary + ordered []Summary + updated time.Time +} + +// NewIndex creates an empty active summary index. +func NewIndex() *Index { + return &Index{ + files: map[string]fileIndex{}, + byKey: map[string]Summary{}, + } +} + +// LastUpdated reports when RefreshFrom last succeeded. +func (idx *Index) LastUpdated() time.Time { + idx.mu.RLock() + defer idx.mu.RUnlock() + return idx.updated +} + +// Summaries returns all cached summaries sorted by last activity descending. +func (idx *Index) Summaries() []Summary { + idx.mu.RLock() + defer idx.mu.RUnlock() + out := make([]Summary, len(idx.ordered)) + copy(out, idx.ordered) + return out +} + +// Lookup returns a summary by its stable key. +func (idx *Index) Lookup(key string) (Summary, bool) { + idx.mu.RLock() + defer idx.mu.RUnlock() + summary, ok := idx.byKey[key] + return summary, ok +} + +// RefreshFrom rebuilds changed summaries from the sessions index. +func (idx *Index) RefreshFrom(sessionsIdx *sessions.Index) error { + dates := sessionsIdx.Dates() + files := make([]sessions.SessionFile, 0, len(dates)) + for _, date := range dates { + files = append(files, sessionsIdx.SessionsByDate(date)...) + } + + idx.mu.RLock() + existing := idx.files + idx.mu.RUnlock() + + nextFiles := make(map[string]fileIndex, len(files)) + nextByKey := make(map[string]Summary, len(files)) + summaries := make([]Summary, 0, len(files)) + var firstErr error + + for _, file := range files { + if file.Meta != nil && file.Meta.IsSubagentThread() { + continue + } + if cached, ok := existing[file.Path]; ok && cached.size == file.Size && cached.modTime.Equal(file.ModTime) && cached.threadName == file.ThreadName { + nextFiles[file.Path] = cached + nextByKey[cached.summary.Key] = cached.summary + summaries = append(summaries, cached.summary) + continue + } + + summary, err := buildSummary(file) + if err != nil { + if firstErr == nil { + firstErr = err + } + if cached, ok := existing[file.Path]; ok { + nextFiles[file.Path] = cached + nextByKey[cached.summary.Key] = cached.summary + summaries = append(summaries, cached.summary) + } + continue + } + + entry := fileIndex{ + size: file.Size, + modTime: file.ModTime, + threadName: file.ThreadName, + summary: summary, + } + nextFiles[file.Path] = entry + nextByKey[summary.Key] = summary + summaries = append(summaries, summary) + } + + sort.Slice(summaries, func(i, j int) bool { + if summaries[i].LastActivityAt.Equal(summaries[j].LastActivityAt) { + if summaries[i].ModTime.Equal(summaries[j].ModTime) { + if summaries[i].Date.String() == summaries[j].Date.String() { + return summaries[i].Name < summaries[j].Name + } + return summaries[i].Date.String() > summaries[j].Date.String() + } + return summaries[i].ModTime.After(summaries[j].ModTime) + } + return summaries[i].LastActivityAt.After(summaries[j].LastActivityAt) + }) + + idx.mu.Lock() + idx.files = nextFiles + idx.byKey = nextByKey + idx.ordered = summaries + idx.updated = time.Now() + idx.mu.Unlock() + return firstErr +} + +func buildSummary(file sessions.SessionFile) (Summary, error) { + activity, err := scanActivity(file) + if err != nil { + return Summary{}, err + } + + userSnippet, assistantSnippet, hasUser, assistantAfterLatestUser, err := extractSnippets(file.Path, file.Meta) + if err != nil { + userSnippet = Snippet{Title: "User", SpeakerClass: "user"} + assistantSnippet = Snippet{Title: "Agent", SpeakerClass: "agent"} + hasUser = false + assistantAfterLatestUser = false + } + if activity.WaitState == WaitStateAgent && hasUser && !assistantAfterLatestUser { + assistantSnippet.Text = thinkingPlaceholder + } + + return Summary{ + Key: summaryKey(file), + SessionID: sessionID(file.Meta), + Date: file.Date, + Name: file.Name, + Path: file.Path, + DisplayName: file.DisplayName(), + ThreadName: file.ThreadName, + Cwd: sessions.CwdForFile(file), + Branch: branchForMeta(file.Meta), + ResumeCommand: buildResumeCommand(file.Meta), + Size: file.Size, + ModTime: file.ModTime, + LastActivityAt: activity.LastActivityAt, + ActivityToken: activity.ActivityToken, + WaitState: activity.WaitState, + LastUserSnippet: userSnippet, + LastAssistantSnippet: assistantSnippet, + HasUserMessage: hasUser, + }, nil +} + +func summaryKey(file sessions.SessionFile) string { + if file.Meta != nil { + if id := strings.TrimSpace(file.Meta.ID); id != "" { + return "id:" + id + } + } + return "path:" + path.Join(file.Date.Path(), file.Name) +} + +func sessionID(meta *sessions.SessionMeta) string { + if meta == nil { + return "" + } + return strings.TrimSpace(meta.ID) +} + +func buildResumeCommand(meta *sessions.SessionMeta) string { + if meta == nil || strings.TrimSpace(meta.ID) == "" { + return "" + } + commands := make([]string, 0, 3) + if cwd := strings.TrimSpace(meta.Cwd); cwd != "" { + commands = append(commands, "cd "+shellQuote(cwd)) + } + if branch := branchForMeta(meta); branch != "" { + commands = append(commands, "git switch "+shellQuote(branch)) + } + commands = append(commands, "codex resume "+strings.TrimSpace(meta.ID)) + return strings.Join(commands, "\n") +} + +func shellQuote(value string) string { + if value == "" { + return "''" + } + return "'" + strings.ReplaceAll(value, "'", "'\"'\"'") + "'" +} + +func branchForMeta(meta *sessions.SessionMeta) string { + if meta == nil { + return "" + } + return meta.GitBranch() +} + +type activityInfo struct { + LastActivityAt time.Time + ActivityToken string + WaitState WaitState +} + +type activityEnvelope struct { + Timestamp string `json:"timestamp"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +func scanActivity(file sessions.SessionFile) (activityInfo, error) { + f, err := os.Open(file.Path) + if err != nil { + return activityInfo{}, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) + + var lastActivity time.Time + lineCount := 0 + waitState := WaitStateUser + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + lineCount++ + + var env activityEnvelope + if err := json.Unmarshal([]byte(line), &env); err != nil { + continue + } + + if ts, ok := parseTimestamp(env.Timestamp); ok { + lastActivity = ts + } + + if env.Type != "event_msg" { + continue + } + var payload struct { + Type string `json:"type"` + } + if err := json.Unmarshal(env.Payload, &payload); err != nil { + continue + } + switch payload.Type { + case "task_started": + waitState = WaitStateAgent + case "task_complete": + waitState = WaitStateUser + } + } + if err := scanner.Err(); err != nil { + return activityInfo{}, err + } + + if lastActivity.IsZero() { + if file.Meta != nil { + if ts, ok := parseTimestamp(file.Meta.Timestamp); ok { + lastActivity = ts + } + } + if lastActivity.IsZero() { + lastActivity = file.ModTime + } + } + if lineCount == 0 { + lineCount = 1 + } + + return activityInfo{ + LastActivityAt: lastActivity, + ActivityToken: fmt.Sprintf("%d:%d:%d", file.Size, lastActivity.UnixNano(), lineCount), + WaitState: waitState, + }, nil +} + +func parseTimestamp(value string) (time.Time, bool) { + value = strings.TrimSpace(value) + if value == "" { + return time.Time{}, false + } + if parsed, err := time.Parse(time.RFC3339Nano, value); err == nil { + return parsed, true + } + if parsed, err := time.Parse(time.RFC3339, value); err == nil { + return parsed, true + } + return time.Time{}, false +} + +func extractSnippets(path string, meta *sessions.SessionMeta) (Snippet, Snippet, bool, bool, error) { + session, err := sessions.ParseSession(path) + if err != nil { + return Snippet{}, Snippet{}, false, false, err + } + + userSnippet := Snippet{ + Title: "User", + SpeakerClass: "user", + } + assistantSnippet := Snippet{ + Title: "Agent", + SpeakerClass: "agent", + } + if session.Meta != nil && session.Meta.IsSubagentThread() { + userSnippet.Title = "Agent" + userSnippet.SpeakerClass = "agent" + assistantSnippet.Title = "Subagent" + assistantSnippet.SpeakerClass = "subagent" + } else if meta != nil && meta.IsSubagentThread() { + userSnippet.Title = "Agent" + userSnippet.SpeakerClass = "agent" + assistantSnippet.Title = "Subagent" + assistantSnippet.SpeakerClass = "subagent" + } + + hasUser := false + lastUserIndex := -1 + lastAssistantIndex := -1 + for index, item := range session.Items { + switch item.Role { + case "user": + if sessions.IsAutoContextUserMessage(item.Content) { + continue + } + hasUser = true + userSnippet.Text = item.Content + lastUserIndex = index + case "assistant": + assistantSnippet.Text = item.Content + lastAssistantIndex = index + } + } + userSnippet.Text = snippetFromContent(userSnippet.Text, 180) + assistantSnippet.Text = snippetFromContent(assistantSnippet.Text, 180) + return userSnippet, assistantSnippet, hasUser, lastAssistantIndex > lastUserIndex, nil +} + +func snippetFromContent(value string, max int) string { + value = strings.TrimSpace(value) + if value == "" { + return "" + } + value = strings.Join(strings.Fields(value), " ") + if max <= 0 { + return value + } + runes := []rune(value) + if len(runes) <= max { + return value + } + if max > 3 { + return string(runes[:max-3]) + "..." + } + return string(runes[:max]) +} diff --git a/internal/active/index_test.go b/internal/active/index_test.go new file mode 100644 index 0000000..35448c9 --- /dev/null +++ b/internal/active/index_test.go @@ -0,0 +1,159 @@ +package active + +import ( + "os" + "path/filepath" + "testing" + + "codex-manager/internal/sessions" +) + +func TestIndexRefreshFromBuildsTopLevelSummary(t *testing.T) { + root := t.TempDir() + sessionsDir := filepath.Join(root, "sessions") + dateDir := filepath.Join(sessionsDir, "2026", "03", "18") + if err := os.MkdirAll(dateDir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + if err := os.WriteFile(filepath.Join(root, "session_index.jsonl"), []byte("{\"id\":\"parent-1\",\"thread_name\":\"parser bug\",\"updated_at\":\"2026-03-18T08:00:00Z\"}\n"), 0o600); err != nil { + t.Fatalf("write session index: %v", err) + } + + parentPath := filepath.Join(dateDir, "parent.jsonl") + parentData := "" + + "{\"timestamp\":\"2026-03-18T07:59:00Z\",\"type\":\"session_meta\",\"payload\":{\"id\":\"parent-1\",\"timestamp\":\"2026-03-18T07:59:00Z\",\"cwd\":\"/tmp/project\",\"git\":{\"branch\":\"feature/parser-fix\",\"commit_hash\":\"abc123\"},\"originator\":\"cli\",\"cli_version\":\"0.1\"}}\n" + + "{\"timestamp\":\"2026-03-18T07:59:10Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"user\",\"content\":[{\"type\":\"input_text\",\"text\":\"## My request for Codex:\\nFix the parser\"}]}}\n" + + "{\"timestamp\":\"2026-03-18T07:59:12Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"Investigating now.\"}]}}\n" + + "{\"timestamp\":\"2026-03-18T08:00:00Z\",\"type\":\"event_msg\",\"payload\":{\"type\":\"task_started\"}}\n" + if err := os.WriteFile(parentPath, []byte(parentData), 0o600); err != nil { + t.Fatalf("write parent session: %v", err) + } + + subagentPath := filepath.Join(dateDir, "subagent.jsonl") + subagentData := "" + + "{\"timestamp\":\"2026-03-18T08:01:00Z\",\"type\":\"session_meta\",\"payload\":{\"id\":\"agent-1\",\"forked_from_id\":\"parent-1\",\"timestamp\":\"2026-03-18T08:01:00Z\",\"cwd\":\"/tmp/project\",\"originator\":\"cli\",\"cli_version\":\"0.1\",\"source\":{\"subagent\":{\"thread_spawn\":{\"parent_thread_id\":\"parent-1\",\"depth\":1,\"agent_nickname\":\"Anscombe\",\"agent_role\":\"explorer\"}}}}}\n" + + "{\"timestamp\":\"2026-03-18T08:01:10Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"Subagent work.\"}]}}\n" + if err := os.WriteFile(subagentPath, []byte(subagentData), 0o600); err != nil { + t.Fatalf("write subagent session: %v", err) + } + + idx := sessions.NewIndex(sessionsDir) + if err := idx.Refresh(); err != nil { + t.Fatalf("refresh sessions: %v", err) + } + + activeIdx := NewIndex() + if err := activeIdx.RefreshFrom(idx); err != nil { + t.Fatalf("refresh active index: %v", err) + } + + summaries := activeIdx.Summaries() + if len(summaries) != 1 { + t.Fatalf("expected 1 top-level summary, got %d", len(summaries)) + } + + summary := summaries[0] + if summary.Key != "id:parent-1" { + t.Fatalf("unexpected key: %q", summary.Key) + } + if summary.DisplayName != "parser bug (parent.jsonl)" { + t.Fatalf("unexpected display name: %q", summary.DisplayName) + } + if summary.WaitState != WaitStateAgent { + t.Fatalf("expected agent wait state, got %q", summary.WaitState) + } + if summary.LastActivityAt.Format("2006-01-02T15:04:05Z07:00") != "2026-03-18T08:00:00Z" { + t.Fatalf("unexpected last activity: %s", summary.LastActivityAt.Format(timeLayout)) + } + if summary.LastUserSnippet.Text != "Fix the parser" { + t.Fatalf("unexpected user snippet: %q", summary.LastUserSnippet.Text) + } + if summary.LastAssistantSnippet.Text != "Investigating now." { + t.Fatalf("unexpected assistant snippet: %q", summary.LastAssistantSnippet.Text) + } + if summary.Branch != "feature/parser-fix" { + t.Fatalf("unexpected branch: %q", summary.Branch) + } + if summary.ResumeCommand != "cd '/tmp/project'\ngit switch 'feature/parser-fix'\ncodex resume parent-1" { + t.Fatalf("unexpected resume command: %q", summary.ResumeCommand) + } +} + +func TestIndexRefreshFromUsesThinkingPlaceholderWhenLatestUserHasNoAssistantReply(t *testing.T) { + root := t.TempDir() + sessionsDir := filepath.Join(root, "sessions") + dateDir := filepath.Join(sessionsDir, "2026", "03", "19") + if err := os.MkdirAll(dateDir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + filePath := filepath.Join(dateDir, "thinking.jsonl") + data := "" + + "{\"timestamp\":\"2026-03-19T01:00:00Z\",\"type\":\"session_meta\",\"payload\":{\"id\":\"session-thinking\",\"timestamp\":\"2026-03-19T01:00:00Z\",\"cwd\":\"/tmp/project\",\"originator\":\"cli\",\"cli_version\":\"0.1\"}}\n" + + "{\"timestamp\":\"2026-03-19T01:00:05Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"user\",\"content\":[{\"type\":\"input_text\",\"text\":\"## My request for Codex:\\nEarlier request\"}]}}\n" + + "{\"timestamp\":\"2026-03-19T01:00:10Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"Older assistant message.\"}]}}\n" + + "{\"timestamp\":\"2026-03-19T01:00:20Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"user\",\"content\":[{\"type\":\"input_text\",\"text\":\"## My request for Codex:\\nNewest request\"}]}}\n" + + "{\"timestamp\":\"2026-03-19T01:00:30Z\",\"type\":\"event_msg\",\"payload\":{\"type\":\"task_started\"}}\n" + if err := os.WriteFile(filePath, []byte(data), 0o600); err != nil { + t.Fatalf("write session: %v", err) + } + + idx := sessions.NewIndex(sessionsDir) + if err := idx.Refresh(); err != nil { + t.Fatalf("refresh sessions: %v", err) + } + + activeIdx := NewIndex() + if err := activeIdx.RefreshFrom(idx); err != nil { + t.Fatalf("refresh active index: %v", err) + } + + summaries := activeIdx.Summaries() + if len(summaries) != 1 { + t.Fatalf("expected 1 summary, got %d", len(summaries)) + } + + summary := summaries[0] + if summary.WaitState != WaitStateAgent { + t.Fatalf("expected agent wait state, got %q", summary.WaitState) + } + if summary.LastUserSnippet.Text != "Newest request" { + t.Fatalf("unexpected user snippet: %q", summary.LastUserSnippet.Text) + } + if summary.LastAssistantSnippet.Text != thinkingPlaceholder { + t.Fatalf("expected thinking placeholder, got %q", summary.LastAssistantSnippet.Text) + } +} + +func TestStateStoreReconcileClearsEndedMarkOnNewActivity(t *testing.T) { + path := filepath.Join(t.TempDir(), "session_state.json") + + store, err := LoadStateStore(path) + if err != nil { + t.Fatalf("load store: %v", err) + } + if err := store.MarkEnded("id:session-1", "token-1"); err != nil { + t.Fatalf("mark ended: %v", err) + } + if got := len(store.Snapshot()); got != 1 { + t.Fatalf("expected 1 ended mark, got %d", got) + } + + if err := store.Reconcile([]Summary{{Key: "id:session-1", ActivityToken: "token-2"}}); err != nil { + t.Fatalf("reconcile: %v", err) + } + if got := len(store.Snapshot()); got != 0 { + t.Fatalf("expected ended mark to clear, got %d", got) + } + + reloaded, err := LoadStateStore(path) + if err != nil { + t.Fatalf("reload store: %v", err) + } + if got := len(reloaded.Snapshot()); got != 0 { + t.Fatalf("expected persisted ended mark to clear, got %d", got) + } +} + +const timeLayout = "2006-01-02T15:04:05Z07:00" diff --git a/internal/active/state.go b/internal/active/state.go new file mode 100644 index 0000000..1bedd71 --- /dev/null +++ b/internal/active/state.go @@ -0,0 +1,182 @@ +package active + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "sync" + "time" +) + +const stateFileVersion = 1 + +// EndedMark is the persisted manual end marker for a session. +type EndedMark struct { + ActivityToken string `json:"activity_token,omitempty"` + EndedAt time.Time `json:"ended_at,omitempty"` +} + +type persistedState struct { + Version int `json:"version"` + Ended map[string]EndedMark `json:"ended,omitempty"` +} + +// StateStore persists manual ended/reopened flags independently from session files. +type StateStore struct { + path string + mu sync.RWMutex + data persistedState +} + +// DefaultStatePath returns the default path for persisted session-state metadata. +func DefaultStatePath(sessionsDir string) string { + return filepath.Join(filepath.Dir(sessionsDir), "session_state.json") +} + +// LoadStateStore opens or creates an empty state store. +func LoadStateStore(path string) (*StateStore, error) { + store := &StateStore{ + path: path, + data: persistedState{ + Version: stateFileVersion, + Ended: map[string]EndedMark{}, + }, + } + if path == "" { + return store, nil + } + + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return store, nil + } + return nil, err + } + if len(data) == 0 { + return store, nil + } + + var decoded persistedState + if err := json.Unmarshal(data, &decoded); err != nil { + return nil, err + } + if decoded.Version == 0 { + decoded.Version = stateFileVersion + } + if decoded.Ended == nil { + decoded.Ended = map[string]EndedMark{} + } + store.data = decoded + return store, nil +} + +// Path returns the backing file path. +func (s *StateStore) Path() string { + if s == nil { + return "" + } + return s.path +} + +// Snapshot returns a copy of the current ended marks. +func (s *StateStore) Snapshot() map[string]EndedMark { + if s == nil { + return nil + } + s.mu.RLock() + defer s.mu.RUnlock() + out := make(map[string]EndedMark, len(s.data.Ended)) + for key, value := range s.data.Ended { + out[key] = value + } + return out +} + +// MarkEnded stores the current activity token as manually ended. +func (s *StateStore) MarkEnded(key string, token string) error { + if s == nil || key == "" { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + s.data.Version = stateFileVersion + if s.data.Ended == nil { + s.data.Ended = map[string]EndedMark{} + } + s.data.Ended[key] = EndedMark{ + ActivityToken: token, + EndedAt: time.Now().UTC(), + } + return s.saveLocked() +} + +// Reopen removes an ended mark. +func (s *StateStore) Reopen(key string) error { + if s == nil || key == "" { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + if len(s.data.Ended) == 0 { + return nil + } + if _, ok := s.data.Ended[key]; !ok { + return nil + } + delete(s.data.Ended, key) + return s.saveLocked() +} + +// Reconcile clears ended markers when the underlying session received new activity. +func (s *StateStore) Reconcile(summaries []Summary) error { + if s == nil { + return nil + } + current := make(map[string]string, len(summaries)) + for _, summary := range summaries { + current[summary.Key] = summary.ActivityToken + } + + s.mu.Lock() + defer s.mu.Unlock() + if len(s.data.Ended) == 0 { + return nil + } + + changed := false + for key, mark := range s.data.Ended { + token, ok := current[key] + if !ok || token == "" || mark.ActivityToken == "" { + continue + } + if mark.ActivityToken != token { + delete(s.data.Ended, key) + changed = true + } + } + if !changed { + return nil + } + return s.saveLocked() +} + +func (s *StateStore) saveLocked() error { + if s.path == "" { + return nil + } + if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil { + return err + } + payload, err := json.MarshalIndent(s.data, "", " ") + if err != nil { + return err + } + payload = append(payload, '\n') + tmpPath := s.path + ".tmp" + if err := os.WriteFile(tmpPath, payload, 0o600); err != nil { + return err + } + return os.Rename(tmpPath, s.path) +} diff --git a/internal/notifications/store.go b/internal/notifications/store.go new file mode 100644 index 0000000..3f2c816 --- /dev/null +++ b/internal/notifications/store.go @@ -0,0 +1,211 @@ +package notifications + +import ( + "bufio" + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" +) + +// Entry is one received notification request. +type Entry struct { + ID string `json:"id"` + ReceivedAt time.Time `json:"received_at"` + Method string `json:"method"` + Path string `json:"path"` + ContentType string `json:"content_type,omitempty"` + UserAgent string `json:"user_agent,omitempty"` + RemoteAddr string `json:"remote_addr,omitempty"` + Headers map[string][]string `json:"headers,omitempty"` + Body string `json:"body,omitempty"` + PrettyBody string `json:"pretty_body,omitempty"` + IsJSON bool `json:"is_json,omitempty"` + Size int `json:"size"` + Preview string `json:"preview,omitempty"` +} + +// Store persists received notifications and keeps them in memory for rendering. +type Store struct { + path string + mu sync.RWMutex + entries []Entry +} + +// DefaultPath returns the default log file for received notifications. +func DefaultPath(sessionsDir string) string { + return filepath.Join(filepath.Dir(sessionsDir), "notifications.jsonl") +} + +// LoadStore opens or creates an empty store. +func LoadStore(path string) (*Store, error) { + store := &Store{path: path, entries: []Entry{}} + if path == "" { + return store, nil + } + + file, err := os.Open(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return store, nil + } + return nil, err + } + defer file.Close() + + scanner := bufio.NewScanner(file) + scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var entry Entry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + return nil, err + } + store.entries = append(store.entries, entry) + } + if err := scanner.Err(); err != nil { + return nil, err + } + return store, nil +} + +// AppendRequest records one incoming request body and metadata. +func (s *Store) AppendRequest(method string, path string, contentType string, userAgent string, remoteAddr string, headers map[string][]string, body []byte) (Entry, error) { + entry, err := buildEntry(method, path, contentType, userAgent, remoteAddr, headers, body) + if err != nil { + return Entry{}, err + } + + s.mu.Lock() + defer s.mu.Unlock() + + s.entries = append(s.entries, entry) + if err := s.appendLocked(entry); err != nil { + s.entries = s.entries[:len(s.entries)-1] + return Entry{}, err + } + return entry, nil +} + +// Entries returns notifications ordered from newest to oldest. +func (s *Store) Entries() []Entry { + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.entries) == 0 { + return nil + } + out := make([]Entry, len(s.entries)) + for i := range s.entries { + out[len(s.entries)-1-i] = s.entries[i] + } + return out +} + +func buildEntry(method string, path string, contentType string, userAgent string, remoteAddr string, headers map[string][]string, body []byte) (Entry, error) { + id, err := randomID() + if err != nil { + return Entry{}, err + } + + text := strings.ToValidUTF8(string(body), "\uFFFD") + text = strings.ReplaceAll(text, "\x00", "") + pretty := "" + isJSON := false + trimmed := strings.TrimSpace(text) + if trimmed != "" { + var payload any + if err := json.Unmarshal([]byte(trimmed), &payload); err == nil { + formatted, err := json.MarshalIndent(payload, "", " ") + if err == nil { + pretty = string(formatted) + isJSON = true + } + } + } + + return Entry{ + ID: id, + ReceivedAt: time.Now().UTC(), + Method: strings.ToUpper(strings.TrimSpace(method)), + Path: strings.TrimSpace(path), + ContentType: strings.TrimSpace(contentType), + UserAgent: strings.TrimSpace(userAgent), + RemoteAddr: strings.TrimSpace(remoteAddr), + Headers: cloneHeaders(headers), + Body: text, + PrettyBody: pretty, + IsJSON: isJSON, + Size: len(body), + Preview: previewText(text, 160), + }, nil +} + +func cloneHeaders(headers map[string][]string) map[string][]string { + if len(headers) == 0 { + return nil + } + out := make(map[string][]string, len(headers)) + for key, values := range headers { + copied := append([]string(nil), values...) + sort.Strings(copied) + out[key] = copied + } + return out +} + +func (s *Store) appendLocked(entry Entry) error { + if s.path == "" { + return nil + } + if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil { + return err + } + payload, err := json.Marshal(entry) + if err != nil { + return err + } + payload = append(payload, '\n') + file, err := os.OpenFile(s.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600) + if err != nil { + return err + } + defer file.Close() + _, err = file.Write(payload) + return err +} + +func randomID() (string, error) { + buf := make([]byte, 8) + if _, err := rand.Read(buf); err != nil { + return "", err + } + return hex.EncodeToString(buf), nil +} + +func previewText(value string, max int) string { + value = strings.TrimSpace(value) + if value == "" { + return "" + } + value = strings.Join(strings.Fields(value), " ") + if max <= 0 { + return value + } + runes := []rune(value) + if len(runes) <= max { + return value + } + if max > 3 { + return string(runes[:max-3]) + "..." + } + return string(runes[:max]) +} diff --git a/internal/notifications/store_test.go b/internal/notifications/store_test.go new file mode 100644 index 0000000..dd1cdf4 --- /dev/null +++ b/internal/notifications/store_test.go @@ -0,0 +1,55 @@ +package notifications + +import ( + "path/filepath" + "testing" +) + +func TestStoreAppendRequestPersistsAndLoads(t *testing.T) { + path := filepath.Join(t.TempDir(), "notifications.jsonl") + + store, err := LoadStore(path) + if err != nil { + t.Fatalf("load store: %v", err) + } + + entry, err := store.AppendRequest( + "post", + "/hook", + "application/json", + "curl/8.0", + "127.0.0.1:12345", + map[string][]string{"X-Test": {"b", "a"}}, + []byte("{\"type\":\"task_complete\",\"message\":\"done\"}"), + ) + if err != nil { + t.Fatalf("append request: %v", err) + } + if entry.ID == "" { + t.Fatal("expected entry id") + } + if !entry.IsJSON { + t.Fatal("expected json body") + } + if entry.PrettyBody == "" { + t.Fatal("expected pretty json body") + } + if entry.Preview == "" { + t.Fatal("expected preview") + } + + reloaded, err := LoadStore(path) + if err != nil { + t.Fatalf("reload store: %v", err) + } + entries := reloaded.Entries() + if len(entries) != 1 { + t.Fatalf("expected 1 entry, got %d", len(entries)) + } + if entries[0].ID != entry.ID { + t.Fatalf("expected matching id, got %q vs %q", entries[0].ID, entry.ID) + } + if entries[0].Headers["X-Test"][0] != "a" { + t.Fatalf("expected sorted header values, got %#v", entries[0].Headers["X-Test"]) + } +} diff --git a/internal/render/templates/active.html b/internal/render/templates/active.html new file mode 100644 index 0000000..fdf45de --- /dev/null +++ b/internal/render/templates/active.html @@ -0,0 +1,233 @@ +{{ define "active" }} + + +
+ + +All dates / All directories / Notifications
+All dates{{ if .SelectedCwd }} / All directories / Directory dates{{ end }}
+All dates / Active threads / Notifications{{ if .SelectedCwd }} / All directories / Directory dates{{ end }}
No sessions found for this directory today.
+