From 48fdd96b9e08488aff46d815d7d7e56da4f4dbd9 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Mon, 19 Jan 2026 20:59:00 -0500 Subject: [PATCH 1/3] runtime: add session-level observability plumbing Signed-off-by: pnkcaht --- pkg/runtime/connectrpc_client.go | 11 +++-- pkg/runtime/event.go | 71 ++++++++++++++++++++++++++------ pkg/server/session_manager.go | 60 ++++++++++++++++++++++++--- pkg/session/session.go | 37 +++++++++++++++-- proto/cagent/v1/cagent.proto | 12 ++++++ 5 files changed, 165 insertions(+), 26 deletions(-) diff --git a/pkg/runtime/connectrpc_client.go b/pkg/runtime/connectrpc_client.go index bf6f95c9e..c5bef08cf 100644 --- a/pkg/runtime/connectrpc_client.go +++ b/pkg/runtime/connectrpc_client.go @@ -349,11 +349,14 @@ func (c *ConnectRPCClient) convertProtoEventToRuntimeEvent(e *cagentv1.Event) Ev LastMessage: convertProtoMessageUsage(ev.TokenUsage.Usage.LastMessage), } } + return &TokenUsageEvent{ - Type: "token_usage", - SessionID: ev.TokenUsage.SessionId, - Usage: usage, - AgentContext: AgentContext{AgentName: ev.TokenUsage.AgentName}, + Type: "token_usage", + SessionID: ev.TokenUsage.SessionId, + Usage: usage, + AgentContext: AgentContext{ + AgentName: ev.TokenUsage.AgentName, + }, } case *cagentv1.Event_SessionTitle: diff --git a/pkg/runtime/event.go b/pkg/runtime/event.go index def29377c..2cd62a684 100644 --- a/pkg/runtime/event.go +++ b/pkg/runtime/event.go @@ -2,6 +2,7 @@ package runtime import ( "cmp" + "time" "github.com/docker/cagent/pkg/chat" "github.com/docker/cagent/pkg/tools" @@ -191,9 +192,14 @@ type TokenUsageEvent struct { Type string `json:"type"` SessionID string `json:"session_id"` Usage *Usage `json:"usage"` + AgentContext } +func (*TokenUsageEvent) GetType() string { + return "token_usage" +} + type Usage struct { InputTokens int64 `json:"input_tokens"` OutputTokens int64 `json:"output_tokens"` @@ -211,11 +217,29 @@ type MessageUsage struct { Model string } -func TokenUsage(sessionID, agentName string, inputTokens, outputTokens, contextLength, contextLimit int64, cost float64) Event { - return TokenUsageWithMessage(sessionID, agentName, inputTokens, outputTokens, contextLength, contextLimit, cost, nil) -} - -func TokenUsageWithMessage(sessionID, agentName string, inputTokens, outputTokens, contextLength, contextLimit int64, cost float64, msgUsage *MessageUsage) Event { +func TokenUsage( + sessionID, agentName string, + inputTokens, outputTokens, contextLength, contextLimit int64, + cost float64, +) Event { + return TokenUsageWithMessage( + sessionID, + agentName, + inputTokens, + outputTokens, + contextLength, + contextLimit, + cost, + nil, + ) +} + +func TokenUsageWithMessage( + sessionID, agentName string, + inputTokens, outputTokens, contextLength, contextLimit int64, + cost float64, + msgUsage *MessageUsage, +) Event { return &TokenUsageEvent{ Type: "token_usage", SessionID: sessionID, @@ -231,13 +255,6 @@ func TokenUsageWithMessage(sessionID, agentName string, inputTokens, outputToken } } -type SessionTitleEvent struct { - Type string `json:"type"` - SessionID string `json:"session_id"` - Title string `json:"title"` - AgentContext -} - func SessionTitle(sessionID, title string) Event { return &SessionTitleEvent{ Type: "session_title", @@ -523,3 +540,33 @@ func HookBlocked(toolCall tools.ToolCall, toolDefinition tools.Tool, message, ag AgentContext: AgentContext{AgentName: agentName}, } } + +type SessionMetricsEvent struct { + Type string `json:"type"` // "session_metrics" + + SessionID string `json:"session_id"` + + UserMessages int `json:"user_messages"` + AssistantMessages int `json:"assistant_messages"` + ToolCalls int `json:"tool_calls"` + ToolErrors int `json:"tool_errors"` + + StartedAt time.Time `json:"started_at"` + EndedAt time.Time `json:"ended_at"` +} + +func (*SessionMetricsEvent) GetType() string { + return "session_metrics" +} + +type SessionTitleEvent struct { + Type string `json:"type"` + SessionID string `json:"session_id"` + Title string `json:"title"` + + AgentContext +} + +func (*SessionTitleEvent) GetType() string { + return "session_title" +} diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index 44d27adbf..fbb30024b 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -130,29 +130,48 @@ func (sm *SessionManager) DeleteSession(ctx context.Context, sessionID string) e } // RunSession runs a session with the given messages. -func (sm *SessionManager) RunSession(ctx context.Context, sessionID, agentFilename, currentAgent string, messages []api.Message) (<-chan runtime.Event, error) { +func (sm *SessionManager) RunSession( + ctx context.Context, + sessionID, agentFilename, currentAgent string, + messages []api.Message, +) (<-chan runtime.Event, error) { + sm.mux.Lock() - defer sm.mux.Unlock() + + // Load persisted session sess, err := sm.sessionStore.GetSession(ctx, sessionID) if err != nil { + sm.mux.Unlock() return nil, err } + // Mark execution start (observability only) + sess.Metrics = session.SessionMetrics{} + sess.Metrics.StartedAt = time.Now() + + // Clone runtime config and inherit working dir rc := sm.runConfig.Clone() rc.WorkingDir = sess.WorkingDir + + // Append user messages and count them for _, msg := range messages { sess.AddMessage(session.UserMessage(msg.Content, msg.MultiContent...)) + sess.Metrics.UserMessages++ } if err := sm.sessionStore.UpdateSession(ctx, sess); err != nil { + sm.mux.Unlock() return nil, err } + // Get or create runtime for this session runtimeSession, exists := sm.runtimeSessions.Load(sessionID) streamCtx, cancel := context.WithCancel(ctx) + if !exists { rt, err := sm.runtimeForSession(ctx, sess, agentFilename, currentAgent, rc) if err != nil { + sm.mux.Unlock() cancel() return nil, err } @@ -163,22 +182,51 @@ func (sm *SessionManager) RunSession(ctx context.Context, sessionID, agentFilena sm.runtimeSessions.Store(sessionID, runtimeSession) } + sm.mux.Unlock() + streamChan := make(chan runtime.Event) go func() { - stream := runtimeSession.runtime.RunStream(streamCtx, sess) defer cancel() defer close(streamChan) + + stream := runtimeSession.runtime.RunStream(streamCtx, sess) + for event := range stream { if streamCtx.Err() != nil { return } + + // Collect session-level observability metrics + if e, ok := event.(interface{ GetType() string }); ok { + switch e.GetType() { + case "assistant_message": + sess.Metrics.AssistantMessages++ + case "tool_call": + sess.Metrics.ToolCalls++ + case "tool_error": + sess.Metrics.ToolErrors++ + } + } + streamChan <- event } - if err := sm.sessionStore.UpdateSession(ctx, sess); err != nil { - return - } + // Mark execution end + sess.Metrics.EndedAt = time.Now() + + streamChan <- runtime.TokenUsage( + sess.ID, + currentAgent, + sess.InputTokens, + sess.OutputTokens, + 0, + 0, + sess.Cost, + ) + + // Persist updated session state (metrics are ephemeral) + _ = sm.sessionStore.UpdateSession(ctx, sess) }() return streamChan, nil diff --git a/pkg/session/session.go b/pkg/session/session.go index 7fba82940..a765a8ad6 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -45,8 +45,38 @@ func (si *Item) IsSubSession() bool { return si.SubSession != nil } +// SessionMetrics holds runtime-level metrics collected during a session. +// These metrics are not persisted and are intended for observability, +// debugging, and UX purposes. +type SessionMetrics struct { + // StartedAt is the time when the session execution began. + StartedAt time.Time + + // EndedAt is the time when the session execution finished. + EndedAt time.Time + + // UserMessages is the number of user messages sent during the session. + UserMessages int + + // AssistantMessages is the number of assistant messages generated. + AssistantMessages int + + // ToolCalls is the total number of tool invocations. + ToolCalls int + + // ToolErrors is the number of failed tool invocations. + ToolErrors int +} + +// Reset clears all metrics. +// MUST be called at the beginning of each RunSession execution. +func (m *SessionMetrics) Reset() { + *m = SessionMetrics{} +} + // Session represents the agent's state including conversation history and variables type Session struct { + // ID is the unique identifier for the session ID string `json:"id"` @@ -59,13 +89,13 @@ type Session struct { // CreatedAt is the time the session was created CreatedAt time.Time `json:"created_at"` + // Metrics holds performance and interaction metrics for this session + Metrics SessionMetrics `json:"-"` + // ToolsApproved is a flag to indicate if the tools have been approved ToolsApproved bool `json:"tools_approved"` // Thinking is a session-level flag to enable thinking/interleaved thinking - // defaults for all providers. When false, providers will not apply auto-thinking budgets - // or interleaved thinking, regardless of model config. This is controlled by the /think - // command in the TUI. Defaults to true (thinking enabled). Thinking bool `json:"thinking"` // HideToolResults is a flag to indicate if tool results should be hidden @@ -94,7 +124,6 @@ type Session struct { // AgentModelOverrides stores per-agent model overrides for this session. // Key is the agent name, value is the model reference (e.g., "openai/gpt-4o" or a named model from config). - // When a session is loaded, these overrides are reapplied to the runtime. AgentModelOverrides map[string]string `json:"agent_model_overrides,omitempty"` // CustomModelsUsed tracks custom models (provider/model format) used during this session. diff --git a/proto/cagent/v1/cagent.proto b/proto/cagent/v1/cagent.proto index cf0ef8382..91945d4ec 100644 --- a/proto/cagent/v1/cagent.proto +++ b/proto/cagent/v1/cagent.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package cagent.v1; +import "google/protobuf/timestamp.proto"; + option go_package = "github.com/docker/cagent/gen/proto/cagent/v1;cagentv1"; // AgentService provides operations for managing agents and sessions. @@ -337,6 +339,16 @@ message TokenUsageEvent { string session_id = 1; Usage usage = 2; string agent_name = 3; + SessionMetrics metrics = 4; +} + +message SessionMetrics { + int32 user_messages = 1; + int32 assistant_messages = 2; + int32 tool_calls = 3; + int32 tool_errors = 4; + google.protobuf.Timestamp started_at = 5; + google.protobuf.Timestamp ended_at = 6; } // LastMessageUsage contains per-message usage data for the last message. From 97564b293f2752103cab6c663e4f9b21b16fbef9 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Tue, 20 Jan 2026 14:00:06 -0500 Subject: [PATCH 2/3] fix: gofumpt formatting + rename SessionMetrics to Metrics Signed-off-by: pnkcaht --- pkg/server/session_manager.go | 3 +-- pkg/session/session.go | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index fbb30024b..f52739d65 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -135,7 +135,6 @@ func (sm *SessionManager) RunSession( sessionID, agentFilename, currentAgent string, messages []api.Message, ) (<-chan runtime.Event, error) { - sm.mux.Lock() // Load persisted session @@ -146,7 +145,7 @@ func (sm *SessionManager) RunSession( } // Mark execution start (observability only) - sess.Metrics = session.SessionMetrics{} + sess.Metrics = session.Metrics{} sess.Metrics.StartedAt = time.Now() // Clone runtime config and inherit working dir diff --git a/pkg/session/session.go b/pkg/session/session.go index a765a8ad6..5b7d2a36d 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -45,10 +45,10 @@ func (si *Item) IsSubSession() bool { return si.SubSession != nil } -// SessionMetrics holds runtime-level metrics collected during a session. +// Metrics holds runtime-level metrics collected during a session. // These metrics are not persisted and are intended for observability, // debugging, and UX purposes. -type SessionMetrics struct { +type Metrics struct { // StartedAt is the time when the session execution began. StartedAt time.Time @@ -70,13 +70,12 @@ type SessionMetrics struct { // Reset clears all metrics. // MUST be called at the beginning of each RunSession execution. -func (m *SessionMetrics) Reset() { - *m = SessionMetrics{} +func (m *Metrics) Reset() { + *m = Metrics{} } // Session represents the agent's state including conversation history and variables type Session struct { - // ID is the unique identifier for the session ID string `json:"id"` @@ -90,7 +89,7 @@ type Session struct { CreatedAt time.Time `json:"created_at"` // Metrics holds performance and interaction metrics for this session - Metrics SessionMetrics `json:"-"` + Metrics Metrics `json:"-"` // ToolsApproved is a flag to indicate if the tools have been approved ToolsApproved bool `json:"tools_approved"` From 861db660be3ba3951b6600791648d710d6c8fd01 Mon Sep 17 00:00:00 2001 From: pnkcaht Date: Sat, 24 Jan 2026 10:48:48 -0500 Subject: [PATCH 3/3] config: validate alias input and document SetAlias behavior Signed-off-by: pnkcaht --- pkg/fake/proxy.go | 15 ++++++++++++--- pkg/userconfig/userconfig.go | 8 ++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/fake/proxy.go b/pkg/fake/proxy.go index 1a16e34d9..c3c20bcf6 100644 --- a/pkg/fake/proxy.go +++ b/pkg/fake/proxy.go @@ -211,7 +211,14 @@ func Handle(transport http.RoundTripper, headerUpdater func(host string, req *ht func StreamCopy(c echo.Context, resp *http.Response) error { ctx := c.Request().Context() - writer := c.Response().Writer.(io.ReaderFrom) + w := c.Response().Writer + + rf, ok := w.(io.ReaderFrom) + if !ok { + // fallback seguro + _, err := io.Copy(w, resp.Body) + return err + } for { select { @@ -219,9 +226,11 @@ func StreamCopy(c echo.Context, resp *http.Response) error { slog.WarnContext(ctx, "client disconnected, stop streaming") return nil default: - n, err := writer.ReadFrom(io.LimitReader(resp.Body, 256)) + n, err := rf.ReadFrom(io.LimitReader(resp.Body, 256)) if n > 0 { - c.Response().Flush() // keep flushing to client + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } } if err != nil { if err == io.EOF || ctx.Err() != nil { diff --git a/pkg/userconfig/userconfig.go b/pkg/userconfig/userconfig.go index c0614222e..624ef0f50 100644 --- a/pkg/userconfig/userconfig.go +++ b/pkg/userconfig/userconfig.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "regexp" + "sync" "github.com/goccy/go-yaml" "github.com/natefinch/atomic" @@ -44,6 +45,8 @@ type Config struct { ModelsGateway string `yaml:"models_gateway,omitempty"` // Aliases maps alias names to alias configurations Aliases map[string]*Alias `yaml:"aliases,omitempty"` + + mu sync.RWMutex } // Path returns the path to the config file @@ -187,6 +190,7 @@ func ValidateAliasName(name string) error { // SetAlias creates or updates an alias. // Returns an error if the alias name is invalid. +// This method is concurrency-safe. func (c *Config) SetAlias(name string, alias *Alias) error { if err := ValidateAliasName(name); err != nil { return err @@ -194,6 +198,10 @@ func (c *Config) SetAlias(name string, alias *Alias) error { if alias == nil || alias.Path == "" { return errors.New("agent path cannot be empty") } + + c.mu.Lock() + defer c.mu.Unlock() + c.Aliases[name] = alias return nil }