From 0172ebf9f3cebef3450de307cda76be7a6a21d05 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:15:50 -0400 Subject: [PATCH 01/10] feat(tui): status bar with connection state --- internal/tui/statusbar.go | 83 ++++++++++++++++++++++++++++++++++ internal/tui/statusbar_test.go | 47 +++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 internal/tui/statusbar.go create mode 100644 internal/tui/statusbar_test.go diff --git a/internal/tui/statusbar.go b/internal/tui/statusbar.go new file mode 100644 index 0000000..10ee269 --- /dev/null +++ b/internal/tui/statusbar.go @@ -0,0 +1,83 @@ +package tui + +import ( + "fmt" + + "github.com/charmbracelet/lipgloss" +) + +var ( + statusBarStyle = lipgloss.NewStyle(). + Background(lipgloss.Color("236")). + Foreground(lipgloss.Color("252")). + Padding(0, 1) + statusConnectedStyle = lipgloss.NewStyle(). + Foreground(lipgloss.Color("42")) + statusDisconnectedStyle = lipgloss.NewStyle(). + Foreground(lipgloss.Color("196")) + statusErrorStyle = lipgloss.NewStyle(). + Foreground(lipgloss.Color("196")). + Bold(true) +) + +type StatusBar struct { + connected bool + threadCount int + selectedThread string + errorMessage string + width int +} + +func NewStatusBar() *StatusBar { + return &StatusBar{} +} + +func (statusBar *StatusBar) SetConnected(connected bool) { + statusBar.connected = connected + if connected { + statusBar.errorMessage = "" + } +} + +func (statusBar *StatusBar) SetThreadCount(count int) { + statusBar.threadCount = count +} + +func (statusBar *StatusBar) SetSelectedThread(name string) { + statusBar.selectedThread = name +} + +func (statusBar *StatusBar) SetError(message string) { + statusBar.errorMessage = message +} + +func (statusBar *StatusBar) SetWidth(width int) { + statusBar.width = width +} + +func (statusBar StatusBar) View() string { + var left string + if statusBar.connected { + left = statusConnectedStyle.Render("● Connected") + } else { + left = statusDisconnectedStyle.Render("○ Disconnected") + } + + if statusBar.errorMessage != "" { + left += " " + statusErrorStyle.Render(statusBar.errorMessage) + } + + middle := "" + if statusBar.threadCount > 0 { + middle = fmt.Sprintf(" | %d threads", statusBar.threadCount) + } + + right := "" + if statusBar.selectedThread != "" { + right = fmt.Sprintf(" | %s", statusBar.selectedThread) + } + + content := left + middle + right + style := statusBarStyle.Width(statusBar.width) + return style.Render(content) +} diff --git a/internal/tui/statusbar_test.go b/internal/tui/statusbar_test.go new file mode 100644 index 0000000..3016d51 --- /dev/null +++ b/internal/tui/statusbar_test.go @@ -0,0 +1,47 @@ +package tui + +import ( + "strings" + "testing" +) + +func TestStatusBarConnected(t *testing.T) { + bar := NewStatusBar() + bar.SetConnected(true) + bar.SetThreadCount(3) + bar.SetSelectedThread("Build web app") + + output := bar.View() + + if !strings.Contains(output, "Connected") { + t.Errorf("expected Connected in output:\n%s", output) + } + if !strings.Contains(output, "3 threads") { + t.Errorf("expected thread count in output:\n%s", output) + } + if !strings.Contains(output, "Build web app") { + t.Errorf("expected selected thread in output:\n%s", output) + } +} + +func TestStatusBarDisconnected(t *testing.T) { + bar := NewStatusBar() + bar.SetConnected(false) + + output := bar.View() + + if !strings.Contains(output, "Disconnected") { + t.Errorf("expected Disconnected in output:\n%s", output) + } +} + +func TestStatusBarError(t *testing.T) { + bar := NewStatusBar() + bar.SetError("connection lost") + + output := bar.View() + + if !strings.Contains(output, "connection lost") { + t.Errorf("expected error in output:\n%s", output) + } +} From 5207bb1bfab27ef06e7517c90e36fac3bdb9d6e2 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:19:22 -0400 Subject: [PATCH 02/10] feat: wire app-server lifecycle into TUI startup --- cmd/dj/main.go | 10 ++- internal/tui/actions_test.go | 4 +- internal/tui/app.go | 167 +++++++++++++---------------------- internal/tui/app_session.go | 101 +++++++++++++++++++++ internal/tui/app_test.go | 30 +++---- internal/tui/bridge.go | 7 +- internal/tui/bridge_test.go | 5 +- internal/tui/msgs.go | 5 ++ 8 files changed, 198 insertions(+), 131 deletions(-) create mode 100644 internal/tui/app_session.go diff --git a/cmd/dj/main.go b/cmd/dj/main.go index 1cc16ac..975a0b3 100644 --- a/cmd/dj/main.go +++ b/cmd/dj/main.go @@ -7,6 +7,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/spf13/cobra" + "github.com/robinojw/dj/internal/appserver" "github.com/robinojw/dj/internal/config" "github.com/robinojw/dj/internal/state" "github.com/robinojw/dj/internal/tui" @@ -37,12 +38,15 @@ func runApp(cmd *cobra.Command, args []string) error { return fmt.Errorf("load config: %w", err) } - _ = cfg - + client := appserver.NewClient(cfg.AppServer.Command, cfg.AppServer.Args...) store := state.NewThreadStore() - app := tui.NewAppModel(store) + app := tui.NewAppModel(store, client) program := tea.NewProgram(app, tea.WithAltScreen()) + app.SetProgram(program) + _, err = program.Run() + + client.Stop() return err } diff --git a/internal/tui/actions_test.go b/internal/tui/actions_test.go index 37a6c93..a4e2255 100644 --- a/internal/tui/actions_test.go +++ b/internal/tui/actions_test.go @@ -28,7 +28,7 @@ func TestMenuEnterDispatchesFork(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) @@ -64,7 +64,7 @@ func TestMenuEnterDispatchesDelete(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) diff --git a/internal/tui/app.go b/internal/tui/app.go index bd2a41f..0f16cbf 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -1,8 +1,11 @@ package tui import ( + "context" + tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" + "github.com/robinojw/dj/internal/appserver" "github.com/robinojw/dj/internal/state" ) @@ -19,12 +22,15 @@ var titleStyle = lipgloss.NewStyle(). type AppModel struct { store *state.ThreadStore + client *appserver.Client + program *tea.Program canvas CanvasModel tree TreeModel session *SessionModel prefix *PrefixHandler menu MenuModel help HelpModel + statusBar *StatusBar menuVisible bool helpVisible bool focus int @@ -32,16 +38,22 @@ type AppModel struct { height int } -func NewAppModel(store *state.ThreadStore) AppModel { +func NewAppModel(store *state.ThreadStore, client *appserver.Client) AppModel { return AppModel{ - store: store, - canvas: NewCanvasModel(store), - tree: NewTreeModel(store), - prefix: NewPrefixHandler(), - help: NewHelpModel(), + store: store, + client: client, + canvas: NewCanvasModel(store), + tree: NewTreeModel(store), + prefix: NewPrefixHandler(), + help: NewHelpModel(), + statusBar: NewStatusBar(), } } +func (app *AppModel) SetProgram(program *tea.Program) { + app.program = program +} + func (app AppModel) Focus() int { return app.focus } @@ -51,7 +63,35 @@ func (app AppModel) HelpVisible() bool { } func (app AppModel) Init() tea.Cmd { - return nil + if app.client == nil { + return nil + } + + return func() tea.Msg { + ctx := context.Background() + if err := app.client.Start(ctx); err != nil { + return AppServerErrorMsg{Err: err} + } + + router := appserver.NewNotificationRouter() + app.client.Router = router + + if app.program != nil { + WireEventBridge(router, app.program) + } + + go app.client.ReadLoop(app.client.Dispatch) + + caps, err := app.client.Initialize(ctx) + if err != nil { + return AppServerErrorMsg{Err: err} + } + + return AppServerConnectedMsg{ + ServerName: caps.ServerInfo.Name, + ServerVersion: caps.ServerInfo.Version, + } + } } func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { @@ -61,6 +101,7 @@ func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case tea.WindowSizeMsg: app.width = msg.Width app.height = msg.Height + app.statusBar.SetWidth(msg.Width) if app.session != nil { app.session.SetSize(msg.Width, msg.Height) } @@ -74,6 +115,12 @@ func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return app.handleThreadDelta(msg) case CommandOutputMsg: return app.handleCommandOutput(msg) + case AppServerConnectedMsg: + app.statusBar.SetConnected(true) + return app, nil + case AppServerErrorMsg: + app.statusBar.SetError(msg.Error()) + return app, nil } return app, nil } @@ -171,114 +218,20 @@ func (app *AppModel) handleCanvasArrow(msg tea.KeyMsg) { } } -func (app AppModel) handleSessionKey(msg tea.KeyMsg) (tea.Model, tea.Cmd) { - switch msg.Type { - case tea.KeyCtrlC: - return app, tea.Quit - case tea.KeyEsc: - app.closeSession() - return app, nil - case tea.KeyUp, tea.KeyDown, tea.KeyPgUp, tea.KeyPgDown: - app.scrollSession(msg) - return app, nil - } - return app, nil -} - -func (app AppModel) openSession() (tea.Model, tea.Cmd) { - threadID := app.canvas.SelectedThreadID() - if threadID == "" { - return app, nil - } - - thread, exists := app.store.Get(threadID) - if !exists { - return app, nil - } - - session := NewSessionModel(thread) - session.SetSize(app.width, app.height) - app.session = &session - app.focus = FocusSession - return app, nil -} - -func (app *AppModel) closeSession() { - app.session = nil - app.focus = FocusCanvas -} - -func (app *AppModel) scrollSession(msg tea.KeyMsg) { - if app.session == nil { - return - } - - switch msg.Type { - case tea.KeyUp: - app.session.viewport.ScrollUp(1) - case tea.KeyDown: - app.session.viewport.ScrollDown(1) - case tea.KeyPgUp: - app.session.viewport.HalfPageUp() - case tea.KeyPgDown: - app.session.viewport.HalfPageDown() - } -} - -func (app *AppModel) refreshSession() { - if app.session == nil { - return - } - app.session.Refresh() -} - -func (app AppModel) handleThreadMessage(msg ThreadMessageMsg) (tea.Model, tea.Cmd) { - thread, exists := app.store.Get(msg.ThreadID) - if !exists { - return app, nil - } - thread.AppendMessage(state.ChatMessage{ - ID: msg.MessageID, - Role: msg.Role, - Content: msg.Content, - }) - app.refreshSession() - return app, nil -} - -func (app AppModel) handleThreadDelta(msg ThreadDeltaMsg) (tea.Model, tea.Cmd) { - thread, exists := app.store.Get(msg.ThreadID) - if !exists { - return app, nil - } - thread.AppendDelta(msg.MessageID, msg.Delta) - app.refreshSession() - return app, nil -} - -func (app AppModel) handleCommandOutput(msg CommandOutputMsg) (tea.Model, tea.Cmd) { - thread, exists := app.store.Get(msg.ThreadID) - if !exists { - return app, nil - } - thread.AppendOutput(msg.ExecID, msg.Data) - app.refreshSession() - return app, nil -} - func (app AppModel) View() string { title := titleStyle.Render("DJ — Codex TUI Visualizer") + status := app.statusBar.View() if app.helpVisible { - return title + "\n" + app.help.View() + "\n" + return title + "\n" + app.help.View() + "\n" + status } if app.menuVisible { - return title + "\n" + app.menu.View() + "\n" + return title + "\n" + app.menu.View() + "\n" + status } if app.focus == FocusSession && app.session != nil { - return title + "\n" + app.session.View() + "\n" + return title + "\n" + app.session.View() + "\n" + status } canvas := app.canvas.View() @@ -286,8 +239,8 @@ func (app AppModel) View() string { if app.focus == FocusTree { treeView := app.tree.View() body := lipgloss.JoinHorizontal(lipgloss.Top, treeView+" ", canvas) - return title + "\n" + body + "\n" + return title + "\n" + body + "\n" + status } - return title + "\n" + canvas + "\n" + return title + "\n" + canvas + "\n" + status } diff --git a/internal/tui/app_session.go b/internal/tui/app_session.go new file mode 100644 index 0000000..1a8bcff --- /dev/null +++ b/internal/tui/app_session.go @@ -0,0 +1,101 @@ +package tui + +import ( + tea "github.com/charmbracelet/bubbletea" + "github.com/robinojw/dj/internal/state" +) + +func (app AppModel) handleSessionKey(msg tea.KeyMsg) (tea.Model, tea.Cmd) { + switch msg.Type { + case tea.KeyCtrlC: + return app, tea.Quit + case tea.KeyEsc: + app.closeSession() + return app, nil + case tea.KeyUp, tea.KeyDown, tea.KeyPgUp, tea.KeyPgDown: + app.scrollSession(msg) + return app, nil + } + return app, nil +} + +func (app AppModel) openSession() (tea.Model, tea.Cmd) { + threadID := app.canvas.SelectedThreadID() + if threadID == "" { + return app, nil + } + + thread, exists := app.store.Get(threadID) + if !exists { + return app, nil + } + + session := NewSessionModel(thread) + session.SetSize(app.width, app.height) + app.session = &session + app.focus = FocusSession + return app, nil +} + +func (app *AppModel) closeSession() { + app.session = nil + app.focus = FocusCanvas +} + +func (app *AppModel) scrollSession(msg tea.KeyMsg) { + if app.session == nil { + return + } + + switch msg.Type { + case tea.KeyUp: + app.session.viewport.ScrollUp(1) + case tea.KeyDown: + app.session.viewport.ScrollDown(1) + case tea.KeyPgUp: + app.session.viewport.HalfPageUp() + case tea.KeyPgDown: + app.session.viewport.HalfPageDown() + } +} + +func (app *AppModel) refreshSession() { + if app.session == nil { + return + } + app.session.Refresh() +} + +func (app AppModel) handleThreadMessage(msg ThreadMessageMsg) (tea.Model, tea.Cmd) { + thread, exists := app.store.Get(msg.ThreadID) + if !exists { + return app, nil + } + thread.AppendMessage(state.ChatMessage{ + ID: msg.MessageID, + Role: msg.Role, + Content: msg.Content, + }) + app.refreshSession() + return app, nil +} + +func (app AppModel) handleThreadDelta(msg ThreadDeltaMsg) (tea.Model, tea.Cmd) { + thread, exists := app.store.Get(msg.ThreadID) + if !exists { + return app, nil + } + thread.AppendDelta(msg.MessageID, msg.Delta) + app.refreshSession() + return app, nil +} + +func (app AppModel) handleCommandOutput(msg CommandOutputMsg) (tea.Model, tea.Cmd) { + thread, exists := app.store.Get(msg.ThreadID) + if !exists { + return app, nil + } + thread.AppendOutput(msg.ExecID, msg.Data) + app.refreshSession() + return app, nil +} diff --git a/internal/tui/app_test.go b/internal/tui/app_test.go index bac2a3d..be26558 100644 --- a/internal/tui/app_test.go +++ b/internal/tui/app_test.go @@ -12,7 +12,7 @@ func TestAppHandlesArrowKeys(t *testing.T) { store.Add("t-1", "First") store.Add("t-2", "Second") - app := NewAppModel(store) + app := NewAppModel(store, nil) rightKey := tea.KeyMsg{Type: tea.KeyRight} updated, _ := app.Update(rightKey) @@ -27,7 +27,7 @@ func TestAppHandlesThreadStatusMsg(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Initial") - app := NewAppModel(store) + app := NewAppModel(store, nil) msg := ThreadStatusMsg{ ThreadID: "t-1", @@ -49,7 +49,7 @@ func TestAppToggleFocus(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) if app.Focus() != FocusCanvas { t.Errorf("expected canvas focus, got %d", app.Focus()) @@ -69,7 +69,7 @@ func TestAppTreeNavigationWhenFocused(t *testing.T) { store.Add("t-1", "First") store.Add("t-2", "Second") - app := NewAppModel(store) + app := NewAppModel(store, nil) toggleKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'t'}} updated, _ := app.Update(toggleKey) @@ -86,7 +86,7 @@ func TestAppTreeNavigationWhenFocused(t *testing.T) { func TestAppHandlesQuit(t *testing.T) { store := state.NewThreadStore() - app := NewAppModel(store) + app := NewAppModel(store, nil) quitKey := tea.KeyMsg{Type: tea.KeyCtrlC} _, cmd := app.Update(quitKey) @@ -100,7 +100,7 @@ func TestAppEnterOpensSession(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test Task") - app := NewAppModel(store) + app := NewAppModel(store, nil) enterKey := tea.KeyMsg{Type: tea.KeyEnter} updated, _ := app.Update(enterKey) @@ -115,7 +115,7 @@ func TestAppEscClosesSession(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test Task") - app := NewAppModel(store) + app := NewAppModel(store, nil) enterKey := tea.KeyMsg{Type: tea.KeyEnter} updated, _ := app.Update(enterKey) @@ -132,7 +132,7 @@ func TestAppEscClosesSession(t *testing.T) { func TestAppEnterWithNoThreadsDoesNothing(t *testing.T) { store := state.NewThreadStore() - app := NewAppModel(store) + app := NewAppModel(store, nil) enterKey := tea.KeyMsg{Type: tea.KeyEnter} updated, _ := app.Update(enterKey) @@ -147,7 +147,7 @@ func TestAppCtrlBMOpensMenu(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) @@ -166,7 +166,7 @@ func TestAppMenuEscCloses(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) @@ -189,7 +189,7 @@ func TestAppCtrlBEscCancelsPrefix(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) @@ -208,7 +208,7 @@ func TestAppMenuNavigation(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) ctrlB := tea.KeyMsg{Type: tea.KeyCtrlB} updated, _ := app.Update(ctrlB) @@ -229,7 +229,7 @@ func TestAppMenuNavigation(t *testing.T) { func TestAppHelpToggle(t *testing.T) { store := state.NewThreadStore() - app := NewAppModel(store) + app := NewAppModel(store, nil) helpKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'?'}} updated, _ := app.Update(helpKey) @@ -249,7 +249,7 @@ func TestAppHelpToggle(t *testing.T) { func TestAppHelpEscCloses(t *testing.T) { store := state.NewThreadStore() - app := NewAppModel(store) + app := NewAppModel(store, nil) helpKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'?'}} updated, _ := app.Update(helpKey) @@ -268,7 +268,7 @@ func TestAppSessionRefreshesOnMessage(t *testing.T) { store := state.NewThreadStore() store.Add("t-1", "Test") - app := NewAppModel(store) + app := NewAppModel(store, nil) enterKey := tea.KeyMsg{Type: tea.KeyEnter} updated, _ := app.Update(enterKey) diff --git a/internal/tui/bridge.go b/internal/tui/bridge.go index df64794..78dfc17 100644 --- a/internal/tui/bridge.go +++ b/internal/tui/bridge.go @@ -1,9 +1,12 @@ package tui -import "github.com/robinojw/dj/internal/appserver" +import ( + tea "github.com/charmbracelet/bubbletea" + "github.com/robinojw/dj/internal/appserver" +) type MessageSender interface { - Send(msg any) + Send(msg tea.Msg) } func WireEventBridge(router *appserver.NotificationRouter, sender MessageSender) { diff --git a/internal/tui/bridge_test.go b/internal/tui/bridge_test.go index 8ea6ef2..aed4988 100644 --- a/internal/tui/bridge_test.go +++ b/internal/tui/bridge_test.go @@ -3,14 +3,15 @@ package tui import ( "testing" + tea "github.com/charmbracelet/bubbletea" "github.com/robinojw/dj/internal/appserver" ) type mockSender struct { - messages []any + messages []tea.Msg } -func (mock *mockSender) Send(msg any) { +func (mock *mockSender) Send(msg tea.Msg) { mock.messages = append(mock.messages, msg) } diff --git a/internal/tui/msgs.go b/internal/tui/msgs.go index b5d1949..bc673c7 100644 --- a/internal/tui/msgs.go +++ b/internal/tui/msgs.go @@ -40,6 +40,11 @@ type ThreadDeletedMsg struct { ThreadID string } +type AppServerConnectedMsg struct { + ServerName string + ServerVersion string +} + type AppServerErrorMsg struct { Err error } From 44cfec258404a3540e91dde2d24169f7da440f9a Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:20:37 -0400 Subject: [PATCH 03/10] feat(tui): 'n' key creates new thread via app-server --- internal/tui/app.go | 26 ++++++++++++++++++++++++++ internal/tui/app_test.go | 12 ++++++++++++ 2 files changed, 38 insertions(+) diff --git a/internal/tui/app.go b/internal/tui/app.go index 0f16cbf..b8f81c6 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -2,6 +2,7 @@ package tui import ( "context" + "fmt" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -115,6 +116,10 @@ func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return app.handleThreadDelta(msg) case CommandOutputMsg: return app.handleCommandOutput(msg) + case ThreadCreatedMsg: + app.store.Add(msg.ThreadID, msg.Title) + app.statusBar.SetThreadCount(len(app.store.All())) + return app, nil case AppServerConnectedMsg: app.statusBar.SetConnected(true) return app, nil @@ -166,10 +171,31 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { app.toggleFocus() case "?": app.helpVisible = !app.helpVisible + case "n": + return app, app.createThreadCmd() } return app, nil } +func (app AppModel) createThreadCmd() tea.Cmd { + return func() tea.Msg { + if app.client == nil { + return AppServerErrorMsg{Err: fmt.Errorf("not connected to app-server")} + } + + ctx := context.Background() + result, err := app.client.CreateThread(ctx, "New thread") + if err != nil { + return AppServerErrorMsg{Err: err} + } + + return ThreadCreatedMsg{ + ThreadID: result.ThreadID, + Title: "New thread", + } + } +} + func (app AppModel) handleHelpKey(msg tea.KeyMsg) (tea.Model, tea.Cmd) { isToggle := msg.Type == tea.KeyRunes && msg.String() == "?" isEsc := msg.Type == tea.KeyEsc diff --git a/internal/tui/app_test.go b/internal/tui/app_test.go index be26558..4c68763 100644 --- a/internal/tui/app_test.go +++ b/internal/tui/app_test.go @@ -287,3 +287,15 @@ func TestAppSessionRefreshesOnMessage(t *testing.T) { t.Errorf("expected session focus maintained, got %d", app.Focus()) } } + +func TestAppNewThread(t *testing.T) { + store := state.NewThreadStore() + app := NewAppModel(store, nil) + + nKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'n'}} + _, cmd := app.Update(nKey) + + if cmd == nil { + t.Error("expected command for thread creation") + } +} From c6f316ffe6cd416b820f95d321ea0403df6934b1 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:21:11 -0400 Subject: [PATCH 04/10] feat: graceful shutdown with deferred app-server cleanup --- cmd/dj/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/dj/main.go b/cmd/dj/main.go index 975a0b3..8d1b7c9 100644 --- a/cmd/dj/main.go +++ b/cmd/dj/main.go @@ -39,6 +39,8 @@ func runApp(cmd *cobra.Command, args []string) error { } client := appserver.NewClient(cfg.AppServer.Command, cfg.AppServer.Args...) + defer client.Stop() + store := state.NewThreadStore() app := tui.NewAppModel(store, client) @@ -46,7 +48,5 @@ func runApp(cmd *cobra.Command, args []string) error { app.SetProgram(program) _, err = program.Run() - - client.Stop() return err } From 3cf8f178d248b0eb0f130d2b039f69f982199bfc Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:21:47 -0400 Subject: [PATCH 05/10] test(tui): end-to-end integration test with real app-server --- internal/tui/integration_test.go | 48 ++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 internal/tui/integration_test.go diff --git a/internal/tui/integration_test.go b/internal/tui/integration_test.go new file mode 100644 index 0000000..5333835 --- /dev/null +++ b/internal/tui/integration_test.go @@ -0,0 +1,48 @@ +//go:build integration + +package tui + +import ( + "context" + "testing" + "time" + + "github.com/robinojw/dj/internal/appserver" + "github.com/robinojw/dj/internal/state" +) + +func TestIntegrationEndToEnd(t *testing.T) { + client := appserver.NewClient("codex", "app-server", "--listen", "stdio://") + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := client.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + defer client.Stop() + + router := appserver.NewNotificationRouter() + client.Router = router + go client.ReadLoop(client.Dispatch) + + caps, err := client.Initialize(ctx) + if err != nil { + t.Fatalf("Initialize failed: %v", err) + } + t.Logf("Connected: %s %s", caps.ServerInfo.Name, caps.ServerInfo.Version) + + store := state.NewThreadStore() + + result, err := client.CreateThread(ctx, "Say hello") + if err != nil { + t.Fatalf("CreateThread failed: %v", err) + } + store.Add(result.ThreadID, "Say hello") + t.Logf("Created thread: %s", result.ThreadID) + + threads := store.All() + if len(threads) != 1 { + t.Fatalf("expected 1 thread, got %d", len(threads)) + } +} From b65f2b08e4bcc36511b6b20f2d0d6497bd344c60 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:31:33 -0400 Subject: [PATCH 06/10] =?UTF-8?q?fix:=20app-server=20lifecycle=20bugs=20?= =?UTF-8?q?=E2=80=94=20value-copy=20bridge,=20disconnect=20guard,=20init?= =?UTF-8?q?=20timeout?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three root causes fixed: - SetProgram modified wrong copy (value type): moved router/bridge setup to main.go where program reference is available directly - 'n' key didn't check client.Running(), causing broken pipe on disconnected client - Init() used context.Background() with no timeout, hanging silently if app-server didn't respond; now uses 10s timeout --- cmd/dj/main.go | 5 ++++- internal/tui/app.go | 45 ++++++++++++++++++++------------------------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/cmd/dj/main.go b/cmd/dj/main.go index 8d1b7c9..25677a6 100644 --- a/cmd/dj/main.go +++ b/cmd/dj/main.go @@ -41,11 +41,14 @@ func runApp(cmd *cobra.Command, args []string) error { client := appserver.NewClient(cfg.AppServer.Command, cfg.AppServer.Args...) defer client.Stop() + router := appserver.NewNotificationRouter() + client.Router = router + store := state.NewThreadStore() app := tui.NewAppModel(store, client) program := tea.NewProgram(app, tea.WithAltScreen()) - app.SetProgram(program) + tui.WireEventBridge(router, program) _, err = program.Run() return err diff --git a/internal/tui/app.go b/internal/tui/app.go index b8f81c6..7c37036 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -3,6 +3,7 @@ package tui import ( "context" "fmt" + "time" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -10,6 +11,8 @@ import ( "github.com/robinojw/dj/internal/state" ) +const connectTimeout = 10 * time.Second + const ( FocusCanvas = iota FocusTree @@ -22,16 +25,16 @@ var titleStyle = lipgloss.NewStyle(). MarginBottom(1) type AppModel struct { - store *state.ThreadStore - client *appserver.Client - program *tea.Program - canvas CanvasModel - tree TreeModel - session *SessionModel - prefix *PrefixHandler - menu MenuModel - help HelpModel - statusBar *StatusBar + store *state.ThreadStore + client *appserver.Client + canvas CanvasModel + tree TreeModel + session *SessionModel + prefix *PrefixHandler + menu MenuModel + help HelpModel + statusBar *StatusBar + menuVisible bool helpVisible bool focus int @@ -51,10 +54,6 @@ func NewAppModel(store *state.ThreadStore, client *appserver.Client) AppModel { } } -func (app *AppModel) SetProgram(program *tea.Program) { - app.program = program -} - func (app AppModel) Focus() int { return app.focus } @@ -69,21 +68,16 @@ func (app AppModel) Init() tea.Cmd { } return func() tea.Msg { - ctx := context.Background() - if err := app.client.Start(ctx); err != nil { + if err := app.client.Start(context.Background()); err != nil { return AppServerErrorMsg{Err: err} } - router := appserver.NewNotificationRouter() - app.client.Router = router - - if app.program != nil { - WireEventBridge(router, app.program) - } - go app.client.ReadLoop(app.client.Dispatch) - caps, err := app.client.Initialize(ctx) + initCtx, cancel := context.WithTimeout(context.Background(), connectTimeout) + defer cancel() + + caps, err := app.client.Initialize(initCtx) if err != nil { return AppServerErrorMsg{Err: err} } @@ -179,7 +173,8 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { func (app AppModel) createThreadCmd() tea.Cmd { return func() tea.Msg { - if app.client == nil { + isDisconnected := app.client == nil || !app.client.Running() + if isDisconnected { return AppServerErrorMsg{Err: fmt.Errorf("not connected to app-server")} } From 569a533444e9be983e10f30158dc3afccc14f491 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:38:02 -0400 Subject: [PATCH 07/10] fix: gate 'n' key on model-level connected state, not client.Running() client.Running() reflects Start() was called, not that the process is alive. When the app-server dies after Start(), Running() still returns true and CreateThread writes to a dead pipe. Fix: track connected bool in AppModel, set only on AppServerConnectedMsg. Check synchronously in handleRune before spawning any async command. No race condition possible since Update runs single-threaded. --- internal/tui/app.go | 15 +++++++-------- internal/tui/app_test.go | 20 ++++++++++++++++++-- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/internal/tui/app.go b/internal/tui/app.go index 7c37036..aae4020 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -2,7 +2,6 @@ package tui import ( "context" - "fmt" "time" tea "github.com/charmbracelet/bubbletea" @@ -35,6 +34,7 @@ type AppModel struct { help HelpModel statusBar *StatusBar + connected bool menuVisible bool helpVisible bool focus int @@ -115,6 +115,7 @@ func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { app.statusBar.SetThreadCount(len(app.store.All())) return app, nil case AppServerConnectedMsg: + app.connected = true app.statusBar.SetConnected(true) return app, nil case AppServerErrorMsg: @@ -166,6 +167,10 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { case "?": app.helpVisible = !app.helpVisible case "n": + if !app.connected { + app.statusBar.SetError("not connected to app-server") + return app, nil + } return app, app.createThreadCmd() } return app, nil @@ -173,13 +178,7 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { func (app AppModel) createThreadCmd() tea.Cmd { return func() tea.Msg { - isDisconnected := app.client == nil || !app.client.Running() - if isDisconnected { - return AppServerErrorMsg{Err: fmt.Errorf("not connected to app-server")} - } - - ctx := context.Background() - result, err := app.client.CreateThread(ctx, "New thread") + result, err := app.client.CreateThread(context.Background(), "New thread") if err != nil { return AppServerErrorMsg{Err: err} } diff --git a/internal/tui/app_test.go b/internal/tui/app_test.go index 4c68763..2cffe36 100644 --- a/internal/tui/app_test.go +++ b/internal/tui/app_test.go @@ -288,14 +288,30 @@ func TestAppSessionRefreshesOnMessage(t *testing.T) { } } -func TestAppNewThread(t *testing.T) { +func TestAppNewThreadBlockedWhenDisconnected(t *testing.T) { store := state.NewThreadStore() app := NewAppModel(store, nil) nKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'n'}} _, cmd := app.Update(nKey) + if cmd != nil { + t.Error("expected no command when disconnected") + } +} + +func TestAppNewThreadAllowedWhenConnected(t *testing.T) { + store := state.NewThreadStore() + app := NewAppModel(store, nil) + + connectedMsg := AppServerConnectedMsg{ServerName: "test", ServerVersion: "1.0"} + updated, _ := app.Update(connectedMsg) + app = updated.(AppModel) + + nKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'n'}} + _, cmd := app.Update(nKey) + if cmd == nil { - t.Error("expected command for thread creation") + t.Error("expected command for thread creation when connected") } } From 0f7fb80369591ee0446569e1358ee35bd8a9aa36 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:41:40 -0400 Subject: [PATCH 08/10] feat(tui): connection-aware messaging with codex CLI hint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Status bar now shows three states: - "Connecting to app-server..." (amber) while Init handshake runs - "Connected" (green) after successful handshake - "Disconnected — requires codex CLI (codex app-server)" (red) on failure Canvas shows "Waiting for app-server connection..." instead of "Press 'n' to create one" when not connected. Pressing 'n' while disconnected shows "waiting for app-server — is codex CLI installed?" --- internal/tui/app.go | 22 +++++++++++++++++++--- internal/tui/statusbar.go | 27 +++++++++++++++++++++------ internal/tui/statusbar_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 9 deletions(-) diff --git a/internal/tui/app.go b/internal/tui/app.go index aae4020..322374c 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -43,6 +43,11 @@ type AppModel struct { } func NewAppModel(store *state.ThreadStore, client *appserver.Client) AppModel { + bar := NewStatusBar() + if client != nil { + bar.SetConnecting() + } + return AppModel{ store: store, client: client, @@ -50,7 +55,7 @@ func NewAppModel(store *state.ThreadStore, client *appserver.Client) AppModel { tree: NewTreeModel(store), prefix: NewPrefixHandler(), help: NewHelpModel(), - statusBar: NewStatusBar(), + statusBar: bar, } } @@ -168,7 +173,7 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { app.helpVisible = !app.helpVisible case "n": if !app.connected { - app.statusBar.SetError("not connected to app-server") + app.statusBar.SetError("waiting for app-server — is codex CLI installed?") return app, nil } return app, app.createThreadCmd() @@ -238,6 +243,17 @@ func (app *AppModel) handleCanvasArrow(msg tea.KeyMsg) { } } +func (app AppModel) canvasView() string { + threads := app.store.All() + if len(threads) > 0 { + return app.canvas.View() + } + if !app.connected { + return "Waiting for app-server connection..." + } + return "No active threads. Press 'n' to create one." +} + func (app AppModel) View() string { title := titleStyle.Render("DJ — Codex TUI Visualizer") status := app.statusBar.View() @@ -254,7 +270,7 @@ func (app AppModel) View() string { return title + "\n" + app.session.View() + "\n" + status } - canvas := app.canvas.View() + canvas := app.canvasView() if app.focus == FocusTree { treeView := app.tree.View() diff --git a/internal/tui/statusbar.go b/internal/tui/statusbar.go index 10ee269..ebaebae 100644 --- a/internal/tui/statusbar.go +++ b/internal/tui/statusbar.go @@ -13,6 +13,8 @@ var ( Padding(0, 1) statusConnectedStyle = lipgloss.NewStyle(). Foreground(lipgloss.Color("42")) + statusConnectingStyle = lipgloss.NewStyle(). + Foreground(lipgloss.Color("214")) statusDisconnectedStyle = lipgloss.NewStyle(). Foreground(lipgloss.Color("196")) statusErrorStyle = lipgloss.NewStyle(). @@ -22,6 +24,7 @@ var ( type StatusBar struct { connected bool + connecting bool threadCount int selectedThread string errorMessage string @@ -32,7 +35,14 @@ func NewStatusBar() *StatusBar { return &StatusBar{} } +func (statusBar *StatusBar) SetConnecting() { + statusBar.connecting = true + statusBar.connected = false + statusBar.errorMessage = "" +} + func (statusBar *StatusBar) SetConnected(connected bool) { + statusBar.connecting = false statusBar.connected = connected if connected { statusBar.errorMessage = "" @@ -47,6 +57,16 @@ func (statusBar *StatusBar) SetSelectedThread(name string) { statusBar.selectedThread = name } +func (statusBar StatusBar) renderConnectionState() string { + if statusBar.connected { + return statusConnectedStyle.Render("● Connected") + } + if statusBar.connecting { + return statusConnectingStyle.Render("◌ Connecting to app-server...") + } + return statusDisconnectedStyle.Render("○ Disconnected — requires codex CLI (codex app-server)") +} + func (statusBar *StatusBar) SetError(message string) { statusBar.errorMessage = message } @@ -56,12 +76,7 @@ func (statusBar *StatusBar) SetWidth(width int) { } func (statusBar StatusBar) View() string { - var left string - if statusBar.connected { - left = statusConnectedStyle.Render("● Connected") - } else { - left = statusDisconnectedStyle.Render("○ Disconnected") - } + left := statusBar.renderConnectionState() if statusBar.errorMessage != "" { left += " " + statusErrorStyle.Render(statusBar.errorMessage) diff --git a/internal/tui/statusbar_test.go b/internal/tui/statusbar_test.go index 3016d51..988ca31 100644 --- a/internal/tui/statusbar_test.go +++ b/internal/tui/statusbar_test.go @@ -33,6 +33,35 @@ func TestStatusBarDisconnected(t *testing.T) { if !strings.Contains(output, "Disconnected") { t.Errorf("expected Disconnected in output:\n%s", output) } + if !strings.Contains(output, "codex") { + t.Errorf("expected codex hint in disconnected output:\n%s", output) + } +} + +func TestStatusBarConnecting(t *testing.T) { + bar := NewStatusBar() + bar.SetConnecting() + + output := bar.View() + + if !strings.Contains(output, "Connecting") { + t.Errorf("expected Connecting in output:\n%s", output) + } +} + +func TestStatusBarConnectingClearedOnConnect(t *testing.T) { + bar := NewStatusBar() + bar.SetConnecting() + bar.SetConnected(true) + + output := bar.View() + + if strings.Contains(output, "Connecting") { + t.Errorf("expected Connecting cleared after connect:\n%s", output) + } + if !strings.Contains(output, "Connected") { + t.Errorf("expected Connected in output:\n%s", output) + } } func TestStatusBarError(t *testing.T) { From cdacbcf4736f2f4a00bf5d360070593e3d90077f Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 14:55:34 -0400 Subject: [PATCH 09/10] fix(protocol): align wire format with real Codex App Server API The Codex App Server wire format omits "jsonrpc":"2.0" from all messages, uses different method names (thread/start, thread/archive, turn/start), different response shapes (nested thread object), and different notification names (item/started, item/agentMessage/delta, turn/started, turn/completed). The initialized notification also requires params: {}. Fixed default args to remove unnecessary --listen stdio:// flag. --- internal/appserver/client.go | 14 +++---- internal/appserver/client_test.go | 33 ++++----------- internal/appserver/client_thread.go | 26 ++++++------ internal/appserver/client_thread_test.go | 24 +++++------ internal/appserver/dispatch_test.go | 10 ++--- internal/appserver/methods.go | 23 ++++++----- internal/appserver/methods_test.go | 15 ++++--- internal/appserver/protocol.go | 30 +++++++------- internal/appserver/protocol_test.go | 18 ++++---- internal/appserver/router.go | 42 ++++++++++++++++--- internal/appserver/router_test.go | 40 ++++++++++++++++-- internal/appserver/types_notify.go | 34 ++++++++++++---- internal/appserver/types_notify_test.go | 52 ++++++++++++++++++++---- internal/appserver/types_thread.go | 20 +++++---- internal/appserver/types_thread_test.go | 33 ++++++++++----- internal/config/config.go | 2 +- internal/tui/app.go | 4 +- internal/tui/bridge.go | 10 ++--- internal/tui/bridge_test.go | 43 ++++++++++++++++++++ 19 files changed, 314 insertions(+), 159 deletions(-) diff --git a/internal/appserver/client.go b/internal/appserver/client.go index 9357eb2..0ada51b 100644 --- a/internal/appserver/client.go +++ b/internal/appserver/client.go @@ -140,10 +140,9 @@ func (c *Client) Call(ctx context.Context, method string, params json.RawMessage defer c.pending.Delete(id) req := &Request{ - JSONRPC: "2.0", - ID: &id, - Method: method, - Params: params, + ID: &id, + Method: method, + Params: params, } if err := c.Send(req); err != nil { @@ -237,10 +236,11 @@ func (c *Client) Initialize(ctx context.Context) (*ServerCapabilities, error) { } } - // Send the initialized notification (no id, no response expected) + // Send the initialized notification (no id, no response expected). + // Codex requires params: {} even for empty notifications. notif := &Request{ - JSONRPC: "2.0", - Method: "initialized", + Method: "initialized", + Params: json.RawMessage(`{}`), } if err := c.Send(notif); err != nil { return nil, fmt.Errorf("send initialized: %w", err) diff --git a/internal/appserver/client_test.go b/internal/appserver/client_test.go index 404f63a..c13142e 100644 --- a/internal/appserver/client_test.go +++ b/internal/appserver/client_test.go @@ -10,8 +10,6 @@ import ( ) func TestClientStartStop(t *testing.T) { - // Use 'cat' as a mock app-server: it reads stdin and echoes to stdout. - // This verifies process lifecycle without a real codex binary. client := NewClient("cat") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -35,7 +33,6 @@ func TestClientStartStop(t *testing.T) { } func TestClientSendAndRead(t *testing.T) { - // 'cat' echoes back what we write — simulates a response client := NewClient("cat") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -46,18 +43,15 @@ func TestClientSendAndRead(t *testing.T) { } defer client.Stop() - // Start the read loop msgs := make(chan Message, 10) go client.ReadLoop(func(msg Message) { msgs <- msg }) - // Send a JSON-RPC request — cat will echo it back req := &Request{ - JSONRPC: "2.0", - ID: intPtr(1), - Method: "test/echo", - Params: json.RawMessage(`{"hello":"world"}`), + ID: intPtr(1), + Method: "test/echo", + Params: json.RawMessage(`{"hello":"world"}`), } if err := client.Send(req); err != nil { t.Fatalf("Send failed: %v", err) @@ -74,8 +68,6 @@ func TestClientSendAndRead(t *testing.T) { } func TestClientCall(t *testing.T) { - // Use 'cat' — it echoes the request back as-is. - // The Call method will see the echoed message has a matching ID and treat it as a response. client := NewClient("cat") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -86,34 +78,26 @@ func TestClientCall(t *testing.T) { } defer client.Stop() - // Start dispatching go client.ReadLoop(client.Dispatch) - // Call — cat echoes back the request, which has an id, so it resolves as a response resp, err := client.Call(ctx, "test/method", json.RawMessage(`{"key":"val"}`)) if err != nil { t.Fatalf("Call failed: %v", err) } - // The echo will have params (not result), but the call resolved because ID matched if resp == nil { t.Fatal("expected non-nil response") } } func TestInitializeHandshake(t *testing.T) { - // Set up bidirectional pipes to simulate app-server - // client writes -> serverRead, serverWrite -> clientRead clientRead, serverWrite := io.Pipe() serverRead, clientWrite := io.Pipe() - // Mock server: reads initialize request, writes back capabilities response, - // then reads the initialized notification go func() { scanner := bufio.NewScanner(serverRead) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) - // Read initialize request if !scanner.Scan() { t.Error("mock server: failed to read initialize request") return @@ -128,17 +112,14 @@ func TestInitializeHandshake(t *testing.T) { return } - // Write capabilities response resp := Message{ - JSONRPC: "2.0", - ID: req.ID, - Result: json.RawMessage(`{"serverInfo":{"name":"codex-app-server","version":"0.1.0"}}`), + ID: req.ID, + Result: json.RawMessage(`{"serverInfo":{"name":"codex-app-server","version":"0.1.0"}}`), } data, _ := json.Marshal(resp) data = append(data, '\n') serverWrite.Write(data) - // Read initialized notification if !scanner.Scan() { t.Error("mock server: failed to read initialized notification") return @@ -151,9 +132,11 @@ func TestInitializeHandshake(t *testing.T) { if notif.Method != "initialized" { t.Errorf("mock server: expected method initialized, got %s", notif.Method) } + if notif.Params == nil { + t.Error("mock server: initialized notification must include params") + } }() - // Set up client with our pipes instead of a real process client := &Client{} client.stdin = clientWrite client.stdout = clientRead diff --git a/internal/appserver/client_thread.go b/internal/appserver/client_thread.go index f4a6111..1c813ea 100644 --- a/internal/appserver/client_thread.go +++ b/internal/appserver/client_thread.go @@ -6,22 +6,22 @@ import ( "fmt" ) -func (c *Client) CreateThread(ctx context.Context, instructions string) (*ThreadCreateResult, error) { - params, _ := json.Marshal(ThreadCreateParams{ - Instructions: instructions, +func (c *Client) StartThread(ctx context.Context, model string) (*ThreadStartResult, error) { + params, _ := json.Marshal(ThreadStartParams{ + Model: model, }) - resp, err := c.Call(ctx, MethodThreadCreate, params) + resp, err := c.Call(ctx, MethodThreadStart, params) if err != nil { - return nil, fmt.Errorf("thread/create: %w", err) + return nil, fmt.Errorf("thread/start: %w", err) } if resp.Error != nil { - return nil, fmt.Errorf("thread/create: %w", resp.Error) + return nil, fmt.Errorf("thread/start: %w", resp.Error) } - var result ThreadCreateResult + var result ThreadStartResult if err := json.Unmarshal(resp.Result, &result); err != nil { - return nil, fmt.Errorf("unmarshal thread/create result: %w", err) + return nil, fmt.Errorf("unmarshal thread/start result: %w", err) } return &result, nil } @@ -42,17 +42,17 @@ func (c *Client) ListThreads(ctx context.Context) (*ThreadListResult, error) { return &result, nil } -func (c *Client) DeleteThread(ctx context.Context, threadID string) error { - params, _ := json.Marshal(ThreadDeleteParams{ +func (c *Client) ArchiveThread(ctx context.Context, threadID string) error { + params, _ := json.Marshal(ThreadArchiveParams{ ThreadID: threadID, }) - resp, err := c.Call(ctx, MethodThreadDelete, params) + resp, err := c.Call(ctx, MethodThreadArchive, params) if err != nil { - return fmt.Errorf("thread/delete: %w", err) + return fmt.Errorf("thread/archive: %w", err) } if resp.Error != nil { - return fmt.Errorf("thread/delete: %w", resp.Error) + return fmt.Errorf("thread/archive: %w", resp.Error) } return nil } diff --git a/internal/appserver/client_thread_test.go b/internal/appserver/client_thread_test.go index 8ac0dd2..c9e2941 100644 --- a/internal/appserver/client_thread_test.go +++ b/internal/appserver/client_thread_test.go @@ -9,11 +9,11 @@ import ( "time" ) -func TestClientCreateThread(t *testing.T) { +func TestClientStartThread(t *testing.T) { clientRead, serverWrite := io.Pipe() serverRead, clientWrite := io.Pipe() - go mockThreadCreateServer(t, serverRead, serverWrite) + go mockThreadStartServer(t, serverRead, serverWrite) client := &Client{} client.stdin = clientWrite @@ -27,16 +27,16 @@ func TestClientCreateThread(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - result, err := client.CreateThread(ctx, "Build a web app") + result, err := client.StartThread(ctx, "") if err != nil { - t.Fatalf("CreateThread failed: %v", err) + t.Fatalf("StartThread failed: %v", err) } - if result.ThreadID != "t-new-123" { - t.Errorf("expected t-new-123, got %s", result.ThreadID) + if result.Thread.ID != "thr_new_123" { + t.Errorf("expected thr_new_123, got %s", result.Thread.ID) } } -func mockThreadCreateServer(t *testing.T, reader *io.PipeReader, writer *io.PipeWriter) { +func mockThreadStartServer(t *testing.T, reader *io.PipeReader, writer *io.PipeWriter) { t.Helper() scanner := bufio.NewScanner(reader) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) @@ -52,9 +52,8 @@ func mockThreadCreateServer(t *testing.T, reader *io.PipeReader, writer *io.Pipe } resp := Message{ - JSONRPC: "2.0", - ID: req.ID, - Result: json.RawMessage(`{"threadId":"t-new-123"}`), + ID: req.ID, + Result: json.RawMessage(`{"thread":{"id":"thr_new_123"}}`), } data, _ := json.Marshal(resp) data = append(data, '\n') @@ -105,9 +104,8 @@ func mockThreadListServer(t *testing.T, reader *io.PipeReader, writer *io.PipeWr threadList := `{"threads":[{"id":"t-1","status":"active","title":"A"},{"id":"t-2","status":"idle","title":"B"}]}` resp := Message{ - JSONRPC: "2.0", - ID: req.ID, - Result: json.RawMessage(threadList), + ID: req.ID, + Result: json.RawMessage(threadList), } data, _ := json.Marshal(resp) data = append(data, '\n') diff --git a/internal/appserver/dispatch_test.go b/internal/appserver/dispatch_test.go index 2490b64..e390e5e 100644 --- a/internal/appserver/dispatch_test.go +++ b/internal/appserver/dispatch_test.go @@ -21,9 +21,8 @@ func TestDispatchRoutesNotificationToRouter(t *testing.T) { client.Router = router msg := Message{ - JSONRPC: "2.0", - Method: NotifyThreadStatusChanged, - Params: json.RawMessage(`{"threadId":"t-1","status":"active","title":"Test"}`), + Method: NotifyThreadStatusChanged, + Params: json.RawMessage(`{"threadId":"t-1","status":"active","title":"Test"}`), } client.Dispatch(msg) @@ -41,9 +40,8 @@ func TestDispatchFallsBackToOnNotification(t *testing.T) { } msg := Message{ - JSONRPC: "2.0", - Method: "custom/notification", - Params: json.RawMessage(`{}`), + Method: "custom/notification", + Params: json.RawMessage(`{}`), } client.Dispatch(msg) diff --git a/internal/appserver/methods.go b/internal/appserver/methods.go index ff6dafb..740e019 100644 --- a/internal/appserver/methods.go +++ b/internal/appserver/methods.go @@ -1,17 +1,20 @@ package appserver const ( - MethodThreadCreate = "thread/create" - MethodThreadList = "thread/list" - MethodThreadDelete = "thread/delete" - MethodThreadSendMessage = "thread/sendMessage" - MethodCommandExec = "command/exec" + MethodThreadStart = "thread/start" + MethodThreadList = "thread/list" + MethodThreadArchive = "thread/archive" + MethodTurnStart = "turn/start" + MethodTurnInterrupt = "turn/interrupt" ) const ( - NotifyThreadStatusChanged = "thread/status/changed" - NotifyThreadMessageCreated = "thread/message/created" - NotifyThreadMessageDelta = "thread/message/delta" - NotifyCommandOutput = "command/output" - NotifyCommandFinished = "command/finished" + NotifyThreadStatusChanged = "thread/status/changed" + NotifyItemStarted = "item/started" + NotifyItemCompleted = "item/completed" + NotifyItemMessageDelta = "item/agentMessage/delta" + NotifyTurnStarted = "turn/started" + NotifyTurnCompleted = "turn/completed" + NotifyCommandOutput = "command/output" + NotifyCommandFinished = "command/finished" ) diff --git a/internal/appserver/methods_test.go b/internal/appserver/methods_test.go index db05e13..7a3befc 100644 --- a/internal/appserver/methods_test.go +++ b/internal/appserver/methods_test.go @@ -8,14 +8,17 @@ func TestMethodConstants(t *testing.T) { constant string expected string }{ - {"ThreadCreate", MethodThreadCreate, "thread/create"}, + {"ThreadStart", MethodThreadStart, "thread/start"}, {"ThreadList", MethodThreadList, "thread/list"}, - {"ThreadDelete", MethodThreadDelete, "thread/delete"}, - {"ThreadSendMessage", MethodThreadSendMessage, "thread/sendMessage"}, - {"CommandExec", MethodCommandExec, "command/exec"}, + {"ThreadArchive", MethodThreadArchive, "thread/archive"}, + {"TurnStart", MethodTurnStart, "turn/start"}, + {"TurnInterrupt", MethodTurnInterrupt, "turn/interrupt"}, {"NotifyThreadStatus", NotifyThreadStatusChanged, "thread/status/changed"}, - {"NotifyThreadMessage", NotifyThreadMessageCreated, "thread/message/created"}, - {"NotifyMessageDelta", NotifyThreadMessageDelta, "thread/message/delta"}, + {"NotifyItemStarted", NotifyItemStarted, "item/started"}, + {"NotifyItemCompleted", NotifyItemCompleted, "item/completed"}, + {"NotifyItemMessageDelta", NotifyItemMessageDelta, "item/agentMessage/delta"}, + {"NotifyTurnStarted", NotifyTurnStarted, "turn/started"}, + {"NotifyTurnCompleted", NotifyTurnCompleted, "turn/completed"}, {"NotifyCommandOutput", NotifyCommandOutput, "command/output"}, {"NotifyCommandFinished", NotifyCommandFinished, "command/finished"}, } diff --git a/internal/appserver/protocol.go b/internal/appserver/protocol.go index 6c697be..fd26848 100644 --- a/internal/appserver/protocol.go +++ b/internal/appserver/protocol.go @@ -2,32 +2,30 @@ package appserver import "encoding/json" -// Message is the generic JSON-RPC 2.0 envelope. -// It covers requests (id + method), responses (id + result/error), +// Message is the generic JSON-RPC envelope used by the Codex App Server. +// The Codex wire format omits the "jsonrpc" field. +// Covers requests (id + method), responses (id + result/error), // and notifications (method, no id). type Message struct { - JSONRPC string `json:"jsonrpc"` - ID *int `json:"id,omitempty"` - Method string `json:"method,omitempty"` - Params json.RawMessage `json:"params,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - Error *RPCError `json:"error,omitempty"` + ID *int `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` } // Request is an outbound JSON-RPC request. type Request struct { - JSONRPC string `json:"jsonrpc"` - ID *int `json:"id,omitempty"` - Method string `json:"method"` - Params json.RawMessage `json:"params,omitempty"` + ID *int `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` } // Response is an inbound JSON-RPC response. type Response struct { - JSONRPC string `json:"jsonrpc"` - ID *int `json:"id,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - Error *RPCError `json:"error,omitempty"` + ID *int `json:"id,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` } // RPCError is the JSON-RPC error object. diff --git a/internal/appserver/protocol_test.go b/internal/appserver/protocol_test.go index d288013..7bceec3 100644 --- a/internal/appserver/protocol_test.go +++ b/internal/appserver/protocol_test.go @@ -7,10 +7,9 @@ import ( func TestRequestMarshal(t *testing.T) { req := &Request{ - JSONRPC: "2.0", - ID: intPtr(1), - Method: "thread/list", - Params: json.RawMessage(`{}`), + ID: intPtr(1), + Method: "thread/list", + Params: json.RawMessage(`{}`), } data, err := json.Marshal(req) if err != nil { @@ -18,8 +17,8 @@ func TestRequestMarshal(t *testing.T) { } var parsed map[string]any json.Unmarshal(data, &parsed) - if parsed["jsonrpc"] != "2.0" { - t.Errorf("expected jsonrpc 2.0, got %v", parsed["jsonrpc"]) + if _, hasJSONRPC := parsed["jsonrpc"]; hasJSONRPC { + t.Error("codex wire format must not include jsonrpc field") } if parsed["method"] != "thread/list" { t.Errorf("expected method thread/list, got %v", parsed["method"]) @@ -27,7 +26,7 @@ func TestRequestMarshal(t *testing.T) { } func TestResponseUnmarshal(t *testing.T) { - raw := `{"jsonrpc":"2.0","id":1,"result":{"threads":[]}}` + raw := `{"id":1,"result":{"threads":[]}}` var resp Response if err := json.Unmarshal([]byte(raw), &resp); err != nil { t.Fatal(err) @@ -41,7 +40,7 @@ func TestResponseUnmarshal(t *testing.T) { } func TestNotificationUnmarshal(t *testing.T) { - raw := `{"jsonrpc":"2.0","method":"thread/status/changed","params":{"threadId":"t1","status":"active"}}` + raw := `{"method":"thread/status/changed","params":{"threadId":"t1","status":"active"}}` var msg Message if err := json.Unmarshal([]byte(raw), &msg); err != nil { t.Fatal(err) @@ -49,14 +48,13 @@ func TestNotificationUnmarshal(t *testing.T) { if msg.Method != "thread/status/changed" { t.Errorf("expected thread/status/changed, got %s", msg.Method) } - // Notification: no ID if msg.ID != nil { t.Error("notification should have no id") } } func TestErrorResponseUnmarshal(t *testing.T) { - raw := `{"jsonrpc":"2.0","id":2,"error":{"code":-32600,"message":"Invalid Request"}}` + raw := `{"id":2,"error":{"code":-32600,"message":"Invalid Request"}}` var resp Response if err := json.Unmarshal([]byte(raw), &resp); err != nil { t.Fatal(err) diff --git a/internal/appserver/router.go b/internal/appserver/router.go index f9398fa..c2ae99b 100644 --- a/internal/appserver/router.go +++ b/internal/appserver/router.go @@ -30,9 +30,9 @@ func (router *NotificationRouter) OnThreadStatusChanged(fn func(ThreadStatusChan } } -func (router *NotificationRouter) OnThreadMessageCreated(fn func(ThreadMessageCreated)) { - router.handlers[NotifyThreadMessageCreated] = func(raw json.RawMessage) { - var params ThreadMessageCreated +func (router *NotificationRouter) OnItemStarted(fn func(ItemStarted)) { + router.handlers[NotifyItemStarted] = func(raw json.RawMessage) { + var params ItemStarted if err := json.Unmarshal(raw, ¶ms); err != nil { return } @@ -40,9 +40,39 @@ func (router *NotificationRouter) OnThreadMessageCreated(fn func(ThreadMessageCr } } -func (router *NotificationRouter) OnThreadMessageDelta(fn func(ThreadMessageDelta)) { - router.handlers[NotifyThreadMessageDelta] = func(raw json.RawMessage) { - var params ThreadMessageDelta +func (router *NotificationRouter) OnItemCompleted(fn func(ItemCompleted)) { + router.handlers[NotifyItemCompleted] = func(raw json.RawMessage) { + var params ItemCompleted + if err := json.Unmarshal(raw, ¶ms); err != nil { + return + } + fn(params) + } +} + +func (router *NotificationRouter) OnItemMessageDelta(fn func(ItemMessageDelta)) { + router.handlers[NotifyItemMessageDelta] = func(raw json.RawMessage) { + var params ItemMessageDelta + if err := json.Unmarshal(raw, ¶ms); err != nil { + return + } + fn(params) + } +} + +func (router *NotificationRouter) OnTurnStarted(fn func(TurnStarted)) { + router.handlers[NotifyTurnStarted] = func(raw json.RawMessage) { + var params TurnStarted + if err := json.Unmarshal(raw, ¶ms); err != nil { + return + } + fn(params) + } +} + +func (router *NotificationRouter) OnTurnCompleted(fn func(TurnCompleted)) { + router.handlers[NotifyTurnCompleted] = func(raw json.RawMessage) { + var params TurnCompleted if err := json.Unmarshal(raw, ¶ms); err != nil { return } diff --git a/internal/appserver/router_test.go b/internal/appserver/router_test.go index d719547..cd23232 100644 --- a/internal/appserver/router_test.go +++ b/internal/appserver/router_test.go @@ -30,16 +30,16 @@ func TestRouterIgnoresUnregisteredMethod(t *testing.T) { router.Handle("unknown/method", json.RawMessage(`{}`)) } -func TestRouterDispatchesMessageDelta(t *testing.T) { +func TestRouterDispatchesItemMessageDelta(t *testing.T) { router := NewNotificationRouter() var receivedDelta string - router.OnThreadMessageDelta(func(params ThreadMessageDelta) { + router.OnItemMessageDelta(func(params ItemMessageDelta) { receivedDelta = params.Delta }) - raw := json.RawMessage(`{"threadId":"t-1","messageId":"m-1","delta":"hello"}`) - router.Handle(NotifyThreadMessageDelta, raw) + raw := json.RawMessage(`{"threadId":"t-1","itemId":"item-1","delta":"hello"}`) + router.Handle(NotifyItemMessageDelta, raw) if receivedDelta != "hello" { t.Errorf("expected hello, got %s", receivedDelta) @@ -61,3 +61,35 @@ func TestRouterDispatchesCommandOutput(t *testing.T) { t.Errorf("expected output, got %s", receivedData) } } + +func TestRouterDispatchesItemStarted(t *testing.T) { + router := NewNotificationRouter() + + var receivedRole string + router.OnItemStarted(func(params ItemStarted) { + receivedRole = params.Role + }) + + raw := json.RawMessage(`{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}`) + router.Handle(NotifyItemStarted, raw) + + if receivedRole != "assistant" { + t.Errorf("expected assistant, got %s", receivedRole) + } +} + +func TestRouterDispatchesTurnCompleted(t *testing.T) { + router := NewNotificationRouter() + + var receivedTurnID string + router.OnTurnCompleted(func(params TurnCompleted) { + receivedTurnID = params.TurnID + }) + + raw := json.RawMessage(`{"threadId":"t-1","turnId":"turn-1"}`) + router.Handle(NotifyTurnCompleted, raw) + + if receivedTurnID != "turn-1" { + t.Errorf("expected turn-1, got %s", receivedTurnID) + } +} diff --git a/internal/appserver/types_notify.go b/internal/appserver/types_notify.go index 3c103e1..10e82ad 100644 --- a/internal/appserver/types_notify.go +++ b/internal/appserver/types_notify.go @@ -6,17 +6,33 @@ type ThreadStatusChanged struct { Title string `json:"title"` } -type ThreadMessageCreated struct { - ThreadID string `json:"threadId"` - MessageID string `json:"messageId"` - Role string `json:"role"` - Content string `json:"content"` +type ItemStarted struct { + ThreadID string `json:"threadId"` + ItemID string `json:"itemId"` + Role string `json:"role"` + Type string `json:"type"` +} + +type ItemCompleted struct { + ThreadID string `json:"threadId"` + ItemID string `json:"itemId"` + Content string `json:"content"` } -type ThreadMessageDelta struct { - ThreadID string `json:"threadId"` - MessageID string `json:"messageId"` - Delta string `json:"delta"` +type ItemMessageDelta struct { + ThreadID string `json:"threadId"` + ItemID string `json:"itemId"` + Delta string `json:"delta"` +} + +type TurnStarted struct { + ThreadID string `json:"threadId"` + TurnID string `json:"turnId"` +} + +type TurnCompleted struct { + ThreadID string `json:"threadId"` + TurnID string `json:"turnId"` } type CommandOutput struct { diff --git a/internal/appserver/types_notify_test.go b/internal/appserver/types_notify_test.go index 7afce24..6d8e91b 100644 --- a/internal/appserver/types_notify_test.go +++ b/internal/appserver/types_notify_test.go @@ -19,23 +19,37 @@ func TestThreadStatusChangedUnmarshal(t *testing.T) { } } -func TestThreadMessageCreatedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","messageId":"m-1","role":"assistant","content":"Hello"}` - var params ThreadMessageCreated +func TestItemStartedUnmarshal(t *testing.T) { + raw := `{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}` + var params ItemStarted if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { t.Fatal(err) } if params.Role != "assistant" { t.Errorf("expected assistant, got %s", params.Role) } - if params.Content != "Hello" { - t.Errorf("expected Hello, got %s", params.Content) + if params.Type != "message" { + t.Errorf("expected message, got %s", params.Type) + } + if params.ItemID != "item-1" { + t.Errorf("expected item-1, got %s", params.ItemID) + } +} + +func TestItemCompletedUnmarshal(t *testing.T) { + raw := `{"threadId":"t-1","itemId":"item-1","content":"Hello world"}` + var params ItemCompleted + if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { + t.Fatal(err) + } + if params.Content != "Hello world" { + t.Errorf("expected Hello world, got %s", params.Content) } } -func TestThreadMessageDeltaUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","messageId":"m-1","delta":"more text"}` - var params ThreadMessageDelta +func TestItemMessageDeltaUnmarshal(t *testing.T) { + raw := `{"threadId":"t-1","itemId":"item-1","delta":"more text"}` + var params ItemMessageDelta if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { t.Fatal(err) } @@ -44,6 +58,28 @@ func TestThreadMessageDeltaUnmarshal(t *testing.T) { } } +func TestTurnStartedUnmarshal(t *testing.T) { + raw := `{"threadId":"t-1","turnId":"turn-1"}` + var params TurnStarted + if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { + t.Fatal(err) + } + if params.TurnID != "turn-1" { + t.Errorf("expected turn-1, got %s", params.TurnID) + } +} + +func TestTurnCompletedUnmarshal(t *testing.T) { + raw := `{"threadId":"t-1","turnId":"turn-1"}` + var params TurnCompleted + if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { + t.Fatal(err) + } + if params.TurnID != "turn-1" { + t.Errorf("expected turn-1, got %s", params.TurnID) + } +} + func TestCommandOutputUnmarshal(t *testing.T) { raw := `{"threadId":"t-1","execId":"e-1","data":"line of output\n"}` var params CommandOutput diff --git a/internal/appserver/types_thread.go b/internal/appserver/types_thread.go index 1c5ef3c..2c616e6 100644 --- a/internal/appserver/types_thread.go +++ b/internal/appserver/types_thread.go @@ -7,15 +7,19 @@ const ( ThreadStatusError = "error" ) -type ThreadCreateParams struct { - Instructions string `json:"instructions"` +type ThreadStartParams struct { + Model string `json:"model,omitempty"` } -type ThreadCreateResult struct { - ThreadID string `json:"threadId"` +type ThreadStartResult struct { + Thread ThreadInfo `json:"thread"` +} + +type ThreadInfo struct { + ID string `json:"id"` } -type ThreadDeleteParams struct { +type ThreadArchiveParams struct { ThreadID string `json:"threadId"` } @@ -29,11 +33,11 @@ type ThreadSummary struct { Title string `json:"title"` } -type ThreadSendMessageParams struct { +type TurnStartParams struct { ThreadID string `json:"threadId"` Content string `json:"content"` } -type ThreadSendMessageResult struct { - MessageID string `json:"messageId"` +type TurnStartResult struct { + TurnID string `json:"turnId"` } diff --git a/internal/appserver/types_thread_test.go b/internal/appserver/types_thread_test.go index e6c298c..a5290ff 100644 --- a/internal/appserver/types_thread_test.go +++ b/internal/appserver/types_thread_test.go @@ -5,9 +5,9 @@ import ( "testing" ) -func TestThreadCreateParamsMarshal(t *testing.T) { - params := ThreadCreateParams{ - Instructions: "Build a web server", +func TestThreadStartParamsMarshal(t *testing.T) { + params := ThreadStartParams{ + Model: "gpt-4o", } data, err := json.Marshal(params) if err != nil { @@ -15,19 +15,32 @@ func TestThreadCreateParamsMarshal(t *testing.T) { } var parsed map[string]any json.Unmarshal(data, &parsed) - if parsed["instructions"] != "Build a web server" { - t.Errorf("expected instructions, got %v", parsed["instructions"]) + if parsed["model"] != "gpt-4o" { + t.Errorf("expected model gpt-4o, got %v", parsed["model"]) } } -func TestThreadCreateResultUnmarshal(t *testing.T) { - raw := `{"threadId":"t-abc123"}` - var result ThreadCreateResult +func TestThreadStartParamsOmitsEmptyModel(t *testing.T) { + params := ThreadStartParams{} + data, err := json.Marshal(params) + if err != nil { + t.Fatal(err) + } + var parsed map[string]any + json.Unmarshal(data, &parsed) + if _, hasModel := parsed["model"]; hasModel { + t.Error("expected model to be omitted when empty") + } +} + +func TestThreadStartResultUnmarshal(t *testing.T) { + raw := `{"thread":{"id":"thr_abc123"}}` + var result ThreadStartResult if err := json.Unmarshal([]byte(raw), &result); err != nil { t.Fatal(err) } - if result.ThreadID != "t-abc123" { - t.Errorf("expected t-abc123, got %s", result.ThreadID) + if result.Thread.ID != "thr_abc123" { + t.Errorf("expected thr_abc123, got %s", result.Thread.ID) } } diff --git a/internal/config/config.go b/internal/config/config.go index 5b8d71d..5a9120d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,7 +28,7 @@ func Load(path string) (*Config, error) { viperInstance.SetConfigType("toml") viperInstance.SetDefault("appserver.command", DefaultAppServerCommand) - viperInstance.SetDefault("appserver.args", []string{"app-server", "--listen", "stdio://"}) + viperInstance.SetDefault("appserver.args", []string{"app-server"}) viperInstance.SetDefault("ui.theme", DefaultTheme) if path != "" { diff --git a/internal/tui/app.go b/internal/tui/app.go index 322374c..4afebea 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -183,13 +183,13 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { func (app AppModel) createThreadCmd() tea.Cmd { return func() tea.Msg { - result, err := app.client.CreateThread(context.Background(), "New thread") + result, err := app.client.StartThread(context.Background(), "") if err != nil { return AppServerErrorMsg{Err: err} } return ThreadCreatedMsg{ - ThreadID: result.ThreadID, + ThreadID: result.Thread.ID, Title: "New thread", } } diff --git a/internal/tui/bridge.go b/internal/tui/bridge.go index 78dfc17..3636167 100644 --- a/internal/tui/bridge.go +++ b/internal/tui/bridge.go @@ -18,19 +18,19 @@ func WireEventBridge(router *appserver.NotificationRouter, sender MessageSender) }) }) - router.OnThreadMessageCreated(func(params appserver.ThreadMessageCreated) { + router.OnItemStarted(func(params appserver.ItemStarted) { sender.Send(ThreadMessageMsg{ ThreadID: params.ThreadID, - MessageID: params.MessageID, + MessageID: params.ItemID, Role: params.Role, - Content: params.Content, + Content: "", }) }) - router.OnThreadMessageDelta(func(params appserver.ThreadMessageDelta) { + router.OnItemMessageDelta(func(params appserver.ItemMessageDelta) { sender.Send(ThreadDeltaMsg{ ThreadID: params.ThreadID, - MessageID: params.MessageID, + MessageID: params.ItemID, Delta: params.Delta, }) }) diff --git a/internal/tui/bridge_test.go b/internal/tui/bridge_test.go index aed4988..5047a35 100644 --- a/internal/tui/bridge_test.go +++ b/internal/tui/bridge_test.go @@ -35,6 +35,49 @@ func TestBridgeThreadStatusChanged(t *testing.T) { } } +func TestBridgeItemStarted(t *testing.T) { + sender := &mockSender{} + router := appserver.NewNotificationRouter() + WireEventBridge(router, sender) + + router.Handle(appserver.NotifyItemStarted, + []byte(`{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}`)) + + if len(sender.messages) != 1 { + t.Fatalf("expected 1 message, got %d", len(sender.messages)) + } + msg, ok := sender.messages[0].(ThreadMessageMsg) + if !ok { + t.Fatalf("expected ThreadMessageMsg, got %T", sender.messages[0]) + } + if msg.Role != "assistant" { + t.Errorf("expected assistant, got %s", msg.Role) + } + if msg.MessageID != "item-1" { + t.Errorf("expected item-1, got %s", msg.MessageID) + } +} + +func TestBridgeItemMessageDelta(t *testing.T) { + sender := &mockSender{} + router := appserver.NewNotificationRouter() + WireEventBridge(router, sender) + + router.Handle(appserver.NotifyItemMessageDelta, + []byte(`{"threadId":"t-1","itemId":"item-1","delta":"hello"}`)) + + if len(sender.messages) != 1 { + t.Fatalf("expected 1 message, got %d", len(sender.messages)) + } + msg, ok := sender.messages[0].(ThreadDeltaMsg) + if !ok { + t.Fatalf("expected ThreadDeltaMsg, got %T", sender.messages[0]) + } + if msg.Delta != "hello" { + t.Errorf("expected hello, got %s", msg.Delta) + } +} + func TestBridgeCommandOutput(t *testing.T) { sender := &mockSender{} router := appserver.NewNotificationRouter() From 44d3aad6386ecd1b501d21dfaf1cf5c613ad7a14 Mon Sep 17 00:00:00 2001 From: Robin White Date: Tue, 17 Mar 2026 15:43:07 -0400 Subject: [PATCH 10/10] feat(protocol): rewrite from JSON-RPC to Codex SQ/EQ protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The real Codex CLI uses `codex proto` (not `codex app-server`) and speaks a Submission Queue / Event Queue protocol over JSONL, not JSON-RPC 2.0. - Submissions: {"id":"","op":{"type":"",...}} - Events: {"id":"","msg":{"type":"",...}} - No handshake — server sends session_configured on startup - Op types: user_turn, interrupt, exec_approval, shutdown - Event types: session_configured, task_started, task_complete, agent_message_delta, exec_command_begin/output_delta/end, error - Default args changed from ["app-server"] to ["proto"] - Simplified client (removed Call, pending, Dispatch, Initialize) - EventRouter routes by msg.type instead of JSON-RPC method - Bridge maps SQ/EQ events to Bubble Tea messages - App auto-creates thread on session_configured, no handshake needed --- cmd/dj/main.go | 2 +- internal/appserver/client.go | 173 ++++------------------- internal/appserver/client_test.go | 144 +++++++------------ internal/appserver/client_thread.go | 69 +++++---- internal/appserver/client_thread_test.go | 137 ++++++++---------- internal/appserver/dispatch_test.go | 56 ++++---- internal/appserver/integration_test.go | 24 ++-- internal/appserver/methods.go | 34 +++-- internal/appserver/methods_test.go | 45 ++++-- internal/appserver/protocol.go | 33 ++--- internal/appserver/protocol_test.go | 79 +++++------ internal/appserver/router.go | 124 +++++++++------- internal/appserver/router_test.go | 109 +++++++------- internal/appserver/types_command.go | 4 + internal/appserver/types_notify.go | 82 +++++++---- internal/appserver/types_notify_test.go | 106 ++++++-------- internal/appserver/types_thread.go | 68 +++++---- internal/appserver/types_thread_test.go | 88 ++++++------ internal/config/config.go | 2 +- internal/tui/app.go | 39 ++--- internal/tui/app_test.go | 25 +++- internal/tui/bridge.go | 49 ++++--- internal/tui/bridge_test.go | 83 ++++++----- internal/tui/integration_test.go | 36 ++--- internal/tui/msgs.go | 4 +- 25 files changed, 766 insertions(+), 849 deletions(-) diff --git a/cmd/dj/main.go b/cmd/dj/main.go index 25677a6..b95ce2a 100644 --- a/cmd/dj/main.go +++ b/cmd/dj/main.go @@ -41,7 +41,7 @@ func runApp(cmd *cobra.Command, args []string) error { client := appserver.NewClient(cfg.AppServer.Command, cfg.AppServer.Args...) defer client.Stop() - router := appserver.NewNotificationRouter() + router := appserver.NewEventRouter() client.Router = router store := state.NewThreadStore() diff --git a/internal/appserver/client.go b/internal/appserver/client.go index 0ada51b..770bd40 100644 --- a/internal/appserver/client.go +++ b/internal/appserver/client.go @@ -11,7 +11,7 @@ import ( "sync/atomic" ) -// Client manages a child app-server process and bidirectional JSON-RPC communication. +// Client manages a child codex proto process and bidirectional JSONL communication. type Client struct { command string args []string @@ -21,27 +21,17 @@ type Client struct { stdout io.ReadCloser scanner *bufio.Scanner - mu sync.Mutex // protects writes to stdin - nextID atomic.Int64 - pending sync.Map // id → chan *Message + mu sync.Mutex + nextID atomic.Int64 running atomic.Bool - // OnNotification is called for each server notification (no id). - // Set this before calling Start. - OnNotification func(method string, params json.RawMessage) - - // OnServerRequest is called for server-to-client requests (has id). - // Set this before calling Start. - OnServerRequest func(id int, method string, params json.RawMessage) - - // Router dispatches typed notifications by method name. - // Falls back to OnNotification for unregistered methods. - Router *NotificationRouter + Router *EventRouter } +const scannerBufferSize = 1024 * 1024 + // NewClient creates a client that will spawn the given command. -// Additional arguments can be passed after the command. func NewClient(command string, args ...string) *Client { return &Client{ command: command, @@ -49,7 +39,7 @@ func NewClient(command string, args ...string) *Client { } } -// Start spawns the child process and begins reading stdout. +// Start spawns the child process. func (c *Client) Start(ctx context.Context) error { c.cmd = exec.CommandContext(ctx, c.command, c.args...) @@ -65,7 +55,7 @@ func (c *Client) Start(ctx context.Context) error { } c.scanner = bufio.NewScanner(c.stdout) - c.scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB max line + c.scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) if err := c.cmd.Start(); err != nil { return fmt.Errorf("start process: %w", err) @@ -87,7 +77,6 @@ func (c *Client) Stop() error { } c.running.Store(false) - // Close stdin to signal EOF to the child if c.stdin != nil { c.stdin.Close() } @@ -98,11 +87,11 @@ func (c *Client) Stop() error { return nil } -// Send writes a JSON-RPC request to the child's stdin as a JSONL line. -func (c *Client) Send(req *Request) error { - data, err := json.Marshal(req) +// Send writes a Submission to the child's stdin as a JSONL line. +func (c *Client) Send(sub *Submission) error { + data, err := json.Marshal(sub) if err != nil { - return fmt.Errorf("marshal request: %w", err) + return fmt.Errorf("marshal submission: %w", err) } c.mu.Lock() @@ -113,138 +102,28 @@ func (c *Client) Send(req *Request) error { return err } -// ReadLoop reads JSONL from stdout and dispatches each message to the callback. -// It blocks until the scanner is exhausted (stdout closed) or an error occurs. -func (c *Client) ReadLoop(handler func(Message)) { +// NextID generates a unique string ID for a submission. +func (c *Client) NextID() string { + return fmt.Sprintf("sub-%d", c.nextID.Add(1)) +} + +// ReadLoop reads JSONL events from stdout and dispatches to the router. +// Blocks until stdout is closed or an error occurs. +func (c *Client) ReadLoop() { for c.scanner.Scan() { line := c.scanner.Bytes() if len(line) == 0 { continue } - var msg Message - if err := json.Unmarshal(line, &msg); err != nil { - continue // skip malformed lines - } - - handler(msg) - } -} - -// Call sends a request and blocks until the response with the matching ID arrives. -func (c *Client) Call(ctx context.Context, method string, params json.RawMessage) (*Message, error) { - id := int(c.nextID.Add(1)) - - ch := make(chan *Message, 1) - c.pending.Store(id, ch) - defer c.pending.Delete(id) - - req := &Request{ - ID: &id, - Method: method, - Params: params, - } - - if err := c.Send(req); err != nil { - return nil, err - } - - select { - case msg := <-ch: - return msg, nil - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -// Dispatch routes an incoming message to the appropriate handler: -// - Messages with an ID matching a pending request -> resolve the pending Call -// - Messages with an ID but no pending request -> server-to-client request (OnServerRequest) -// - Messages without an ID -> notification (OnNotification) -func (c *Client) Dispatch(msg Message) { - if msg.ID != nil { - // Check if this resolves a pending call - if ch, ok := c.pending.LoadAndDelete(*msg.ID); ok { - ch.(chan *Message) <- &msg - return - } - - // Server-to-client request - if c.OnServerRequest != nil && msg.Method != "" { - c.OnServerRequest(*msg.ID, msg.Method, msg.Params) + var event Event + if err := json.Unmarshal(line, &event); err != nil { + continue } - return - } - if msg.Method == "" { - return - } - - if c.Router != nil { - c.Router.Handle(msg.Method, msg.Params) - } - - if c.OnNotification != nil { - c.OnNotification(msg.Method, msg.Params) - } -} - -// InitializeParams is sent as the first request to the app-server. -type InitializeParams struct { - ClientInfo ClientInfo `json:"clientInfo"` -} - -// ClientInfo identifies this client to the app-server. -type ClientInfo struct { - Name string `json:"name"` - Title string `json:"title"` - Version string `json:"version"` -} - -// ServerCapabilities is the result of the initialize request. -type ServerCapabilities struct { - ServerInfo struct { - Name string `json:"name"` - Version string `json:"version"` - } `json:"serverInfo"` -} - -// Initialize performs the required handshake with the app-server. -// Sends initialize request, receives capabilities, then sends initialized notification. -func (c *Client) Initialize(ctx context.Context) (*ServerCapabilities, error) { - params, _ := json.Marshal(InitializeParams{ - ClientInfo: ClientInfo{ - Name: "dj", - Title: "DJ — Codex TUI Visualizer", - Version: "0.1.0", - }, - }) - - resp, err := c.Call(ctx, "initialize", params) - if err != nil { - return nil, fmt.Errorf("initialize request: %w", err) - } - - if resp.Error != nil { - return nil, fmt.Errorf("initialize error: %s", resp.Error.Message) - } - - var caps ServerCapabilities - if resp.Result != nil { - if err := json.Unmarshal(resp.Result, &caps); err != nil { - return nil, fmt.Errorf("unmarshal capabilities: %w", err) + if c.Router != nil { + c.Router.HandleEvent(event) } } - - // Send the initialized notification (no id, no response expected). - // Codex requires params: {} even for empty notifications. - notif := &Request{ - Method: "initialized", - Params: json.RawMessage(`{}`), - } - if err := c.Send(notif); err != nil { - return nil, fmt.Errorf("send initialized: %w", err) - } - - return &caps, nil + c.running.Store(false) } diff --git a/internal/appserver/client_test.go b/internal/appserver/client_test.go index c13142e..082ea33 100644 --- a/internal/appserver/client_test.go +++ b/internal/appserver/client_test.go @@ -43,123 +43,85 @@ func TestClientSendAndRead(t *testing.T) { } defer client.Stop() - msgs := make(chan Message, 10) - go client.ReadLoop(func(msg Message) { - msgs <- msg + events := make(chan Event, 10) + client.Router = NewEventRouter() + client.Router.OnSessionConfigured(func(event SessionConfigured) { + events <- Event{Msg: json.RawMessage(`{"type":"session_configured"}`)} }) - req := &Request{ - ID: intPtr(1), - Method: "test/echo", - Params: json.RawMessage(`{"hello":"world"}`), + go client.ReadLoop() + + sub := &Submission{ + ID: "test-1", + Op: json.RawMessage(`{"type":"session_configured","session_id":"s1","model":"test"}`), } - if err := client.Send(req); err != nil { - t.Fatalf("Send failed: %v", err) + + wrappedEvent := Event{ + ID: "", + Msg: json.RawMessage(`{"type":"session_configured","session_id":"s1","model":"test"}`), } + data, _ := json.Marshal(wrappedEvent) + client.mu.Lock() + data = append(data, '\n') + client.stdin.Write(data) + client.mu.Unlock() + _ = sub select { - case msg := <-msgs: - if msg.Method != "test/echo" { - t.Errorf("expected method test/echo, got %s", msg.Method) - } + case <-events: case <-time.After(3 * time.Second): - t.Fatal("timeout waiting for message") + t.Fatal("timeout waiting for event") } } -func TestClientCall(t *testing.T) { +func TestClientNextID(t *testing.T) { client := NewClient("cat") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + id1 := client.NextID() + id2 := client.NextID() - if err := client.Start(ctx); err != nil { - t.Fatalf("Start failed: %v", err) + if id1 == id2 { + t.Errorf("expected unique IDs, got %s and %s", id1, id2) } - defer client.Stop() - - go client.ReadLoop(client.Dispatch) - - resp, err := client.Call(ctx, "test/method", json.RawMessage(`{"key":"val"}`)) - if err != nil { - t.Fatalf("Call failed: %v", err) + if id1 != "sub-1" { + t.Errorf("expected sub-1, got %s", id1) } - - if resp == nil { - t.Fatal("expected non-nil response") + if id2 != "sub-2" { + t.Errorf("expected sub-2, got %s", id2) } } -func TestInitializeHandshake(t *testing.T) { +func TestClientReadLoopParsesEvents(t *testing.T) { clientRead, serverWrite := io.Pipe() - serverRead, clientWrite := io.Pipe() - - go func() { - scanner := bufio.NewScanner(serverRead) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) - - if !scanner.Scan() { - t.Error("mock server: failed to read initialize request") - return - } - var req Message - if err := json.Unmarshal(scanner.Bytes(), &req); err != nil { - t.Errorf("mock server: unmarshal request: %v", err) - return - } - if req.Method != "initialize" { - t.Errorf("mock server: expected method initialize, got %s", req.Method) - return - } - - resp := Message{ - ID: req.ID, - Result: json.RawMessage(`{"serverInfo":{"name":"codex-app-server","version":"0.1.0"}}`), - } - data, _ := json.Marshal(resp) - data = append(data, '\n') - serverWrite.Write(data) - - if !scanner.Scan() { - t.Error("mock server: failed to read initialized notification") - return - } - var notif Message - if err := json.Unmarshal(scanner.Bytes(), ¬if); err != nil { - t.Errorf("mock server: unmarshal notification: %v", err) - return - } - if notif.Method != "initialized" { - t.Errorf("mock server: expected method initialized, got %s", notif.Method) - } - if notif.Params == nil { - t.Error("mock server: initialized notification must include params") - } - }() client := &Client{} - client.stdin = clientWrite client.stdout = clientRead client.scanner = bufio.NewScanner(clientRead) - client.scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + client.scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) client.running.Store(true) - go client.ReadLoop(client.Dispatch) + received := make(chan SessionConfigured, 1) + client.Router = NewEventRouter() + client.Router.OnSessionConfigured(func(event SessionConfigured) { + received <- event + }) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + go client.ReadLoop() - caps, err := client.Initialize(ctx) - if err != nil { - t.Fatalf("Initialize failed: %v", err) - } - if caps == nil { - t.Fatal("expected non-nil capabilities") - } - if caps.ServerInfo.Name != "codex-app-server" { - t.Errorf("expected server name codex-app-server, got %s", caps.ServerInfo.Name) - } - if caps.ServerInfo.Version != "0.1.0" { - t.Errorf("expected server version 0.1.0, got %s", caps.ServerInfo.Version) + eventJSON := `{"id":"","msg":{"type":"session_configured","session_id":"sess-123","model":"gpt-4o"}}` + "\n" + serverWrite.Write([]byte(eventJSON)) + + select { + case event := <-received: + if event.SessionID != "sess-123" { + t.Errorf("expected sess-123, got %s", event.SessionID) + } + if event.Model != "gpt-4o" { + t.Errorf("expected gpt-4o, got %s", event.Model) + } + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for session_configured event") } + + serverWrite.Close() } diff --git a/internal/appserver/client_thread.go b/internal/appserver/client_thread.go index 1c813ea..634cd65 100644 --- a/internal/appserver/client_thread.go +++ b/internal/appserver/client_thread.go @@ -1,58 +1,53 @@ package appserver import ( - "context" "encoding/json" "fmt" ) -func (c *Client) StartThread(ctx context.Context, model string) (*ThreadStartResult, error) { - params, _ := json.Marshal(ThreadStartParams{ - Model: model, - }) +// SendUserTurn sends a user_turn submission with the given text content. +func (c *Client) SendUserTurn(text string, cwd string, model string) error { + op := UserTurnOp{ + Type: OpUserTurn, + Items: []UserInput{NewTextInput(text)}, + Cwd: cwd, + ApprovalPolicy: "on-request", + SandboxPolicy: SandboxPolicyReadOnly(), + Model: model, + } - resp, err := c.Call(ctx, MethodThreadStart, params) + opData, err := json.Marshal(op) if err != nil { - return nil, fmt.Errorf("thread/start: %w", err) - } - if resp.Error != nil { - return nil, fmt.Errorf("thread/start: %w", resp.Error) + return fmt.Errorf("marshal user_turn op: %w", err) } - var result ThreadStartResult - if err := json.Unmarshal(resp.Result, &result); err != nil { - return nil, fmt.Errorf("unmarshal thread/start result: %w", err) + sub := &Submission{ + ID: c.NextID(), + Op: opData, } - return &result, nil + return c.Send(sub) } -func (c *Client) ListThreads(ctx context.Context) (*ThreadListResult, error) { - resp, err := c.Call(ctx, MethodThreadList, json.RawMessage(`{}`)) - if err != nil { - return nil, fmt.Errorf("thread/list: %w", err) - } - if resp.Error != nil { - return nil, fmt.Errorf("thread/list: %w", resp.Error) - } +// SendInterrupt sends an interrupt submission to stop the current turn. +func (c *Client) SendInterrupt() error { + op := InterruptOp{Type: OpInterrupt} + opData, _ := json.Marshal(op) - var result ThreadListResult - if err := json.Unmarshal(resp.Result, &result); err != nil { - return nil, fmt.Errorf("unmarshal thread/list result: %w", err) + sub := &Submission{ + ID: c.NextID(), + Op: opData, } - return &result, nil + return c.Send(sub) } -func (c *Client) ArchiveThread(ctx context.Context, threadID string) error { - params, _ := json.Marshal(ThreadArchiveParams{ - ThreadID: threadID, - }) +// SendShutdown sends a shutdown submission. +func (c *Client) SendShutdown() error { + op := ShutdownOp{Type: OpShutdown} + opData, _ := json.Marshal(op) - resp, err := c.Call(ctx, MethodThreadArchive, params) - if err != nil { - return fmt.Errorf("thread/archive: %w", err) - } - if resp.Error != nil { - return fmt.Errorf("thread/archive: %w", resp.Error) + sub := &Submission{ + ID: c.NextID(), + Op: opData, } - return nil + return c.Send(sub) } diff --git a/internal/appserver/client_thread_test.go b/internal/appserver/client_thread_test.go index c9e2941..c88d017 100644 --- a/internal/appserver/client_thread_test.go +++ b/internal/appserver/client_thread_test.go @@ -2,112 +2,97 @@ package appserver import ( "bufio" - "context" "encoding/json" "io" "testing" "time" ) -func TestClientStartThread(t *testing.T) { - clientRead, serverWrite := io.Pipe() +func TestSendUserTurn(t *testing.T) { + clientRead, _ := io.Pipe() serverRead, clientWrite := io.Pipe() - go mockThreadStartServer(t, serverRead, serverWrite) - client := &Client{} client.stdin = clientWrite client.stdout = clientRead client.scanner = bufio.NewScanner(clientRead) - client.scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + client.scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) client.running.Store(true) - go client.ReadLoop(client.Dispatch) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - result, err := client.StartThread(ctx, "") + received := make(chan map[string]any, 1) + go func() { + scanner := bufio.NewScanner(serverRead) + scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) + if scanner.Scan() { + var parsed map[string]any + json.Unmarshal(scanner.Bytes(), &parsed) + received <- parsed + } + }() + + err := client.SendUserTurn("Hello world", "/tmp", "o4-mini") if err != nil { - t.Fatalf("StartThread failed: %v", err) - } - if result.Thread.ID != "thr_new_123" { - t.Errorf("expected thr_new_123, got %s", result.Thread.ID) - } -} - -func mockThreadStartServer(t *testing.T, reader *io.PipeReader, writer *io.PipeWriter) { - t.Helper() - scanner := bufio.NewScanner(reader) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) - - if !scanner.Scan() { - t.Error("mock: failed to read request") - return - } - var req Message - if err := json.Unmarshal(scanner.Bytes(), &req); err != nil { - t.Errorf("mock: unmarshal: %v", err) - return + t.Fatalf("SendUserTurn failed: %v", err) } - resp := Message{ - ID: req.ID, - Result: json.RawMessage(`{"thread":{"id":"thr_new_123"}}`), + select { + case msg := <-received: + if msg["id"] == nil || msg["id"] == "" { + t.Error("expected non-empty id") + } + opRaw, _ := json.Marshal(msg["op"]) + var op map[string]any + json.Unmarshal(opRaw, &op) + if op["type"] != OpUserTurn { + t.Errorf("expected user_turn, got %v", op["type"]) + } + if op["model"] != "o4-mini" { + t.Errorf("expected model o4-mini, got %v", op["model"]) + } + if op["cwd"] != "/tmp" { + t.Errorf("expected cwd /tmp, got %v", op["cwd"]) + } + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for submission") } - data, _ := json.Marshal(resp) - data = append(data, '\n') - writer.Write(data) } -func TestClientListThreads(t *testing.T) { - clientRead, serverWrite := io.Pipe() +func TestSendInterrupt(t *testing.T) { + clientRead, _ := io.Pipe() serverRead, clientWrite := io.Pipe() - go mockThreadListServer(t, serverRead, serverWrite) - client := &Client{} client.stdin = clientWrite client.stdout = clientRead client.scanner = bufio.NewScanner(clientRead) - client.scanner.Buffer(make([]byte, 1024*1024), 1024*1024) + client.scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) client.running.Store(true) - go client.ReadLoop(client.Dispatch) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - result, err := client.ListThreads(ctx) + received := make(chan map[string]any, 1) + go func() { + scanner := bufio.NewScanner(serverRead) + scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize) + if scanner.Scan() { + var parsed map[string]any + json.Unmarshal(scanner.Bytes(), &parsed) + received <- parsed + } + }() + + err := client.SendInterrupt() if err != nil { - t.Fatalf("ListThreads failed: %v", err) - } - if len(result.Threads) != 2 { - t.Fatalf("expected 2 threads, got %d", len(result.Threads)) - } -} - -func mockThreadListServer(t *testing.T, reader *io.PipeReader, writer *io.PipeWriter) { - t.Helper() - scanner := bufio.NewScanner(reader) - scanner.Buffer(make([]byte, 1024*1024), 1024*1024) - - if !scanner.Scan() { - t.Error("mock: failed to read request") - return - } - var req Message - if err := json.Unmarshal(scanner.Bytes(), &req); err != nil { - t.Errorf("mock: unmarshal: %v", err) - return + t.Fatalf("SendInterrupt failed: %v", err) } - threadList := `{"threads":[{"id":"t-1","status":"active","title":"A"},{"id":"t-2","status":"idle","title":"B"}]}` - resp := Message{ - ID: req.ID, - Result: json.RawMessage(threadList), + select { + case msg := <-received: + opRaw, _ := json.Marshal(msg["op"]) + var op map[string]any + json.Unmarshal(opRaw, &op) + if op["type"] != OpInterrupt { + t.Errorf("expected interrupt, got %v", op["type"]) + } + case <-time.After(3 * time.Second): + t.Fatal("timeout waiting for submission") } - data, _ := json.Marshal(resp) - data = append(data, '\n') - writer.Write(data) } diff --git a/internal/appserver/dispatch_test.go b/internal/appserver/dispatch_test.go index e390e5e..e8c64c7 100644 --- a/internal/appserver/dispatch_test.go +++ b/internal/appserver/dispatch_test.go @@ -6,46 +6,52 @@ import ( "testing" ) -func TestDispatchRoutesNotificationToRouter(t *testing.T) { - client := &Client{} - router := NewNotificationRouter() +func TestEventRouterDispatchesSessionConfigured(t *testing.T) { + router := NewEventRouter() var called atomic.Bool - router.OnThreadStatusChanged(func(params ThreadStatusChanged) { + router.OnSessionConfigured(func(event SessionConfigured) { called.Store(true) - if params.ThreadID != "t-1" { - t.Errorf("expected t-1, got %s", params.ThreadID) + if event.SessionID != "sess-1" { + t.Errorf("expected sess-1, got %s", event.SessionID) } }) - client.Router = router - - msg := Message{ - Method: NotifyThreadStatusChanged, - Params: json.RawMessage(`{"threadId":"t-1","status":"active","title":"Test"}`), + event := Event{ + ID: "", + Msg: json.RawMessage(`{"type":"session_configured","session_id":"sess-1","model":"gpt-4o"}`), } - client.Dispatch(msg) + router.HandleEvent(event) if !called.Load() { - t.Error("router handler was not called") + t.Error("handler was not called") } } -func TestDispatchFallsBackToOnNotification(t *testing.T) { - client := &Client{} - - var called atomic.Bool - client.OnNotification = func(method string, params json.RawMessage) { - called.Store(true) +func TestEventRouterIgnoresUnregisteredType(t *testing.T) { + router := NewEventRouter() + event := Event{ + ID: "", + Msg: json.RawMessage(`{"type":"unknown_event"}`), } + router.HandleEvent(event) +} - msg := Message{ - Method: "custom/notification", - Params: json.RawMessage(`{}`), +func TestEventRouterDispatchesError(t *testing.T) { + router := NewEventRouter() + + var receivedMsg string + router.OnError(func(event ServerError) { + receivedMsg = event.Message + }) + + event := Event{ + ID: "", + Msg: json.RawMessage(`{"type":"error","message":"test error"}`), } - client.Dispatch(msg) + router.HandleEvent(event) - if !called.Load() { - t.Error("OnNotification was not called") + if receivedMsg != "test error" { + t.Errorf("expected 'test error', got %s", receivedMsg) } } diff --git a/internal/appserver/integration_test.go b/internal/appserver/integration_test.go index 1ae343d..556eebf 100644 --- a/internal/appserver/integration_test.go +++ b/internal/appserver/integration_test.go @@ -8,23 +8,29 @@ import ( "time" ) -func TestIntegrationAppServerConnect(t *testing.T) { - client := NewClient("codex", "app-server", "--listen", "stdio://") +func TestIntegrationProtoConnect(t *testing.T) { + client := NewClient("codex", "proto") ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() if err := client.Start(ctx); err != nil { - t.Fatalf("Failed to start app-server: %v", err) + t.Fatalf("Failed to start codex proto: %v", err) } defer client.Stop() - go client.ReadLoop(client.Dispatch) + received := make(chan SessionConfigured, 1) + client.Router = NewEventRouter() + client.Router.OnSessionConfigured(func(event SessionConfigured) { + received <- event + }) - caps, err := client.Initialize(ctx) - if err != nil { - t.Fatalf("Initialize failed: %v", err) - } + go client.ReadLoop() - t.Logf("Connected to: %s %s", caps.ServerInfo.Name, caps.ServerInfo.Version) + select { + case event := <-received: + t.Logf("Connected: session=%s model=%s", event.SessionID, event.Model) + case <-ctx.Done(): + t.Fatal("timeout waiting for session_configured") + } } diff --git a/internal/appserver/methods.go b/internal/appserver/methods.go index 740e019..9ffe97d 100644 --- a/internal/appserver/methods.go +++ b/internal/appserver/methods.go @@ -1,20 +1,28 @@ package appserver const ( - MethodThreadStart = "thread/start" - MethodThreadList = "thread/list" - MethodThreadArchive = "thread/archive" - MethodTurnStart = "turn/start" - MethodTurnInterrupt = "turn/interrupt" + OpUserTurn = "user_turn" + OpInterrupt = "interrupt" + OpExecApproval = "exec_approval" + OpPatchApproval = "patch_approval" + OpShutdown = "shutdown" + OpGetHistoryEntry = "get_history_entry_request" ) const ( - NotifyThreadStatusChanged = "thread/status/changed" - NotifyItemStarted = "item/started" - NotifyItemCompleted = "item/completed" - NotifyItemMessageDelta = "item/agentMessage/delta" - NotifyTurnStarted = "turn/started" - NotifyTurnCompleted = "turn/completed" - NotifyCommandOutput = "command/output" - NotifyCommandFinished = "command/finished" + EventSessionConfigured = "session_configured" + EventTaskStarted = "task_started" + EventTaskComplete = "task_complete" + EventTurnAborted = "turn_aborted" + EventAgentMessage = "agent_message" + EventAgentMessageDelta = "agent_message_delta" + EventExecCommandBegin = "exec_command_begin" + EventExecCommandOutputDelta = "exec_command_output_delta" + EventExecCommandEnd = "exec_command_end" + EventExecApprovalRequest = "exec_approval_request" + EventPatchApplyBegin = "patch_apply_begin" + EventPatchApplyEnd = "patch_apply_end" + EventError = "error" + EventWarning = "warning" + EventShutdownComplete = "shutdown_complete" ) diff --git a/internal/appserver/methods_test.go b/internal/appserver/methods_test.go index 7a3befc..2c4cf93 100644 --- a/internal/appserver/methods_test.go +++ b/internal/appserver/methods_test.go @@ -2,25 +2,42 @@ package appserver import "testing" -func TestMethodConstants(t *testing.T) { +func TestOpConstants(t *testing.T) { tests := []struct { name string constant string expected string }{ - {"ThreadStart", MethodThreadStart, "thread/start"}, - {"ThreadList", MethodThreadList, "thread/list"}, - {"ThreadArchive", MethodThreadArchive, "thread/archive"}, - {"TurnStart", MethodTurnStart, "turn/start"}, - {"TurnInterrupt", MethodTurnInterrupt, "turn/interrupt"}, - {"NotifyThreadStatus", NotifyThreadStatusChanged, "thread/status/changed"}, - {"NotifyItemStarted", NotifyItemStarted, "item/started"}, - {"NotifyItemCompleted", NotifyItemCompleted, "item/completed"}, - {"NotifyItemMessageDelta", NotifyItemMessageDelta, "item/agentMessage/delta"}, - {"NotifyTurnStarted", NotifyTurnStarted, "turn/started"}, - {"NotifyTurnCompleted", NotifyTurnCompleted, "turn/completed"}, - {"NotifyCommandOutput", NotifyCommandOutput, "command/output"}, - {"NotifyCommandFinished", NotifyCommandFinished, "command/finished"}, + {"UserTurn", OpUserTurn, "user_turn"}, + {"Interrupt", OpInterrupt, "interrupt"}, + {"ExecApproval", OpExecApproval, "exec_approval"}, + {"Shutdown", OpShutdown, "shutdown"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.constant != tt.expected { + t.Errorf("expected %q, got %q", tt.expected, tt.constant) + } + }) + } +} + +func TestEventConstants(t *testing.T) { + tests := []struct { + name string + constant string + expected string + }{ + {"SessionConfigured", EventSessionConfigured, "session_configured"}, + {"TaskStarted", EventTaskStarted, "task_started"}, + {"TaskComplete", EventTaskComplete, "task_complete"}, + {"AgentMessage", EventAgentMessage, "agent_message"}, + {"AgentMessageDelta", EventAgentMessageDelta, "agent_message_delta"}, + {"ExecCommandBegin", EventExecCommandBegin, "exec_command_begin"}, + {"ExecCommandOutputDelta", EventExecCommandOutputDelta, "exec_command_output_delta"}, + {"ExecCommandEnd", EventExecCommandEnd, "exec_command_end"}, + {"ExecApprovalRequest", EventExecApprovalRequest, "exec_approval_request"}, + {"Error", EventError, "error"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/appserver/protocol.go b/internal/appserver/protocol.go index fd26848..201f29e 100644 --- a/internal/appserver/protocol.go +++ b/internal/appserver/protocol.go @@ -2,33 +2,24 @@ package appserver import "encoding/json" -// Message is the generic JSON-RPC envelope used by the Codex App Server. -// The Codex wire format omits the "jsonrpc" field. -// Covers requests (id + method), responses (id + result/error), -// and notifications (method, no id). -type Message struct { - ID *int `json:"id,omitempty"` - Method string `json:"method,omitempty"` - Params json.RawMessage `json:"params,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - Error *RPCError `json:"error,omitempty"` +// Submission is a client-to-server message in the Codex proto protocol. +type Submission struct { + ID string `json:"id"` + Op json.RawMessage `json:"op"` } -// Request is an outbound JSON-RPC request. -type Request struct { - ID *int `json:"id,omitempty"` - Method string `json:"method"` - Params json.RawMessage `json:"params,omitempty"` +// Event is a server-to-client message in the Codex proto protocol. +type Event struct { + ID string `json:"id"` + Msg json.RawMessage `json:"msg"` } -// Response is an inbound JSON-RPC response. -type Response struct { - ID *int `json:"id,omitempty"` - Result json.RawMessage `json:"result,omitempty"` - Error *RPCError `json:"error,omitempty"` +// EventHeader extracts just the type discriminator from an event message. +type EventHeader struct { + Type string `json:"type"` } -// RPCError is the JSON-RPC error object. +// RPCError is an error returned by the server. type RPCError struct { Code int `json:"code"` Message string `json:"message"` diff --git a/internal/appserver/protocol_test.go b/internal/appserver/protocol_test.go index 7bceec3..dd9c158 100644 --- a/internal/appserver/protocol_test.go +++ b/internal/appserver/protocol_test.go @@ -5,66 +5,65 @@ import ( "testing" ) -func TestRequestMarshal(t *testing.T) { - req := &Request{ - ID: intPtr(1), - Method: "thread/list", - Params: json.RawMessage(`{}`), +func TestSubmissionMarshal(t *testing.T) { + op := UserTurnOp{ + Type: OpUserTurn, + Items: []UserInput{NewTextInput("hello")}, + Cwd: "/tmp", + ApprovalPolicy: "never", + SandboxPolicy: SandboxPolicyReadOnly(), + Model: "o4-mini", } - data, err := json.Marshal(req) + opData, _ := json.Marshal(op) + + sub := &Submission{ + ID: "sub-1", + Op: opData, + } + data, err := json.Marshal(sub) if err != nil { t.Fatal(err) } var parsed map[string]any json.Unmarshal(data, &parsed) - if _, hasJSONRPC := parsed["jsonrpc"]; hasJSONRPC { - t.Error("codex wire format must not include jsonrpc field") - } - if parsed["method"] != "thread/list" { - t.Errorf("expected method thread/list, got %v", parsed["method"]) + if parsed["id"] != "sub-1" { + t.Errorf("expected id sub-1, got %v", parsed["id"]) } } -func TestResponseUnmarshal(t *testing.T) { - raw := `{"id":1,"result":{"threads":[]}}` - var resp Response - if err := json.Unmarshal([]byte(raw), &resp); err != nil { +func TestEventUnmarshal(t *testing.T) { + raw := `{"id":"","msg":{"type":"session_configured","session_id":"sess-1","model":"gpt-4o"}}` + var event Event + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if resp.ID == nil || *resp.ID != 1 { - t.Errorf("expected id 1, got %v", resp.ID) + if event.ID != "" { + t.Errorf("expected empty id, got %s", event.ID) } - if resp.Error != nil { - t.Error("expected no error") - } -} -func TestNotificationUnmarshal(t *testing.T) { - raw := `{"method":"thread/status/changed","params":{"threadId":"t1","status":"active"}}` - var msg Message - if err := json.Unmarshal([]byte(raw), &msg); err != nil { + var header EventHeader + if err := json.Unmarshal(event.Msg, &header); err != nil { t.Fatal(err) } - if msg.Method != "thread/status/changed" { - t.Errorf("expected thread/status/changed, got %s", msg.Method) - } - if msg.ID != nil { - t.Error("notification should have no id") + if header.Type != EventSessionConfigured { + t.Errorf("expected session_configured, got %s", header.Type) } } -func TestErrorResponseUnmarshal(t *testing.T) { - raw := `{"id":2,"error":{"code":-32600,"message":"Invalid Request"}}` - var resp Response - if err := json.Unmarshal([]byte(raw), &resp); err != nil { +func TestEventHeaderExtraction(t *testing.T) { + raw := `{"type":"agent_message_delta","delta":"hello"}` + var header EventHeader + if err := json.Unmarshal([]byte(raw), &header); err != nil { t.Fatal(err) } - if resp.Error == nil { - t.Fatal("expected error") - } - if resp.Error.Code != -32600 { - t.Errorf("expected code -32600, got %d", resp.Error.Code) + if header.Type != EventAgentMessageDelta { + t.Errorf("expected agent_message_delta, got %s", header.Type) } } -func intPtr(i int) *int { return &i } +func TestRPCErrorMessage(t *testing.T) { + err := &RPCError{Code: -1, Message: "something broke"} + if err.Error() != "something broke" { + t.Errorf("expected 'something broke', got %s", err.Error()) + } +} diff --git a/internal/appserver/router.go b/internal/appserver/router.go index c2ae99b..d9b996e 100644 --- a/internal/appserver/router.go +++ b/internal/appserver/router.go @@ -2,100 +2,128 @@ package appserver import "encoding/json" -type NotificationRouter struct { - handlers map[string]func(json.RawMessage) +// EventRouter dispatches incoming events by their type field. +type EventRouter struct { + handlers map[string]func(string, json.RawMessage) } -func NewNotificationRouter() *NotificationRouter { - return &NotificationRouter{ - handlers: make(map[string]func(json.RawMessage)), +// NewEventRouter creates a new event router. +func NewEventRouter() *EventRouter { + return &EventRouter{ + handlers: make(map[string]func(string, json.RawMessage)), } } -func (router *NotificationRouter) Handle(method string, params json.RawMessage) { - handler, exists := router.handlers[method] +// HandleEvent extracts the event type and dispatches to the registered handler. +func (router *EventRouter) HandleEvent(event Event) { + var header EventHeader + if err := json.Unmarshal(event.Msg, &header); err != nil { + return + } + + handler, exists := router.handlers[header.Type] if !exists { return } - handler(params) + handler(event.ID, event.Msg) +} + +func (router *EventRouter) OnSessionConfigured(fn func(SessionConfigured)) { + router.handlers[EventSessionConfigured] = func(_ string, raw json.RawMessage) { + var event SessionConfigured + if err := json.Unmarshal(raw, &event); err != nil { + return + } + fn(event) + } +} + +func (router *EventRouter) OnTaskStarted(fn func(TaskStarted)) { + router.handlers[EventTaskStarted] = func(_ string, raw json.RawMessage) { + var event TaskStarted + if err := json.Unmarshal(raw, &event); err != nil { + return + } + fn(event) + } } -func (router *NotificationRouter) OnThreadStatusChanged(fn func(ThreadStatusChanged)) { - router.handlers[NotifyThreadStatusChanged] = func(raw json.RawMessage) { - var params ThreadStatusChanged - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnTaskComplete(fn func(TaskComplete)) { + router.handlers[EventTaskComplete] = func(_ string, raw json.RawMessage) { + var event TaskComplete + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnItemStarted(fn func(ItemStarted)) { - router.handlers[NotifyItemStarted] = func(raw json.RawMessage) { - var params ItemStarted - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnAgentMessage(fn func(AgentMessage)) { + router.handlers[EventAgentMessage] = func(_ string, raw json.RawMessage) { + var event AgentMessage + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnItemCompleted(fn func(ItemCompleted)) { - router.handlers[NotifyItemCompleted] = func(raw json.RawMessage) { - var params ItemCompleted - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnAgentMessageDelta(fn func(AgentMessageDelta)) { + router.handlers[EventAgentMessageDelta] = func(_ string, raw json.RawMessage) { + var event AgentMessageDelta + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnItemMessageDelta(fn func(ItemMessageDelta)) { - router.handlers[NotifyItemMessageDelta] = func(raw json.RawMessage) { - var params ItemMessageDelta - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnExecCommandBegin(fn func(ExecCommandBegin)) { + router.handlers[EventExecCommandBegin] = func(_ string, raw json.RawMessage) { + var event ExecCommandBegin + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnTurnStarted(fn func(TurnStarted)) { - router.handlers[NotifyTurnStarted] = func(raw json.RawMessage) { - var params TurnStarted - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnExecCommandOutputDelta(fn func(ExecCommandOutputDelta)) { + router.handlers[EventExecCommandOutputDelta] = func(_ string, raw json.RawMessage) { + var event ExecCommandOutputDelta + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnTurnCompleted(fn func(TurnCompleted)) { - router.handlers[NotifyTurnCompleted] = func(raw json.RawMessage) { - var params TurnCompleted - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnExecCommandEnd(fn func(ExecCommandEnd)) { + router.handlers[EventExecCommandEnd] = func(_ string, raw json.RawMessage) { + var event ExecCommandEnd + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnCommandOutput(fn func(CommandOutput)) { - router.handlers[NotifyCommandOutput] = func(raw json.RawMessage) { - var params CommandOutput - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnExecApprovalRequest(fn func(ExecApprovalRequest)) { + router.handlers[EventExecApprovalRequest] = func(_ string, raw json.RawMessage) { + var event ExecApprovalRequest + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } -func (router *NotificationRouter) OnCommandFinished(fn func(CommandFinished)) { - router.handlers[NotifyCommandFinished] = func(raw json.RawMessage) { - var params CommandFinished - if err := json.Unmarshal(raw, ¶ms); err != nil { +func (router *EventRouter) OnError(fn func(ServerError)) { + router.handlers[EventError] = func(_ string, raw json.RawMessage) { + var event ServerError + if err := json.Unmarshal(raw, &event); err != nil { return } - fn(params) + fn(event) } } diff --git a/internal/appserver/router_test.go b/internal/appserver/router_test.go index cd23232..9ddab41 100644 --- a/internal/appserver/router_test.go +++ b/internal/appserver/router_test.go @@ -6,90 +6,97 @@ import ( "testing" ) -func TestRouterDispatchesNotification(t *testing.T) { - router := NewNotificationRouter() +func TestRouterDispatchesAgentMessageDelta(t *testing.T) { + router := NewEventRouter() - var called atomic.Bool - router.OnThreadStatusChanged(func(params ThreadStatusChanged) { - called.Store(true) - if params.ThreadID != "t-1" { - t.Errorf("expected t-1, got %s", params.ThreadID) - } + var receivedDelta string + router.OnAgentMessageDelta(func(event AgentMessageDelta) { + receivedDelta = event.Delta }) - raw := json.RawMessage(`{"threadId":"t-1","status":"active","title":"Test"}`) - router.Handle(NotifyThreadStatusChanged, raw) - - if !called.Load() { - t.Error("handler was not called") + event := Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"agent_message_delta","delta":"hello"}`), } -} + router.HandleEvent(event) -func TestRouterIgnoresUnregisteredMethod(t *testing.T) { - router := NewNotificationRouter() - router.Handle("unknown/method", json.RawMessage(`{}`)) + if receivedDelta != "hello" { + t.Errorf("expected hello, got %s", receivedDelta) + } } -func TestRouterDispatchesItemMessageDelta(t *testing.T) { - router := NewNotificationRouter() +func TestRouterDispatchesExecCommandBegin(t *testing.T) { + router := NewEventRouter() - var receivedDelta string - router.OnItemMessageDelta(func(params ItemMessageDelta) { - receivedDelta = params.Delta + var receivedCommand string + router.OnExecCommandBegin(func(event ExecCommandBegin) { + receivedCommand = event.Command }) - raw := json.RawMessage(`{"threadId":"t-1","itemId":"item-1","delta":"hello"}`) - router.Handle(NotifyItemMessageDelta, raw) + event := Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"exec_command_begin","call_id":"cmd-1","command":"ls"}`), + } + router.HandleEvent(event) - if receivedDelta != "hello" { - t.Errorf("expected hello, got %s", receivedDelta) + if receivedCommand != "ls" { + t.Errorf("expected ls, got %s", receivedCommand) } } -func TestRouterDispatchesCommandOutput(t *testing.T) { - router := NewNotificationRouter() +func TestRouterDispatchesExecCommandOutputDelta(t *testing.T) { + router := NewEventRouter() var receivedData string - router.OnCommandOutput(func(params CommandOutput) { - receivedData = params.Data + router.OnExecCommandOutputDelta(func(event ExecCommandOutputDelta) { + receivedData = event.Delta }) - raw := json.RawMessage(`{"threadId":"t-1","execId":"e-1","data":"output line\n"}`) - router.Handle(NotifyCommandOutput, raw) + event := Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"exec_command_output_delta","call_id":"cmd-1","delta":"output\n"}`), + } + router.HandleEvent(event) - if receivedData != "output line\n" { + if receivedData != "output\n" { t.Errorf("expected output, got %s", receivedData) } } -func TestRouterDispatchesItemStarted(t *testing.T) { - router := NewNotificationRouter() +func TestRouterDispatchesExecApprovalRequest(t *testing.T) { + router := NewEventRouter() - var receivedRole string - router.OnItemStarted(func(params ItemStarted) { - receivedRole = params.Role + var called atomic.Bool + router.OnExecApprovalRequest(func(event ExecApprovalRequest) { + called.Store(true) }) - raw := json.RawMessage(`{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}`) - router.Handle(NotifyItemStarted, raw) + event := Event{ + ID: "", + Msg: json.RawMessage(`{"type":"exec_approval_request","call_id":"cmd-1","command":"rm file"}`), + } + router.HandleEvent(event) - if receivedRole != "assistant" { - t.Errorf("expected assistant, got %s", receivedRole) + if !called.Load() { + t.Error("handler was not called") } } -func TestRouterDispatchesTurnCompleted(t *testing.T) { - router := NewNotificationRouter() +func TestRouterDispatchesTaskComplete(t *testing.T) { + router := NewEventRouter() - var receivedTurnID string - router.OnTurnCompleted(func(params TurnCompleted) { - receivedTurnID = params.TurnID + var called atomic.Bool + router.OnTaskComplete(func(event TaskComplete) { + called.Store(true) }) - raw := json.RawMessage(`{"threadId":"t-1","turnId":"turn-1"}`) - router.Handle(NotifyTurnCompleted, raw) + event := Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"task_complete"}`), + } + router.HandleEvent(event) - if receivedTurnID != "turn-1" { - t.Errorf("expected turn-1, got %s", receivedTurnID) + if !called.Load() { + t.Error("handler was not called") } } diff --git a/internal/appserver/types_command.go b/internal/appserver/types_command.go index 8e34704..89aed6f 100644 --- a/internal/appserver/types_command.go +++ b/internal/appserver/types_command.go @@ -1,20 +1,24 @@ package appserver +// CommandExecParams is used when the TUI wants to execute a command. type CommandExecParams struct { ThreadID string `json:"threadId"` Command string `json:"command"` TTY bool `json:"tty"` } +// CommandExecResult is the result of a command execution. type CommandExecResult struct { ExecID string `json:"execId"` } +// ConfirmExecParams is used for exec approval responses. type ConfirmExecParams struct { ThreadID string `json:"threadId"` Command string `json:"command"` } +// ConfirmExecResult is the approval response. type ConfirmExecResult struct { Approved bool `json:"approved"` } diff --git a/internal/appserver/types_notify.go b/internal/appserver/types_notify.go index 10e82ad..755cab0 100644 --- a/internal/appserver/types_notify.go +++ b/internal/appserver/types_notify.go @@ -1,48 +1,68 @@ package appserver -type ThreadStatusChanged struct { - ThreadID string `json:"threadId"` - Status string `json:"status"` - Title string `json:"title"` +// SessionConfigured is the initial event sent by the server on startup. +type SessionConfigured struct { + Type string `json:"type"` + SessionID string `json:"session_id"` + Model string `json:"model"` + ReasoningEffort string `json:"reasoning_effort"` + HistoryLogID int `json:"history_log_id"` + HistoryEntryCount int `json:"history_entry_count"` + RolloutPath string `json:"rollout_path"` } -type ItemStarted struct { - ThreadID string `json:"threadId"` - ItemID string `json:"itemId"` - Role string `json:"role"` - Type string `json:"type"` +// TaskStarted signals the beginning of an agent turn. +type TaskStarted struct { + Type string `json:"type"` +} + +// TaskComplete signals the end of an agent turn. +type TaskComplete struct { + Type string `json:"type"` } -type ItemCompleted struct { - ThreadID string `json:"threadId"` - ItemID string `json:"itemId"` - Content string `json:"content"` +// AgentMessage is a complete agent message. +type AgentMessage struct { + Type string `json:"type"` + Content string `json:"content"` } -type ItemMessageDelta struct { - ThreadID string `json:"threadId"` - ItemID string `json:"itemId"` - Delta string `json:"delta"` +// AgentMessageDelta is a streaming text delta from the agent. +type AgentMessageDelta struct { + Type string `json:"type"` + Delta string `json:"delta"` } -type TurnStarted struct { - ThreadID string `json:"threadId"` - TurnID string `json:"turnId"` +// ExecCommandBegin signals the start of a command execution. +type ExecCommandBegin struct { + Type string `json:"type"` + ExecID string `json:"call_id"` + Command string `json:"command"` } -type TurnCompleted struct { - ThreadID string `json:"threadId"` - TurnID string `json:"turnId"` +// ExecCommandOutputDelta is a chunk of command output. +type ExecCommandOutputDelta struct { + Type string `json:"type"` + ExecID string `json:"call_id"` + Delta string `json:"delta"` +} + +// ExecCommandEnd signals the end of a command execution. +type ExecCommandEnd struct { + Type string `json:"type"` + ExecID string `json:"call_id"` + ExitCode int `json:"exit_code"` } -type CommandOutput struct { - ThreadID string `json:"threadId"` - ExecID string `json:"execId"` - Data string `json:"data"` +// ExecApprovalRequest asks the client to approve a command. +type ExecApprovalRequest struct { + Type string `json:"type"` + ExecID string `json:"call_id"` + Command string `json:"command"` } -type CommandFinished struct { - ThreadID string `json:"threadId"` - ExecID string `json:"execId"` - ExitCode int `json:"exitCode"` +// ServerError is an error event from the server. +type ServerError struct { + Type string `json:"type"` + Message string `json:"message"` } diff --git a/internal/appserver/types_notify_test.go b/internal/appserver/types_notify_test.go index 6d8e91b..44ada14 100644 --- a/internal/appserver/types_notify_test.go +++ b/internal/appserver/types_notify_test.go @@ -5,99 +5,85 @@ import ( "testing" ) -func TestThreadStatusChangedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","status":"completed","title":"Done"}` - var params ThreadStatusChanged - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestSessionConfiguredUnmarshal(t *testing.T) { + raw := `{"type":"session_configured","session_id":"sess-1","model":"gpt-4o","reasoning_effort":"high","history_log_id":123,"history_entry_count":5,"rollout_path":"/tmp/rollout.jsonl"}` + var event SessionConfigured + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.ThreadID != "t-1" { - t.Errorf("expected t-1, got %s", params.ThreadID) + if event.SessionID != "sess-1" { + t.Errorf("expected sess-1, got %s", event.SessionID) } - if params.Status != ThreadStatusCompleted { - t.Errorf("expected completed, got %s", params.Status) + if event.Model != "gpt-4o" { + t.Errorf("expected gpt-4o, got %s", event.Model) } } -func TestItemStartedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}` - var params ItemStarted - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestAgentMessageDeltaUnmarshal(t *testing.T) { + raw := `{"type":"agent_message_delta","delta":"hello world"}` + var event AgentMessageDelta + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.Role != "assistant" { - t.Errorf("expected assistant, got %s", params.Role) - } - if params.Type != "message" { - t.Errorf("expected message, got %s", params.Type) - } - if params.ItemID != "item-1" { - t.Errorf("expected item-1, got %s", params.ItemID) + if event.Delta != "hello world" { + t.Errorf("expected 'hello world', got %s", event.Delta) } } -func TestItemCompletedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","itemId":"item-1","content":"Hello world"}` - var params ItemCompleted - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestExecCommandBeginUnmarshal(t *testing.T) { + raw := `{"type":"exec_command_begin","call_id":"cmd-1","command":"ls -la"}` + var event ExecCommandBegin + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.Content != "Hello world" { - t.Errorf("expected Hello world, got %s", params.Content) - } -} - -func TestItemMessageDeltaUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","itemId":"item-1","delta":"more text"}` - var params ItemMessageDelta - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { - t.Fatal(err) + if event.ExecID != "cmd-1" { + t.Errorf("expected cmd-1, got %s", event.ExecID) } - if params.Delta != "more text" { - t.Errorf("expected 'more text', got %s", params.Delta) + if event.Command != "ls -la" { + t.Errorf("expected ls -la, got %s", event.Command) } } -func TestTurnStartedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","turnId":"turn-1"}` - var params TurnStarted - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestExecCommandOutputDeltaUnmarshal(t *testing.T) { + raw := `{"type":"exec_command_output_delta","call_id":"cmd-1","delta":"output line\n"}` + var event ExecCommandOutputDelta + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.TurnID != "turn-1" { - t.Errorf("expected turn-1, got %s", params.TurnID) + if event.Delta != "output line\n" { + t.Errorf("expected output, got %s", event.Delta) } } -func TestTurnCompletedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","turnId":"turn-1"}` - var params TurnCompleted - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestExecCommandEndUnmarshal(t *testing.T) { + raw := `{"type":"exec_command_end","call_id":"cmd-1","exit_code":0}` + var event ExecCommandEnd + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.TurnID != "turn-1" { - t.Errorf("expected turn-1, got %s", params.TurnID) + if event.ExitCode != 0 { + t.Errorf("expected exit code 0, got %d", event.ExitCode) } } -func TestCommandOutputUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","execId":"e-1","data":"line of output\n"}` - var params CommandOutput - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestServerErrorUnmarshal(t *testing.T) { + raw := `{"type":"error","message":"something went wrong"}` + var event ServerError + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.ExecID != "e-1" { - t.Errorf("expected e-1, got %s", params.ExecID) + if event.Message != "something went wrong" { + t.Errorf("expected error message, got %s", event.Message) } } -func TestCommandFinishedUnmarshal(t *testing.T) { - raw := `{"threadId":"t-1","execId":"e-1","exitCode":0}` - var params CommandFinished - if err := json.Unmarshal([]byte(raw), ¶ms); err != nil { +func TestExecApprovalRequestUnmarshal(t *testing.T) { + raw := `{"type":"exec_approval_request","call_id":"cmd-1","command":"rm -rf /tmp/test"}` + var event ExecApprovalRequest + if err := json.Unmarshal([]byte(raw), &event); err != nil { t.Fatal(err) } - if params.ExitCode != 0 { - t.Errorf("expected exit code 0, got %d", params.ExitCode) + if event.Command != "rm -rf /tmp/test" { + t.Errorf("expected command, got %s", event.Command) } } diff --git a/internal/appserver/types_thread.go b/internal/appserver/types_thread.go index 2c616e6..63a6b27 100644 --- a/internal/appserver/types_thread.go +++ b/internal/appserver/types_thread.go @@ -1,43 +1,61 @@ package appserver -const ( - ThreadStatusActive = "active" - ThreadStatusIdle = "idle" - ThreadStatusCompleted = "completed" - ThreadStatusError = "error" -) +import "encoding/json" -type ThreadStartParams struct { - Model string `json:"model,omitempty"` +// UserTurnOp is the "user_turn" submission operation. +type UserTurnOp struct { + Type string `json:"type"` + Items []UserInput `json:"items"` + Cwd string `json:"cwd"` + ApprovalPolicy string `json:"approval_policy"` + SandboxPolicy json.RawMessage `json:"sandbox_policy"` + Model string `json:"model"` } -type ThreadStartResult struct { - Thread ThreadInfo `json:"thread"` +// UserInput is a content item in a user turn. +type UserInput struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + TextElements []any `json:"text_elements,omitempty"` } -type ThreadInfo struct { - ID string `json:"id"` +// NewTextInput creates a text user input item. +func NewTextInput(text string) UserInput { + return UserInput{ + Type: "text", + Text: text, + TextElements: []any{}, + } } -type ThreadArchiveParams struct { - ThreadID string `json:"threadId"` +// SandboxPolicyReadOnly creates a read-only sandbox policy. +func SandboxPolicyReadOnly() json.RawMessage { + return json.RawMessage(`{"type":"read-only","network_access":false}`) } -type ThreadListResult struct { - Threads []ThreadSummary `json:"threads"` +// SandboxPolicyWorkspaceWrite creates a workspace-write sandbox policy. +func SandboxPolicyWorkspaceWrite(roots []string) json.RawMessage { + policy := map[string]any{ + "type": "workspace-write", + "writable_roots": roots, + "network_access": false, + } + data, _ := json.Marshal(policy) + return data } -type ThreadSummary struct { - ID string `json:"id"` - Status string `json:"status"` - Title string `json:"title"` +// InterruptOp is the "interrupt" submission operation. +type InterruptOp struct { + Type string `json:"type"` } -type TurnStartParams struct { - ThreadID string `json:"threadId"` - Content string `json:"content"` +// ShutdownOp is the "shutdown" submission operation. +type ShutdownOp struct { + Type string `json:"type"` } -type TurnStartResult struct { - TurnID string `json:"turnId"` +// ExecApprovalOp is the "exec_approval" submission operation. +type ExecApprovalOp struct { + Type string `json:"type"` + Approved bool `json:"approved"` } diff --git a/internal/appserver/types_thread_test.go b/internal/appserver/types_thread_test.go index a5290ff..6067610 100644 --- a/internal/appserver/types_thread_test.go +++ b/internal/appserver/types_thread_test.go @@ -5,70 +5,70 @@ import ( "testing" ) -func TestThreadStartParamsMarshal(t *testing.T) { - params := ThreadStartParams{ - Model: "gpt-4o", +func TestUserTurnOpMarshal(t *testing.T) { + op := UserTurnOp{ + Type: OpUserTurn, + Items: []UserInput{NewTextInput("hello")}, + Cwd: "/home/user", + ApprovalPolicy: "on-request", + SandboxPolicy: SandboxPolicyReadOnly(), + Model: "gpt-4o", } - data, err := json.Marshal(params) + data, err := json.Marshal(op) if err != nil { t.Fatal(err) } var parsed map[string]any json.Unmarshal(data, &parsed) + if parsed["type"] != "user_turn" { + t.Errorf("expected user_turn, got %v", parsed["type"]) + } if parsed["model"] != "gpt-4o" { - t.Errorf("expected model gpt-4o, got %v", parsed["model"]) + t.Errorf("expected gpt-4o, got %v", parsed["model"]) } } -func TestThreadStartParamsOmitsEmptyModel(t *testing.T) { - params := ThreadStartParams{} - data, err := json.Marshal(params) - if err != nil { - t.Fatal(err) +func TestNewTextInput(t *testing.T) { + input := NewTextInput("hello world") + if input.Type != "text" { + t.Errorf("expected text, got %s", input.Type) } - var parsed map[string]any - json.Unmarshal(data, &parsed) - if _, hasModel := parsed["model"]; hasModel { - t.Error("expected model to be omitted when empty") + if input.Text != "hello world" { + t.Errorf("expected hello world, got %s", input.Text) + } + if input.TextElements == nil { + t.Error("expected non-nil text_elements") } } -func TestThreadStartResultUnmarshal(t *testing.T) { - raw := `{"thread":{"id":"thr_abc123"}}` - var result ThreadStartResult - if err := json.Unmarshal([]byte(raw), &result); err != nil { - t.Fatal(err) - } - if result.Thread.ID != "thr_abc123" { - t.Errorf("expected thr_abc123, got %s", result.Thread.ID) +func TestSandboxPolicyReadOnly(t *testing.T) { + policy := SandboxPolicyReadOnly() + var parsed map[string]any + json.Unmarshal(policy, &parsed) + if parsed["type"] != "read-only" { + t.Errorf("expected read-only, got %v", parsed["type"]) } } -func TestThreadListResultUnmarshal(t *testing.T) { - raw := `{"threads":[{"id":"t-1","status":"active","title":"Test"}]}` - var result ThreadListResult - if err := json.Unmarshal([]byte(raw), &result); err != nil { - t.Fatal(err) - } - if len(result.Threads) != 1 { - t.Fatalf("expected 1 thread, got %d", len(result.Threads)) - } - if result.Threads[0].ID != "t-1" { - t.Errorf("expected id t-1, got %s", result.Threads[0].ID) +func TestSandboxPolicyWorkspaceWrite(t *testing.T) { + policy := SandboxPolicyWorkspaceWrite([]string{"/home/user/project"}) + var parsed map[string]any + json.Unmarshal(policy, &parsed) + if parsed["type"] != "workspace-write" { + t.Errorf("expected workspace-write, got %v", parsed["type"]) } - if result.Threads[0].Status != "active" { - t.Errorf("expected status active, got %s", result.Threads[0].Status) + roots := parsed["writable_roots"].([]any) + if len(roots) != 1 { + t.Fatalf("expected 1 root, got %d", len(roots)) } } -func TestThreadStatusValues(t *testing.T) { - if ThreadStatusActive != "active" { - t.Errorf("expected active, got %s", ThreadStatusActive) - } - if ThreadStatusCompleted != "completed" { - t.Errorf("expected completed, got %s", ThreadStatusCompleted) - } - if ThreadStatusError != "error" { - t.Errorf("expected error, got %s", ThreadStatusError) +func TestInterruptOpMarshal(t *testing.T) { + op := InterruptOp{Type: OpInterrupt} + data, _ := json.Marshal(op) + var parsed map[string]any + json.Unmarshal(data, &parsed) + if parsed["type"] != "interrupt" { + t.Errorf("expected interrupt, got %v", parsed["type"]) } } diff --git a/internal/config/config.go b/internal/config/config.go index 5a9120d..6d12d85 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,7 +28,7 @@ func Load(path string) (*Config, error) { viperInstance.SetConfigType("toml") viperInstance.SetDefault("appserver.command", DefaultAppServerCommand) - viperInstance.SetDefault("appserver.args", []string{"app-server"}) + viperInstance.SetDefault("appserver.args", []string{"proto"}) viperInstance.SetDefault("ui.theme", DefaultTheme) if path != "" { diff --git a/internal/tui/app.go b/internal/tui/app.go index 4afebea..0c8ecdd 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -2,7 +2,6 @@ package tui import ( "context" - "time" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" @@ -10,8 +9,6 @@ import ( "github.com/robinojw/dj/internal/state" ) -const connectTimeout = 10 * time.Second - const ( FocusCanvas = iota FocusTree @@ -77,20 +74,9 @@ func (app AppModel) Init() tea.Cmd { return AppServerErrorMsg{Err: err} } - go app.client.ReadLoop(app.client.Dispatch) - - initCtx, cancel := context.WithTimeout(context.Background(), connectTimeout) - defer cancel() - - caps, err := app.client.Initialize(initCtx) - if err != nil { - return AppServerErrorMsg{Err: err} - } + go app.client.ReadLoop() - return AppServerConnectedMsg{ - ServerName: caps.ServerInfo.Name, - ServerVersion: caps.ServerInfo.Version, - } + return nil } } @@ -122,6 +108,9 @@ func (app AppModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case AppServerConnectedMsg: app.connected = true app.statusBar.SetConnected(true) + app.store.Add(msg.SessionID, msg.Model) + app.statusBar.SetThreadCount(len(app.store.All())) + app.statusBar.SetSelectedThread(msg.Model) return app, nil case AppServerErrorMsg: app.statusBar.SetError(msg.Error()) @@ -173,27 +162,15 @@ func (app AppModel) handleRune(msg tea.KeyMsg) (tea.Model, tea.Cmd) { app.helpVisible = !app.helpVisible case "n": if !app.connected { - app.statusBar.SetError("waiting for app-server — is codex CLI installed?") + app.statusBar.SetError("waiting for codex — is codex CLI installed?") return app, nil } - return app, app.createThreadCmd() + app.statusBar.SetError("single session mode — session auto-created on connect") + return app, nil } return app, nil } -func (app AppModel) createThreadCmd() tea.Cmd { - return func() tea.Msg { - result, err := app.client.StartThread(context.Background(), "") - if err != nil { - return AppServerErrorMsg{Err: err} - } - - return ThreadCreatedMsg{ - ThreadID: result.Thread.ID, - Title: "New thread", - } - } -} func (app AppModel) handleHelpKey(msg tea.KeyMsg) (tea.Model, tea.Cmd) { isToggle := msg.Type == tea.KeyRunes && msg.String() == "?" diff --git a/internal/tui/app_test.go b/internal/tui/app_test.go index 2cffe36..6675141 100644 --- a/internal/tui/app_test.go +++ b/internal/tui/app_test.go @@ -300,18 +300,35 @@ func TestAppNewThreadBlockedWhenDisconnected(t *testing.T) { } } -func TestAppNewThreadAllowedWhenConnected(t *testing.T) { +func TestAppConnectedAutoCreatesThread(t *testing.T) { store := state.NewThreadStore() app := NewAppModel(store, nil) - connectedMsg := AppServerConnectedMsg{ServerName: "test", ServerVersion: "1.0"} + connectedMsg := AppServerConnectedMsg{SessionID: "sess-1", Model: "gpt-4o"} + updated, _ := app.Update(connectedMsg) + app = updated.(AppModel) + + threads := store.All() + if len(threads) != 1 { + t.Fatalf("expected 1 thread after connect, got %d", len(threads)) + } + if threads[0].Title != "gpt-4o" { + t.Errorf("expected thread title gpt-4o, got %s", threads[0].Title) + } +} + +func TestAppNKeyShowsMessageWhenConnected(t *testing.T) { + store := state.NewThreadStore() + app := NewAppModel(store, nil) + + connectedMsg := AppServerConnectedMsg{SessionID: "sess-1", Model: "gpt-4o"} updated, _ := app.Update(connectedMsg) app = updated.(AppModel) nKey := tea.KeyMsg{Type: tea.KeyRunes, Runes: []rune{'n'}} _, cmd := app.Update(nKey) - if cmd == nil { - t.Error("expected command for thread creation when connected") + if cmd != nil { + t.Error("expected no command in single session mode") } } diff --git a/internal/tui/bridge.go b/internal/tui/bridge.go index 3636167..a2de8d6 100644 --- a/internal/tui/bridge.go +++ b/internal/tui/bridge.go @@ -9,45 +9,44 @@ type MessageSender interface { Send(msg tea.Msg) } -func WireEventBridge(router *appserver.NotificationRouter, sender MessageSender) { - router.OnThreadStatusChanged(func(params appserver.ThreadStatusChanged) { - sender.Send(ThreadStatusMsg{ - ThreadID: params.ThreadID, - Status: params.Status, - Title: params.Title, +func WireEventBridge(router *appserver.EventRouter, sender MessageSender) { + router.OnSessionConfigured(func(event appserver.SessionConfigured) { + sender.Send(AppServerConnectedMsg{ + SessionID: event.SessionID, + Model: event.Model, }) }) - router.OnItemStarted(func(params appserver.ItemStarted) { - sender.Send(ThreadMessageMsg{ - ThreadID: params.ThreadID, - MessageID: params.ItemID, - Role: params.Role, - Content: "", + router.OnAgentMessageDelta(func(event appserver.AgentMessageDelta) { + sender.Send(ThreadDeltaMsg{ + Delta: event.Delta, }) }) - router.OnItemMessageDelta(func(params appserver.ItemMessageDelta) { - sender.Send(ThreadDeltaMsg{ - ThreadID: params.ThreadID, - MessageID: params.ItemID, - Delta: params.Delta, + router.OnExecCommandBegin(func(event appserver.ExecCommandBegin) { + sender.Send(CommandOutputMsg{ + ExecID: event.ExecID, + Data: "$ " + event.Command + "\n", }) }) - router.OnCommandOutput(func(params appserver.CommandOutput) { + router.OnExecCommandOutputDelta(func(event appserver.ExecCommandOutputDelta) { sender.Send(CommandOutputMsg{ - ThreadID: params.ThreadID, - ExecID: params.ExecID, - Data: params.Data, + ExecID: event.ExecID, + Data: event.Delta, }) }) - router.OnCommandFinished(func(params appserver.CommandFinished) { + router.OnExecCommandEnd(func(event appserver.ExecCommandEnd) { sender.Send(CommandFinishedMsg{ - ThreadID: params.ThreadID, - ExecID: params.ExecID, - ExitCode: params.ExitCode, + ExecID: event.ExecID, + ExitCode: event.ExitCode, + }) + }) + + router.OnError(func(event appserver.ServerError) { + sender.Send(AppServerErrorMsg{ + Err: &appserver.RPCError{Message: event.Message}, }) }) } diff --git a/internal/tui/bridge_test.go b/internal/tui/bridge_test.go index 5047a35..cb8369a 100644 --- a/internal/tui/bridge_test.go +++ b/internal/tui/bridge_test.go @@ -1,6 +1,7 @@ package tui import ( + "encoding/json" "testing" tea "github.com/charmbracelet/bubbletea" @@ -15,85 +16,97 @@ func (mock *mockSender) Send(msg tea.Msg) { mock.messages = append(mock.messages, msg) } -func TestBridgeThreadStatusChanged(t *testing.T) { +func TestBridgeSessionConfigured(t *testing.T) { sender := &mockSender{} - router := appserver.NewNotificationRouter() + router := appserver.NewEventRouter() WireEventBridge(router, sender) - router.Handle(appserver.NotifyThreadStatusChanged, - []byte(`{"threadId":"t-1","status":"active","title":"Running"}`)) + event := appserver.Event{ + ID: "", + Msg: json.RawMessage(`{"type":"session_configured","session_id":"sess-1","model":"gpt-4o"}`), + } + router.HandleEvent(event) if len(sender.messages) != 1 { t.Fatalf("expected 1 message, got %d", len(sender.messages)) } - msg, ok := sender.messages[0].(ThreadStatusMsg) + msg, ok := sender.messages[0].(AppServerConnectedMsg) if !ok { - t.Fatalf("expected ThreadStatusMsg, got %T", sender.messages[0]) + t.Fatalf("expected AppServerConnectedMsg, got %T", sender.messages[0]) + } + if msg.SessionID != "sess-1" { + t.Errorf("expected sess-1, got %s", msg.SessionID) } - if msg.ThreadID != "t-1" { - t.Errorf("expected t-1, got %s", msg.ThreadID) + if msg.Model != "gpt-4o" { + t.Errorf("expected gpt-4o, got %s", msg.Model) } } -func TestBridgeItemStarted(t *testing.T) { +func TestBridgeAgentMessageDelta(t *testing.T) { sender := &mockSender{} - router := appserver.NewNotificationRouter() + router := appserver.NewEventRouter() WireEventBridge(router, sender) - router.Handle(appserver.NotifyItemStarted, - []byte(`{"threadId":"t-1","itemId":"item-1","role":"assistant","type":"message"}`)) + event := appserver.Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"agent_message_delta","delta":"hello"}`), + } + router.HandleEvent(event) if len(sender.messages) != 1 { t.Fatalf("expected 1 message, got %d", len(sender.messages)) } - msg, ok := sender.messages[0].(ThreadMessageMsg) + msg, ok := sender.messages[0].(ThreadDeltaMsg) if !ok { - t.Fatalf("expected ThreadMessageMsg, got %T", sender.messages[0]) - } - if msg.Role != "assistant" { - t.Errorf("expected assistant, got %s", msg.Role) + t.Fatalf("expected ThreadDeltaMsg, got %T", sender.messages[0]) } - if msg.MessageID != "item-1" { - t.Errorf("expected item-1, got %s", msg.MessageID) + if msg.Delta != "hello" { + t.Errorf("expected hello, got %s", msg.Delta) } } -func TestBridgeItemMessageDelta(t *testing.T) { +func TestBridgeExecCommandBegin(t *testing.T) { sender := &mockSender{} - router := appserver.NewNotificationRouter() + router := appserver.NewEventRouter() WireEventBridge(router, sender) - router.Handle(appserver.NotifyItemMessageDelta, - []byte(`{"threadId":"t-1","itemId":"item-1","delta":"hello"}`)) + event := appserver.Event{ + ID: "sub-1", + Msg: json.RawMessage(`{"type":"exec_command_begin","call_id":"cmd-1","command":"ls -la"}`), + } + router.HandleEvent(event) if len(sender.messages) != 1 { t.Fatalf("expected 1 message, got %d", len(sender.messages)) } - msg, ok := sender.messages[0].(ThreadDeltaMsg) + msg, ok := sender.messages[0].(CommandOutputMsg) if !ok { - t.Fatalf("expected ThreadDeltaMsg, got %T", sender.messages[0]) + t.Fatalf("expected CommandOutputMsg, got %T", sender.messages[0]) } - if msg.Delta != "hello" { - t.Errorf("expected hello, got %s", msg.Delta) + if msg.ExecID != "cmd-1" { + t.Errorf("expected cmd-1, got %s", msg.ExecID) } } -func TestBridgeCommandOutput(t *testing.T) { +func TestBridgeServerError(t *testing.T) { sender := &mockSender{} - router := appserver.NewNotificationRouter() + router := appserver.NewEventRouter() WireEventBridge(router, sender) - router.Handle(appserver.NotifyCommandOutput, - []byte(`{"threadId":"t-1","execId":"e-1","data":"hello\n"}`)) + event := appserver.Event{ + ID: "", + Msg: json.RawMessage(`{"type":"error","message":"something broke"}`), + } + router.HandleEvent(event) if len(sender.messages) != 1 { t.Fatalf("expected 1 message, got %d", len(sender.messages)) } - msg, ok := sender.messages[0].(CommandOutputMsg) + msg, ok := sender.messages[0].(AppServerErrorMsg) if !ok { - t.Fatalf("expected CommandOutputMsg, got %T", sender.messages[0]) + t.Fatalf("expected AppServerErrorMsg, got %T", sender.messages[0]) } - if msg.Data != "hello\n" { - t.Errorf("expected hello, got %s", msg.Data) + if msg.Error() != "something broke" { + t.Errorf("expected 'something broke', got %s", msg.Error()) } } diff --git a/internal/tui/integration_test.go b/internal/tui/integration_test.go index 5333835..bf1570b 100644 --- a/internal/tui/integration_test.go +++ b/internal/tui/integration_test.go @@ -12,7 +12,7 @@ import ( ) func TestIntegrationEndToEnd(t *testing.T) { - client := appserver.NewClient("codex", "app-server", "--listen", "stdio://") + client := appserver.NewClient("codex", "proto") ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() @@ -22,27 +22,27 @@ func TestIntegrationEndToEnd(t *testing.T) { } defer client.Stop() - router := appserver.NewNotificationRouter() + received := make(chan appserver.SessionConfigured, 1) + router := appserver.NewEventRouter() + router.OnSessionConfigured(func(event appserver.SessionConfigured) { + received <- event + }) client.Router = router - go client.ReadLoop(client.Dispatch) - caps, err := client.Initialize(ctx) - if err != nil { - t.Fatalf("Initialize failed: %v", err) - } - t.Logf("Connected: %s %s", caps.ServerInfo.Name, caps.ServerInfo.Version) + go client.ReadLoop() - store := state.NewThreadStore() + select { + case event := <-received: + t.Logf("Connected: session=%s model=%s", event.SessionID, event.Model) - result, err := client.CreateThread(ctx, "Say hello") - if err != nil { - t.Fatalf("CreateThread failed: %v", err) - } - store.Add(result.ThreadID, "Say hello") - t.Logf("Created thread: %s", result.ThreadID) + store := state.NewThreadStore() + store.Add(event.SessionID, event.Model) - threads := store.All() - if len(threads) != 1 { - t.Fatalf("expected 1 thread, got %d", len(threads)) + threads := store.All() + if len(threads) != 1 { + t.Fatalf("expected 1 thread, got %d", len(threads)) + } + case <-ctx.Done(): + t.Fatal("timeout waiting for session_configured") } } diff --git a/internal/tui/msgs.go b/internal/tui/msgs.go index bc673c7..a68673f 100644 --- a/internal/tui/msgs.go +++ b/internal/tui/msgs.go @@ -41,8 +41,8 @@ type ThreadDeletedMsg struct { } type AppServerConnectedMsg struct { - ServerName string - ServerVersion string + SessionID string + Model string } type AppServerErrorMsg struct {