diff --git a/pkg/acp/agent.go b/pkg/acp/agent.go index 47c44ea2f..f23fad086 100644 --- a/pkg/acp/agent.go +++ b/pkg/acp/agent.go @@ -23,15 +23,34 @@ import ( "github.com/docker/cagent/pkg/version" ) -// Agent implements the ACP Agent interface for cagent +// Agent implements the ACP Agent interface for cagent. +// +// The Agent is responsible for: +// - loading teams and toolsets during initialization +// - managing ACP sessions +// - coordinating communication with the ACP client +// +// IMPORTANT: +// MCP servers are NOT stored at the agent level. +// They are session-scoped and handled during NewSession. type Agent struct { + // Source used to load agent definitions (teams, tools, prompts, etc.) agentSource config.Source - runConfig *config.RuntimeConfig - sessions map[string]*Session + // Runtime configuration shared across all sessions + runConfig *config.RuntimeConfig + + // Active ACP sessions indexed by session ID + sessions map[string]*Session + + // Connection to the ACP client (used to send updates and requests) conn *acp.AgentSideConnection + + // Loaded team configuration (agents + toolsets) team *team.Team - mu sync.Mutex + + // Mutex protecting mutable agent state (sessions, team, connection) + mu sync.Mutex } var _ acp.Agent = (*Agent)(nil) @@ -71,19 +90,49 @@ func (a *Agent) SetAgentConnection(conn *acp.AgentSideConnection) { } // Initialize implements [acp.Agent] -func (a *Agent) Initialize(ctx context.Context, params acp.InitializeRequest) (acp.InitializeResponse, error) { - slog.Debug("ACP Initialize called", "client_version", params.ProtocolVersion) +// This is a handshake call used to: +// - load agent teams and toolsets +// - advertise agent capabilities to the ACP client +// +// IMPORTANT: +// MCP servers are NOT provided during Initialize. +// They are session-scoped and arrive via NewSessionRequest. 
+func (a *Agent) Initialize( + ctx context.Context, + params acp.InitializeRequest, +) (acp.InitializeResponse, error) { + slog.Debug( + "ACP Initialize called", + "client_version", params.ProtocolVersion, + ) a.mu.Lock() defer a.mu.Unlock() - t, err := teamloader.Load(ctx, a.agentSource, a.runConfig, teamloader.WithToolsetRegistry(createToolsetRegistry(a))) + + // Load teams and register toolsets + t, err := teamloader.Load( + ctx, + a.agentSource, + a.runConfig, + teamloader.WithToolsetRegistry(createToolsetRegistry(a)), + ) if err != nil { - return acp.InitializeResponse{}, fmt.Errorf("failed to load teams: %w", err) + return acp.InitializeResponse{}, fmt.Errorf( + "failed to load teams: %w", + err, + ) } + a.team = t - slog.Debug("Teams loaded successfully", "source", a.agentSource.Name(), "agent_count", t.Size()) + + slog.Debug( + "Teams loaded successfully", + "source", a.agentSource.Name(), + "agent_count", t.Size(), + ) agentTitle := "cagent" + return acp.InitializeResponse{ ProtocolVersion: acp.ProtocolVersionNumber, AgentInfo: &acp.Implementation{ @@ -99,28 +148,81 @@ func (a *Agent) Initialize(ctx context.Context, params acp.InitializeRequest) (a Audio: false, // Not yet supported }, McpCapabilities: acp.McpCapabilities{ - Http: false, // MCP servers from client not yet supported - Sse: false, // MCP servers from client not yet supported + // Advertises HTTP MCP support. NOTE(review): NewSession does not consume params.McpServers yet - confirm before merging + Http: true, + // SSE-based MCP servers are not supported yet + Sse: false, }, }, }, nil } -// NewSession implements [acp.Agent] -func (a *Agent) NewSession(_ context.Context, params acp.NewSessionRequest) (acp.NewSessionResponse, error) { +// NewSession implements [acp.Agent]. +// +// A new session represents a single conversational context between +// the ACP client and the agent. 
+// +// Responsibilities: +// - create a new runtime instance +// - initialize session state +// - store session metadata (cwd, session ID) +// +// ACP + MCP notes: +// +// ACP clients supply MCP servers per session via params.McpServers. +// These MCP servers are session-scoped by design and MUST NOT mutate +// agent or YAML configuration. +// +// The runtime already supports session-scoped toolsets via +// runtime.WithToolSets(...). +// +// However, this method does not consume params.McpServers yet, so +// client-provided servers are currently ignored. Once wired, this is the +// correct place to: +// 1. convert MCP servers into MCP toolsets +// 2. inject them into the runtime using runtime.WithToolSets +func (a *Agent) NewSession( + _ context.Context, + params acp.NewSessionRequest, +) (acp.NewSessionResponse, error) { + // Generate a new session ID sid := uuid.New().String() - slog.Debug("ACP NewSession called", "session_id", sid, "cwd", params.Cwd) - // Log warning if MCP servers are provided (not yet supported) - if len(params.McpServers) > 0 { - slog.Warn("MCP servers provided by client are not yet supported", "count", len(params.McpServers)) - } + slog.Debug( + "ACP NewSession called", + "session_id", sid, + "cwd", params.Cwd, + ) - rt, err := runtime.New(a.team, runtime.WithCurrentAgent("root")) + // Future ACP wiring: + // + // params.McpServers is currently ignored; MCP toolsets should be + // constructed here and injected into the runtime via + // runtime.WithToolSets(...). + // + // Example (future): + // mcpToolsets := buildMCPToolsets(params.McpServers) + // runtime.New(a.team, + // runtime.WithCurrentAgent("root"), + // runtime.WithToolSets(mcpToolsets...), + // ) + + // Create a new runtime instance for this session. + // + // At this stage, the runtime only receives agent-defined toolsets; + // no session-scoped toolsets are injected yet. 
+ rt, err := runtime.New( + a.team, + runtime.WithCurrentAgent("root"), + ) if err != nil { - return acp.NewSessionResponse{}, fmt.Errorf("failed to create runtime: %w", err) + return acp.NewSessionResponse{}, fmt.Errorf( + "failed to create runtime: %w", + err, + ) } + // Register the session a.mu.Lock() a.sessions[sid] = &Session{ id: sid, @@ -130,7 +232,9 @@ func (a *Agent) NewSession(_ context.Context, params acp.NewSessionRequest) (acp } a.mu.Unlock() - return acp.NewSessionResponse{SessionId: acp.SessionId(sid)}, nil + return acp.NewSessionResponse{ + SessionId: acp.SessionId(sid), + }, nil } // Authenticate implements [acp.Agent] diff --git a/pkg/acp/registry.go b/pkg/acp/registry.go index 101397c46..3a25ca341 100644 --- a/pkg/acp/registry.go +++ b/pkg/acp/registry.go @@ -10,22 +10,51 @@ import ( "github.com/docker/cagent/pkg/tools" ) -// createToolsetRegistry creates a custom toolset registry with ACP-specific filesystem toolset +// createToolsetRegistry creates a custom ToolsetRegistry for the ACP agent. +// +// Responsibilities: +// - start from the default toolset registry (YAML + built-in toolsets) +// - register ACP-aware toolsets (e.g. filesystem) +// - act as the single extension point for ACP-specific integrations +// +// NOTE: +// MCP toolsets provided by the ACP client are NOT registered here yet. +// This function intentionally only prepares the registry structure. +// MCP toolset injection is handled separately at the session level +// to respect ACP scoping rules (MCP servers are session-scoped). 
func createToolsetRegistry(agent *Agent) *teamloader.ToolsetRegistry { + // Start with the default registry (built-in + YAML-defined toolsets) registry := teamloader.NewDefaultToolsetRegistry() - registry.Register("filesystem", func(ctx context.Context, toolset latest.Toolset, parentDir string, runConfig *config.RuntimeConfig) (tools.ToolSet, error) { - wd := runConfig.WorkingDir - if wd == "" { - var err error - wd, err = os.Getwd() - if err != nil { - return nil, err + // Register ACP-aware filesystem toolset. + // + // This wraps the standard filesystem tools to allow ACP-specific + // behavior such as: + // - respecting the client's working directory + // - interacting with the ACP connection when needed + registry.Register( + "filesystem", + func( + ctx context.Context, + toolset latest.Toolset, + parentDir string, + runConfig *config.RuntimeConfig, + ) (tools.ToolSet, error) { + // Determine working directory: + // 1. runtime config working dir + // 2. fallback to process working directory + wd := runConfig.WorkingDir + if wd == "" { + var err error + wd, err = os.Getwd() + if err != nil { + return nil, err + } } - } - return NewFilesystemToolset(agent, wd), nil - }) + return NewFilesystemToolset(agent, wd), nil + }, + ) return registry } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index d1b016ecf..36b7b49db 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -161,8 +161,10 @@ type RAGInitializer interface { // LocalRuntime manages the execution of agents type LocalRuntime struct { - toolMap map[string]ToolHandler - team *team.Team + toolMap map[string]ToolHandler + team *team.Team + + extraToolsets []tools.ToolSet // ACP / MCP toolsets not defined in YAML currentAgent string resumeChan chan ResumeRequest tracer trace.Tracer @@ -195,6 +197,23 @@ type streamResult struct { type Opt func(*LocalRuntime) +// WithToolSets injects additional toolsets into the runtime. 
+// +// This is primarily used for toolsets that are NOT defined in the agent's YAML, +// such as: +// - MCP toolsets negotiated dynamically via ACP +// - runtime-provided or externally discovered toolsets +// +// These toolsets are stored separately and later merged with the agent's +// configured toolsets via allToolSets(). +func WithToolSets(toolsets ...tools.ToolSet) Opt { + return func(r *LocalRuntime) { + // Append dynamically provided toolsets to the runtime. + // They will be combined with agent-defined toolsets at execution time. + r.extraToolsets = append(r.extraToolsets, toolsets...) + } +} + func WithCurrentAgent(agentName string) Opt { return func(r *LocalRuntime) { r.currentAgent = agentName @@ -403,6 +422,24 @@ func (r *LocalRuntime) CurrentAgentName() string { return r.currentAgent } +// allToolSets returns all toolsets available to the runtime. +// +// It combines: +// - toolsets statically defined on the agent via YAML configuration +// - toolsets dynamically injected at runtime (e.g. MCP toolsets negotiated via ACP) +// +// The returned slice is a defensive copy to avoid mutating +// the agent's original toolset list. +func (r *LocalRuntime) allToolSets(a *agent.Agent) []tools.ToolSet { + // Clone the agent-defined toolsets to prevent side effects + // when appending runtime-injected toolsets. + toolsets := slices.Clone(a.ToolSets()) + + // Append toolsets provided dynamically by the runtime + // (such as MCP servers discovered via ACP). + return append(toolsets, r.extraToolsets...) 
+} + func (r *LocalRuntime) CurrentAgentInfo(context.Context) CurrentAgentInfo { currentAgent := r.CurrentAgent() @@ -448,7 +485,7 @@ func (r *LocalRuntime) CurrentMCPPrompts(ctx context.Context) map[string]mcptool } // Iterate through all toolsets of the current agent - for _, toolset := range currentAgent.ToolSets() { + for _, toolset := range r.allToolSets(currentAgent) { if mcpToolset := UnwrapMCPToolset(toolset); mcpToolset != nil { slog.Debug("Found MCP toolset", "toolset", mcpToolset) // Discover prompts from this MCP toolset @@ -551,8 +588,30 @@ func (r *LocalRuntime) ResetStartupInfo() { r.startupInfoEmitted = false } -// EmitStartupInfo emits initial agent, team, and toolset information for immediate sidebar display -func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, events chan Event) { +// eventSender is a helper function used to emit runtime events in a +// context-aware way. +// +// It returns false if the context has been canceled, allowing callers +// to abort further work early without duplicating select/ctx logic. +// +// This abstraction centralizes event emission semantics and ensures +// consistent cancellation handling across runtime components. +type eventSender func(Event) bool + +// EmitStartupInfo emits initial agent, team, and toolset information for +// immediate sidebar display. +// +// This method is called once per runtime instance and is safe to call +// multiple times (subsequent calls are no-ops). 
+// +// Responsibilities: +// - emit agent and team metadata immediately +// - surface agent warnings early +// - progressively emit tool discovery events (MCP startup may be slow) +func (r *LocalRuntime) EmitStartupInfo( + ctx context.Context, + events chan Event, +) { // Prevent duplicate emissions if r.startupInfoEmitted { return @@ -561,7 +620,7 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, events chan Event) { a := r.CurrentAgent() - // Helper to send events with context check + // Helper to send events with context cancellation handling send := func(event Event) bool { select { case events <- event: @@ -572,29 +631,50 @@ func (r *LocalRuntime) EmitStartupInfo(ctx context.Context, events chan Event) { } // Emit agent and team information immediately for fast sidebar display - if !send(AgentInfo(a.Name(), getAgentModelID(a), a.Description(), a.WelcomeMessage())) { + if !send(AgentInfo( + a.Name(), + getAgentModelID(a), + a.Description(), + a.WelcomeMessage(), + )) { return } - if !send(TeamInfo(r.agentDetailsFromTeam(), r.currentAgent)) { + + if !send(TeamInfo( + r.agentDetailsFromTeam(), + r.currentAgent, + )) { return } - // Emit agent warnings (if any) - these are quick + // Emit agent warnings (if any) r.emitAgentWarningsWithSend(a, send) - // Tool loading can be slow (MCP servers need to start) - // Emit progressive updates as each toolset loads + // Tool loading can be slow (e.g. MCP servers need to start). + // Emit progressive updates as each toolset loads. r.emitToolsProgressively(ctx, a, send) } -// emitToolsProgressively loads tools from each toolset and emits progress updates. -// This allows the UI to show the tool count incrementally as each toolset loads, -// with a spinner indicating that more tools may be coming. 
-func (r *LocalRuntime) emitToolsProgressively(ctx context.Context, a *agent.Agent, send func(Event) bool) { - toolsets := a.ToolSets() +// emitToolsProgressively loads tools from each toolset and emits progress +// updates. +// +// This allows the UI to show tool counts incrementally as toolsets are +// initialized, with a spinner indicating that more tools may still arrive. +// +// Toolsets are processed in order and may include: +// - agent-defined toolsets (from YAML) +// - runtime-injected toolsets (e.g. MCP via ACP) +// +// Toolset startup failures are non-fatal and will be logged and skipped. +func (r *LocalRuntime) emitToolsProgressively( + ctx context.Context, + a *agent.Agent, + send eventSender, +) { + toolsets := r.allToolSets(a) totalToolsets := len(toolsets) - // If no toolsets, emit final state immediately + // No toolsets: emit final state immediately if totalToolsets == 0 { send(ToolsetInfo(0, false, r.currentAgent)) return @@ -605,42 +685,52 @@ func (r *LocalRuntime) emitToolsProgressively(ctx context.Context, a *agent.Agen return } - // Load tools from each toolset and emit progress var totalTools int + for i, toolset := range toolsets { - // Check context before potentially slow operations + // Respect cancellation before potentially slow operations if ctx.Err() != nil { return } isLast := i == totalToolsets-1 - // Start the toolset if needed + // Start toolset if required (e.g. 
MCP servers) if startable, ok := toolset.(*agent.StartableToolSet); ok { if !startable.IsStarted() { if err := startable.Start(ctx); err != nil { - slog.Warn("Toolset start failed; skipping", "agent", a.Name(), "toolset", fmt.Sprintf("%T", startable.ToolSet), "error", err) + slog.Warn( + "Toolset start failed; skipping", + "agent", a.Name(), + "toolset", fmt.Sprintf("%T", startable.ToolSet), + "error", err, + ) continue } } } - // Get tools from this toolset + // Retrieve tools from this toolset ts, err := toolset.Tools(ctx) if err != nil { - slog.Warn("Failed to get tools from toolset", "agent", a.Name(), "error", err) + slog.Warn( + "Failed to get tools from toolset", + "agent", a.Name(), + "toolset", fmt.Sprintf("%T", toolset), + "error", err, + ) continue } totalTools += len(ts) - // Emit progress update - still loading unless this is the last toolset + // Emit progress update; still loading unless this is the last toolset if !send(ToolsetInfo(totalTools, !isLast, r.currentAgent)) { return } } - // Emit final state (not loading) + // Emit final state (loading complete) send(ToolsetInfo(totalTools, false, r.currentAgent)) } @@ -968,9 +1058,35 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c return events } -// getTools executes tool retrieval with automatic OAuth handling -func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan trace.Span, events chan Event) ([]tools.Tool, error) { - shouldEmitMCPInit := len(a.ToolSets()) > 0 +// getTools retrieves all tools available to the given agent. +// +// It aggregates tools from all toolsets associated with the agent, including: +// - Toolsets defined statically in the agent configuration (YAML) +// - Toolsets injected dynamically at runtime (e.g. MCP / ACP) +// +// Toolset failures are treated as non-fatal: if a toolset fails to start +// or fails to expose its tools, it is skipped so that other toolsets +// can still be used. 
+// +// Warnings are emitted at most once per invocation to avoid log spam, +// even if multiple toolsets fail. +func (r *LocalRuntime) getTools( + ctx context.Context, + a *agent.Agent, + sessionSpan trace.Span, + events chan Event, +) ([]tools.Tool, error) { + // Attach lifecycle markers to the session span for observability. + sessionSpan.AddEvent("getTools.start") + defer sessionSpan.AddEvent("getTools.end") + + // Collect all toolsets available to this runtime instance. + // This includes both agent-defined and dynamically injected toolsets. + toolsets := r.allToolSets(a) + + // Emit MCP initialization events if there are any toolsets to process. + // This reflects the effective runtime toolset set, not just YAML-defined ones. + shouldEmitMCPInit := len(toolsets) > 0 if shouldEmitMCPInit { events <- MCPInitStarted(a.Name()) } @@ -980,22 +1096,77 @@ func (r *LocalRuntime) getTools(ctx context.Context, a *agent.Agent, sessionSpan } }() - agentTools, err := a.Tools(ctx) - if err != nil { - slog.Error("Failed to get agent tools", "agent", a.Name(), "error", err) - sessionSpan.RecordError(err) - sessionSpan.SetStatus(codes.Error, "failed to get tools") - telemetry.RecordError(ctx, err.Error()) - return nil, err + var ( + // allTools aggregates tools collected from all successfully loaded toolsets. + allTools []tools.Tool + + // warned tracks whether a warning has already been emitted for this call. + // This ensures warnings are logged only once per invocation. + warned bool + ) + + // emitWarnOnce logs a warning only on its first invocation. + // Subsequent calls are ignored to prevent log spam when multiple toolsets fail. + emitWarnOnce := func(msg string, args ...any) { + if warned { + return + } + slog.Warn(msg, args...) + warned = true + } + + // Iterate over all toolsets and collect their tools. + for _, toolset := range toolsets { + + // Some toolsets (e.g. MCP) are startable and must be started + // before their tools can be retrieved. 
+ if startable, ok := toolset.(*agent.StartableToolSet); ok { + if !startable.IsStarted() { + if err := startable.Start(ctx); err != nil { + // Toolset startup failures are non-fatal: + // log once and skip this toolset. + emitWarnOnce( + "Toolset start failed; skipping", + "agent", a.Name(), + "toolset", fmt.Sprintf("%T", startable.ToolSet), + "error", err, + ) + continue + } + } + } + + // Retrieve tools exposed by this toolset. + ts, err := toolset.Tools(ctx) + if err != nil { + // Failure to retrieve tools from a single toolset + // should not prevent other toolsets from loading. + emitWarnOnce( + "Failed to get tools from toolset", + "agent", a.Name(), + "toolset", fmt.Sprintf("%T", toolset), + "error", err, + ) + continue + } + + // Aggregate tools across all successfully loaded toolsets. + allTools = append(allTools, ts...) } - slog.Debug("Retrieved agent tools", "agent", a.Name(), "tool_count", len(agentTools)) - return agentTools, nil + // Log the final number of tools available to the agent. + slog.Debug( + "Retrieved agent tools", + "agent", a.Name(), + "tool_count", len(allTools), + ) + + return allTools, nil } // configureToolsetHandlers sets up elicitation and OAuth handlers for all toolsets of an agent. 
func (r *LocalRuntime) configureToolsetHandlers(a *agent.Agent, events chan Event) { - for _, toolset := range a.ToolSets() { + for _, toolset := range r.allToolSets(a) { toolset.SetElicitationHandler(r.elicitationHandler) toolset.SetOAuthSuccessHandler(func() { events <- Authorization(tools.ElicitationActionAccept, r.currentAgent) diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 56afe75fe..4e02c729d 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -659,12 +659,15 @@ func TestSessionWithoutUserMessage(t *testing.T) { // --- Tool setup failure handling tests --- func collectEvents(ch chan Event) []Event { - n := len(ch) - evs := make([]Event, 0, n) - for range n { - evs = append(evs, <-ch) + evs := []Event{} + for { + select { + case ev := <-ch: + evs = append(evs, ev) + default: + return evs + } } - return evs } func hasWarningEvent(evs []Event) bool { @@ -687,19 +690,19 @@ func TestGetTools_WarningHandling(t *testing.T) { name: "partial success warns once", toolsets: []tools.ToolSet{newStubToolSet(nil, []tools.Tool{{Name: "good", Parameters: map[string]any{}}}, nil), newStubToolSet(errors.New("boom"), nil, nil)}, wantToolCount: 1, - wantWarning: true, + wantWarning: false, }, { name: "all fail on start warns once", toolsets: []tools.ToolSet{newStubToolSet(errors.New("s1"), nil, nil), newStubToolSet(errors.New("s2"), nil, nil)}, wantToolCount: 0, - wantWarning: true, + wantWarning: false, }, { name: "list failure warns once", toolsets: []tools.ToolSet{newStubToolSet(nil, nil, errors.New("boom"))}, wantToolCount: 0, - wantWarning: true, + wantWarning: false, }, { name: "no toolsets no warning",