Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/dj/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/spf13/cobra"

"github.com/robinojw/dj/internal/appserver"
"github.com/robinojw/dj/internal/config"
"github.com/robinojw/dj/internal/state"
"github.com/robinojw/dj/internal/tui"
Expand Down Expand Up @@ -37,12 +38,18 @@ func runApp(cmd *cobra.Command, args []string) error {
return fmt.Errorf("load config: %w", err)
}

_ = cfg
client := appserver.NewClient(cfg.AppServer.Command, cfg.AppServer.Args...)
defer client.Stop()

router := appserver.NewEventRouter()
client.Router = router

store := state.NewThreadStore()
app := tui.NewAppModel(store)
app := tui.NewAppModel(store, client)

program := tea.NewProgram(app, tea.WithAltScreen())
tui.WireEventBridge(router, program)

_, err = program.Run()
return err
}
173 changes: 26 additions & 147 deletions internal/appserver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"sync/atomic"
)

// Client manages a child app-server process and bidirectional JSON-RPC communication.
// Client manages a child codex proto process and bidirectional JSONL communication.
type Client struct {
command string
args []string
Expand All @@ -21,35 +21,25 @@ type Client struct {
stdout io.ReadCloser
scanner *bufio.Scanner

mu sync.Mutex // protects writes to stdin
nextID atomic.Int64
pending sync.Map // id → chan *Message
mu sync.Mutex
nextID atomic.Int64

running atomic.Bool

// OnNotification is called for each server notification (no id).
// Set this before calling Start.
OnNotification func(method string, params json.RawMessage)

// OnServerRequest is called for server-to-client requests (has id).
// Set this before calling Start.
OnServerRequest func(id int, method string, params json.RawMessage)

// Router dispatches typed notifications by method name.
// Falls back to OnNotification for unregistered methods.
Router *NotificationRouter
Router *EventRouter
}

const scannerBufferSize = 1024 * 1024

// NewClient creates a client that will spawn the given command.
// Additional arguments can be passed after the command.
func NewClient(command string, args ...string) *Client {
return &Client{
command: command,
args: args,
}
}

// Start spawns the child process and begins reading stdout.
// Start spawns the child process.
func (c *Client) Start(ctx context.Context) error {
c.cmd = exec.CommandContext(ctx, c.command, c.args...)

Expand All @@ -65,7 +55,7 @@ func (c *Client) Start(ctx context.Context) error {
}

c.scanner = bufio.NewScanner(c.stdout)
c.scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB max line
c.scanner.Buffer(make([]byte, scannerBufferSize), scannerBufferSize)

if err := c.cmd.Start(); err != nil {
return fmt.Errorf("start process: %w", err)
Expand All @@ -87,7 +77,6 @@ func (c *Client) Stop() error {
}
c.running.Store(false)

// Close stdin to signal EOF to the child
if c.stdin != nil {
c.stdin.Close()
}
Expand All @@ -98,11 +87,11 @@ func (c *Client) Stop() error {
return nil
}

// Send writes a JSON-RPC request to the child's stdin as a JSONL line.
func (c *Client) Send(req *Request) error {
data, err := json.Marshal(req)
// Send writes a Submission to the child's stdin as a JSONL line.
func (c *Client) Send(sub *Submission) error {
data, err := json.Marshal(sub)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
return fmt.Errorf("marshal submission: %w", err)
}

c.mu.Lock()
Expand All @@ -113,138 +102,28 @@ func (c *Client) Send(req *Request) error {
return err
}

// ReadLoop reads JSONL from stdout and dispatches each message to the callback.
// It blocks until the scanner is exhausted (stdout closed) or an error occurs.
func (c *Client) ReadLoop(handler func(Message)) {
// NextID generates a unique string ID for a submission.
func (c *Client) NextID() string {
return fmt.Sprintf("sub-%d", c.nextID.Add(1))
}

// ReadLoop reads JSONL events from stdout and dispatches to the router.
// Blocks until stdout is closed or an error occurs.
func (c *Client) ReadLoop() {
for c.scanner.Scan() {
line := c.scanner.Bytes()
if len(line) == 0 {
continue
}

var msg Message
if err := json.Unmarshal(line, &msg); err != nil {
continue // skip malformed lines
}

handler(msg)
}
}

// Call sends a request and blocks until the response with the matching ID arrives.
func (c *Client) Call(ctx context.Context, method string, params json.RawMessage) (*Message, error) {
id := int(c.nextID.Add(1))

ch := make(chan *Message, 1)
c.pending.Store(id, ch)
defer c.pending.Delete(id)

req := &Request{
JSONRPC: "2.0",
ID: &id,
Method: method,
Params: params,
}

if err := c.Send(req); err != nil {
return nil, err
}

select {
case msg := <-ch:
return msg, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

// Dispatch routes an incoming message to the appropriate handler:
// - Messages with an ID matching a pending request -> resolve the pending Call
// - Messages with an ID but no pending request -> server-to-client request (OnServerRequest)
// - Messages without an ID -> notification (OnNotification)
func (c *Client) Dispatch(msg Message) {
if msg.ID != nil {
// Check if this resolves a pending call
if ch, ok := c.pending.LoadAndDelete(*msg.ID); ok {
ch.(chan *Message) <- &msg
return
}

// Server-to-client request
if c.OnServerRequest != nil && msg.Method != "" {
c.OnServerRequest(*msg.ID, msg.Method, msg.Params)
var event Event
if err := json.Unmarshal(line, &event); err != nil {
continue
}
return
}

if msg.Method == "" {
return
}

if c.Router != nil {
c.Router.Handle(msg.Method, msg.Params)
}

if c.OnNotification != nil {
c.OnNotification(msg.Method, msg.Params)
}
}

// InitializeParams is sent as the first request to the app-server.
type InitializeParams struct {
ClientInfo ClientInfo `json:"clientInfo"`
}

// ClientInfo identifies this client to the app-server.
type ClientInfo struct {
Name string `json:"name"`
Title string `json:"title"`
Version string `json:"version"`
}

// ServerCapabilities is the result of the initialize request.
type ServerCapabilities struct {
ServerInfo struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"serverInfo"`
}

// Initialize performs the required handshake with the app-server.
// Sends initialize request, receives capabilities, then sends initialized notification.
func (c *Client) Initialize(ctx context.Context) (*ServerCapabilities, error) {
params, _ := json.Marshal(InitializeParams{
ClientInfo: ClientInfo{
Name: "dj",
Title: "DJ — Codex TUI Visualizer",
Version: "0.1.0",
},
})

resp, err := c.Call(ctx, "initialize", params)
if err != nil {
return nil, fmt.Errorf("initialize request: %w", err)
}

if resp.Error != nil {
return nil, fmt.Errorf("initialize error: %s", resp.Error.Message)
}

var caps ServerCapabilities
if resp.Result != nil {
if err := json.Unmarshal(resp.Result, &caps); err != nil {
return nil, fmt.Errorf("unmarshal capabilities: %w", err)
if c.Router != nil {
c.Router.HandleEvent(event)
}
}

// Send the initialized notification (no id, no response expected)
notif := &Request{
JSONRPC: "2.0",
Method: "initialized",
}
if err := c.Send(notif); err != nil {
return nil, fmt.Errorf("send initialized: %w", err)
}

return &caps, nil
c.running.Store(false)
}
Loading
Loading