feat: Add OTEL-enhanced ext_proc for zero-agent GenAI observability #119
Ladas wants to merge 3 commits into kagenti:main
Conversation
Add OpenTelemetry root span creation and SSE stream parsing to the existing go-processor. When OTEL_TRACING_ENABLED=true, the ext_proc:
- Creates invoke_agent root spans with GenAI semantic conventions
- Injects W3C traceparent so agent spans become children
- Parses the A2A request body for user input and conversation ID
- Parses the SSE response stream for LLM/tool child spans with token counts
- Handles client disconnect via tasks/resubscribe fallback
- Extracts agent output from artifact events or tasks/get

All existing functionality (JWT validation, token exchange, resolver) is preserved. OTEL is opt-in via environment variable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ladislav Smola <lsmola@redhat.com>
b0623b0 to e42f45c
When the A2A response is a buffered JSON-RPC response (not SSE events), the ext_proc now extracts child spans from result.history. This handles the case where the A2A SDK returns the complete task with history messages instead of streaming SSE events.

The history messages contain the same LangGraph step format (🚶♂️assistant: / 🚶♂️tools:) as SSE events, with full LangChain metadata for token counts, model names, and tool call details.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ladislav Smola <lsmola@redhat.com>
The A2A SDK merges the final LLM response into the artifact/completion rather than including it in result.history. This means the ext_proc only sees 2 of the 3 LangGraph steps (LLM→tool→LLM). When the last history event is a tool call and the task completed with an artifact, infer the final LLM call and create a "chat" child span for it, with the artifact text as gen_ai.completion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Ladislav Smola <lsmola@redhat.com>
pdettori
left a comment
Review Summary
Solid architecture — root spans with child spans parsed from SSE events, W3C traceparent injection, and disconnect recovery via tasks/resubscribe. The GenAI semantic convention usage looks correct and the opt-in via OTEL_TRACING_ENABLED is clean.
Two blocking issues: mutex held during OTEL SDK calls (contention risk under concurrent streams), and JSON injection via unescaped taskID in fmt.Sprintf. Several suggestions for hardcoded values, Docker build reproducibility, and unbounded span attributes.
Areas reviewed: Go (concurrency, error handling, security), Dockerfile, go.mod dependencies
Commits: 3 commits, all signed-off ✓
CI: All checks passing ✓
Note on commit attribution: All 3 commits use Co-Authored-By: Claude Opus 4.6. The kagenti convention prefers Assisted-By for AI attribution to avoid inflating contributor stats. Consider amending if the extensions repo follows the same policy.
```go
// For each SSE chunk, it parses events and creates nested child spans for
// LLM and tool events. On end_of_stream, it sets the output on the root span.
func (p *processor) handleResponseBody(stream v3.ExternalProcessor_ProcessServer, body []byte, endOfStream bool) *v3.ProcessingResponse {
	p.mu.Lock()
```
must-fix: Mutex held across OTEL SDK calls and SSE parsing
p.mu.Lock() is acquired here and not released until the manual unlock in two branches (~37 lines later). The entire SSE parsing loop — parseSSEEvents, classifySSEEvent, createChildSpan (which calls otelTracer.Start) — runs while the mutex is held.
Under concurrent streams this creates contention and potential deadlock risk if any OTEL SDK call blocks.
Suggested fix: Copy state out under the lock, then operate on the copy:
```go
p.mu.Lock()
state := p.streamSpans[stream]
p.mu.Unlock()

if state != nil {
    // SSE parsing and span operations on state (no lock needed)
}

// Re-acquire lock only for map mutation
if endOfStream {
    p.mu.Lock()
    delete(p.streamSpans, stream)
    p.mu.Unlock()
}
```

Also consider using `defer p.mu.Unlock()` instead of the manual unlock in two branches — the current pattern is fragile if new code paths are added.
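To make the copy-out pattern concrete, here is a minimal, self-contained sketch (the `spanState` shape and method names are hypothetical stand-ins for the real per-stream OTEL state; this assumes each stream's chunks are processed serially, as they are in an ext_proc gRPC stream):

```go
package main

import (
	"fmt"
	"sync"
)

// spanState stands in for the per-stream OTEL state (hypothetical shape).
type spanState struct{ events []string }

type processor struct {
	mu          sync.Mutex
	streamSpans map[string]*spanState
}

// handleChunk copies the state pointer out under the lock, does the
// (potentially slow) parsing work unlocked, and re-acquires the lock
// only for the map mutation at end of stream.
func (p *processor) handleChunk(stream, event string, endOfStream bool) {
	p.mu.Lock()
	state := p.streamSpans[stream]
	p.mu.Unlock()

	if state != nil {
		// Slow work (SSE parsing, span creation) happens without the lock.
		// Safe because chunks for one stream arrive serially.
		state.events = append(state.events, event)
	}

	if endOfStream {
		p.mu.Lock()
		delete(p.streamSpans, stream)
		p.mu.Unlock()
	}
}

func main() {
	p := &processor{streamSpans: map[string]*spanState{"s1": {}}}
	p.handleChunk("s1", "chat", false)
	p.handleChunk("s1", "tool", true)
	fmt.Println(len(p.streamSpans)) // → 0: entry removed at end of stream
}
```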
```go
// resubscribeAndCapture opens a new SSE streaming connection to the agent's
// tasks/resubscribe endpoint for disconnect recovery.
func resubscribeAndCapture(cancelCtx context.Context, taskID string, span trace.Span, spanCtx context.Context, startIndex int) (string, int) {
	reqBody := fmt.Sprintf(`{"jsonrpc":"2.0","id":"ext-proc-resub","method":"tasks/resubscribe","params":{"id":"%s"}}`, taskID)
```
must-fix: JSON injection via unescaped taskID in fmt.Sprintf
taskID is interpolated directly into a JSON string with %s. If the agent ever returns a task ID containing " or \, this produces malformed or potentially manipulated JSON. Same issue at line 1486 (tasks/get).
Fix: Use encoding/json to marshal the request struct:
```go
type jsonRPCRequest struct {
    JSONRPC string      `json:"jsonrpc"`
    ID      string      `json:"id"`
    Method  string      `json:"method"`
    Params  interface{} `json:"params"`
}

reqBytes, _ := json.Marshal(jsonRPCRequest{
    JSONRPC: "2.0",
    ID:      "ext-proc-resub",
    Method:  "tasks/resubscribe",
    Params:  map[string]string{"id": taskID},
})
```

```go
func resubscribeAndCapture(cancelCtx context.Context, taskID string, span trace.Span, spanCtx context.Context, startIndex int) (string, int) {
	reqBody := fmt.Sprintf(`{"jsonrpc":"2.0","id":"ext-proc-resub","method":"tasks/resubscribe","params":{"id":"%s"}}`, taskID)

	req, err := http.NewRequestWithContext(cancelCtx, "POST", "http://127.0.0.1:8000/", strings.NewReader(reqBody))
```
suggestion: Hardcoded agent URL http://127.0.0.1:8000/
This appears here and again at line 1486 (fetchTaskResult). If the agent runs on a different port or the ext_proc is not co-located, these calls silently fail.
Consider making this configurable:

```go
agentURL = getEnvOrDefault("AGENT_URL", "http://127.0.0.1:8000")
```

Also note: these requests carry no Authorization header. If the agent requires auth, the calls will fail. The trust model (ext_proc on loopback) should be documented.
```go
// OTEL tracing setup
// ============================================================================

func initOtelTracing() error {
```
suggestion: Default values are domain-specific examples
"weather-assistant", "weather-service" are baked in as defaults. A misconfigured deployment will silently report all traces as coming from weather-assistant. Consider either:
- Empty string defaults with a startup warning log
- Or at minimum, values like
"unknown-agent"/"unknown-service"that are obviously wrong in dashboards
```dockerfile
COPY go-processor/ ./go-processor/

RUN CGO_ENABLED=0 GOOS=linux go build -o /go-processor ./go-processor
RUN go mod tidy && CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} go build -o /go-processor ./go-processor
```
suggestion: go mod tidy at build time breaks reproducibility
Running go mod tidy during the Docker build means the build can pull different transitive dependency versions at different times. Combined with go.sum* glob on line 7 (which allows a missing go.sum entirely), this can silently introduce new dependencies.
Standard practice: commit go.sum, COPY go.sum (not go.sum*), go mod download for caching, then build.
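A sketch of that standard layout (stage layout and paths are assumed from the snippets in this PR, not taken from the actual Dockerfile):

```dockerfile
FROM golang:1.22 AS build
WORKDIR /src

# Copy the lock file explicitly (no glob), so a missing go.sum fails the build
# instead of being silently skipped.
COPY go.mod go.sum ./
RUN go mod download

COPY go-processor/ ./go-processor/
# No `go mod tidy` here: the build stays pinned to the committed go.sum.
ARG TARGETARCH
RUN CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} go build -o /go-processor ./go-processor
```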
```dockerfile
COPY go.mod go.sum ./
RUN go mod download
COPY go.mod go.sum* ./
```
suggestion: go.sum* glob allows building without a lock file
If go.sum is absent, the * glob silently skips it and go mod tidy on line 11 generates it at build time. This means builds are not pinned to specific dependency versions. Consider committing go.sum and removing the glob.
```go
// MLflow/OpenInference attributes are derived by the OTEL Collector.
if userInput != "" {
	state.span.SetAttributes(
		attribute.String("gen_ai.prompt", userInput),
```
suggestion: gen_ai.prompt attribute has no size limit
userInput is set as a span attribute without truncation. Later in the code, artifact output is truncated to 1000 chars (good), but prompts are not. Very long prompts could cause issues with some OTEL collectors or backends.
Consider applying the same truncation pattern used for gen_ai.completion.
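A sketch of such a cap (the 1000-char limit mirrors the review's description of the gen_ai.completion handling; the helper name is hypothetical). Slicing runes rather than bytes avoids cutting a multi-byte UTF-8 character in half:

```go
package main

import "fmt"

const maxAttrLen = 1000 // mirror the cap already applied to gen_ai.completion

// truncateAttr caps an attribute value at max runes; rune slicing avoids
// splitting a multi-byte UTF-8 character at the cut point.
func truncateAttr(s string, max int) string {
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	return string(r[:max])
}

func main() {
	fmt.Println(truncateAttr("héllo", 3)) // → hél
}
```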
```go
buf = append(buf, readBuf[:n]...)

for {
	idx := strings.Index(string(buf), "\n\n")
```
nit: string(buf) conversion on every read loop iteration
This performs a full []byte to string copy and linear scan on every iteration. For a long-running SSE stream, this is O(n) per read where n is the total buffered data.
```go
// Use bytes.Index instead:
idx := bytes.Index(buf, []byte("\n\n"))
```
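A self-contained sketch of the frame-splitting loop using `bytes.Index` (the buffer-carrying structure is assumed from the snippet above; function and variable names are illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

// drainFrames consumes complete "\n\n"-terminated SSE frames from buf,
// invoking handle for each, and returns the remaining partial data.
// No []byte-to-string copy happens per iteration.
func drainFrames(buf []byte, handle func([]byte)) []byte {
	sep := []byte("\n\n")
	for {
		idx := bytes.Index(buf, sep)
		if idx < 0 {
			return buf // incomplete frame; keep for the next read
		}
		handle(buf[:idx])
		buf = buf[idx+len(sep):]
	}
}

func main() {
	var frames []string
	rest := drainFrames([]byte("data: a\n\ndata: b\n\ndata: c"), func(f []byte) {
		frames = append(frames, string(f))
	})
	fmt.Println(len(frames), string(rest)) // → 2 data: c
}
```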
```go
// Skip OTEL span creation for non-API paths (agent card, health)
reqPath := getHeaderValue(headers.Headers, ":path")
isAPIRequest := reqPath == "/" || strings.HasPrefix(reqPath, "/?")
```
nit: Path detection may be too narrow
This only matches / and /?*, so A2A requests to sub-paths (e.g., /api/v1/agent) would skip tracing entirely. If that is intentional, a comment explaining which paths are expected would help. If not, consider a positive check for non-API paths instead:
```go
isNonAPIPath := reqPath == "/.well-known/agent-card.json" || reqPath == "/health" || reqPath == "/healthz"
isAPIRequest := !isNonAPIPath
```
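The positive check above, as a small testable function (the excluded path list is illustrative; the actual set of non-API paths would come from the agent's routes):

```go
package main

import "fmt"

// isAPIRequest reports whether a request path should get an OTEL root span,
// excluding known non-API paths rather than allow-listing "/" and "/?...".
func isAPIRequest(reqPath string) bool {
	switch reqPath {
	case "/.well-known/agent-card.json", "/health", "/healthz":
		return false
	}
	return true
}

func main() {
	fmt.Println(isAPIRequest("/"))                            // → true
	fmt.Println(isAPIRequest("/api/v1/agent"))                // → true: sub-paths traced
	fmt.Println(isAPIRequest("/.well-known/agent-card.json")) // → false
}
```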
Summary
Adds AuthBridge/otel-ext-proc/ — a Go gRPC ext_proc that creates OpenTelemetry root spans and nested child spans by parsing A2A SSE stream events. Enables full GenAI observability with zero OTEL code in agents.

Related

Features
- invoke_agent {name} with gen_ai.* attributes
- chat {model} and execute_tool {name} from SSE events
- tasks/get fallback

Architecture
Test plan
🤖 Generated with Claude Code