From 91c208a5e1eb5c4ab764e70118f4a41ca1ee8099 Mon Sep 17 00:00:00 2001 From: Piotr Duda Date: Sat, 21 Mar 2026 15:18:08 +0100 Subject: [PATCH 1/3] Support for agentic workflow observility --- README.md | 1 - docs/manual-tracking.md | 37 ++++ .../research/Supporting Agentic Workflows.md | 150 +++++++++++++++ internal/research/Trace | 0 internal/research/Trace Span API Tweaks.md | 66 +++++++ tests/test_client.py | 2 - tests/test_event_serialization.py | 15 ++ tests/test_tracing.py | 32 ++++ wildedge/__init__.py | 5 + wildedge/client.py | 178 ++++++++++++++++++ wildedge/decorators.py | 48 +++++ wildedge/events/__init__.py | 2 + wildedge/events/common.py | 11 ++ wildedge/events/error.py | 26 ++- wildedge/events/feedback.py | 26 ++- wildedge/events/inference.py | 26 ++- wildedge/events/model_download.py | 26 ++- wildedge/events/model_load.py | 26 ++- wildedge/events/model_unload.py | 26 ++- wildedge/events/span.py | 66 +++++++ wildedge/model.py | 115 +++++++++++ wildedge/tracing.py | 161 ++++++++++++++++ 22 files changed, 1036 insertions(+), 9 deletions(-) create mode 100644 internal/research/Supporting Agentic Workflows.md create mode 100644 internal/research/Trace create mode 100644 internal/research/Trace Span API Tweaks.md create mode 100644 tests/test_tracing.py create mode 100644 wildedge/events/common.py create mode 100644 wildedge/events/span.py create mode 100644 wildedge/tracing.py diff --git a/README.md b/README.md index 1c24d0b..1777cd2 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,6 @@ client = wildedge.init( If no DSN is configured, the client becomes a no-op and logs a warning. `init(...)` is a convenience wrapper for `WildEdge(...)` + `instrument(...)`. - ## Supported integrations **On-device** diff --git a/docs/manual-tracking.md b/docs/manual-tracking.md index f90f58a..d908d33 100644 --- a/docs/manual-tracking.md +++ b/docs/manual-tracking.md @@ -215,3 +215,40 @@ handle.feedback(FeedbackType.THUMBS_DOWN) ``` `FeedbackType` values: `THUMBS_UP`, `THUMBS_DOWN`. + +## Track spans for agentic workflows + +Use span events to track non-inference steps like planning, tool calls, retrieval, or memory updates. + +```python +from wildedge.timing import Timer + +with Timer() as t: + tool_result = call_tool() + +client.track_span( + kind="tool", + name="call_tool", + duration_ms=t.elapsed_ms, + status="ok", + attributes={"tool": "search"}, +) +``` + +You can also attach optional correlation fields (`trace_id`, `span_id`, +`parent_span_id`, `run_id`, `agent_id`, `step_index`, `conversation_id`) to any +event by passing them into `track_inference`, `track_error`, `track_feedback`, +or `track_span`. + +### Trace context helpers + +Use `trace_context()` and `span_context()` to auto-populate correlation fields +for all events emitted inside the block: + +```python +import wildedge + +with wildedge.trace_context(run_id="run-123", agent_id="agent-1"): + with wildedge.span_context(step_index=1): + handle.track_inference(duration_ms=12) +``` diff --git a/internal/research/Supporting Agentic Workflows.md b/internal/research/Supporting Agentic Workflows.md new file mode 100644 index 0000000..c03d2e3 --- /dev/null +++ b/internal/research/Supporting Agentic Workflows.md @@ -0,0 +1,150 @@ +# Supporting Agentic Workflows + +## Summary +WildEdge already captures model load/download/inference events and basic errors. Agentic systems introduce multi-step plans, tool calls, retrieval, memory, and feedback loops that require traceability across steps and richer metadata. This document proposes a full, production-grade extension to the SDK and protocol to support agentic workflows while remaining backward compatible. + +## Goals +- Support end-to-end tracing of agent runs across LLM calls, tools, retrieval, and memory. +- Provide a minimal, ergonomic SDK API for trace/span creation and propagation. +- Preserve existing integrations and CLI behavior with no required app changes. +- Allow vendor-agnostic metadata attachment without frequent schema changes. +- Enable safe payload controls for privacy and compliance. + +## Non-Goals +- Build a full UI or backend visualization in this repository. +- Require user code changes for existing integrations. +- Enforce a single agent framework or orchestration style. + +## Current SDK Snapshot +- Events: model_load, model_unload, model_download, inference, error, feedback. +- Protocol envelope: batch with protocol_version, device, models, session_id, events. +- Integrations: onnx, gguf, openai, timm, tensorflow, transformers, ultralytics, mlx, torch/keras noop. +- Manual tracking: register_model + track_inference, track_error, track_feedback. +- CLI: wildedge run autoloads runtime and patches integrations. + +## Requirements For Agentic Monitoring +- Correlate events into a single run with a DAG of spans. +- Capture agent steps and tool calls that are not model inference. +- Associate retrieval results, memory writes, and guardrails with the same run. +- Support async workloads and nested spans safely. +- Keep payload sizes and sensitive data under control. + +## Proposed Protocol Extensions +### 1) Correlation Fields (Optional on All Events) +- trace_id: string +- span_id: string +- parent_span_id: string +- run_id: string +- agent_id: string +- step_index: integer +- conversation_id: string + +### 2) Generic Attributes Map (Optional on All Events) +- attributes: object (string keys, JSON-serializable values) + +### 3) New Span Event Type +- event_type: "span" +- span.kind: "agent_step" | "tool" | "retrieval" | "memory" | "router" | "guardrail" | "cache" | "eval" | "custom" +- span.name: string +- span.duration_ms: integer +- span.status: "ok" | "error" +- span.input_summary: string (optional, truncated) +- span.output_summary: string (optional, truncated) +- span.attributes: object + +### 4) Optional Usage Block For Inference Events +- inference.usage.tokens_in: integer +- inference.usage.tokens_out: integer +- inference.usage.cached_input_tokens: integer +- inference.usage.reasoning_tokens_out: integer +- inference.usage.cost_usd: number + +### 5) Protocol Versioning +- Bump PROTOCOL_VERSION to 1.1 +- All new fields remain optional and are ignored by older backends. + +## SDK API Additions +### Trace Context +- wildedge.trace(name: str, run_id: str | None, agent_id: str | None, conversation_id: str | None) +- wildedge.span(kind: str, name: str, attributes: dict | None) +- Context propagation via contextvars for async support. + +### Span Emission +- Span context managers emit a SpanEvent on exit with duration and status. +- Current trace/span context attaches to all events automatically. + +### Example +```python +import wildedge + +with wildedge.trace("puzzle_run", run_id="run-123", agent_id="puzzle-agent"): + with wildedge.span("agent_step", name="analyze_board", attributes={"iteration": 4}): + result = planner() + with wildedge.span("tool", name="rotate_cell", attributes={"col": 2, "row": 1}): + apply_move() +``` + +## Integration Strategy +### Phase 1: Core SDK +- Add trace/span context and SpanEvent. +- Attach correlation fields to all existing events. +- Add generic attributes map. + +### Phase 2: Framework Integrations +- langchain / langgraph +- llamaindex +- autogen +- crewai +- dspy +- litellm (covers many providers) + +### Integration Behavior +- Wrap framework callbacks to create spans for steps, tools, retrieval, memory. +- Reuse existing inference tracking when LLM calls happen under the hood. +- Map framework-specific fields into attributes without schema changes. + +## CLI Impact +- No changes required to wildedge run. +- New SDK features activate automatically when integrations emit spans. + +## Privacy, Security, and Payload Controls +- Add settings to disable input/output capture globally. +- Redaction hooks for PII or secrets. +- Truncation and hashing for large content fields. +- Clear documentation and defaults that favor safety. + +## Compatibility and Migration +- Existing clients continue to work unchanged. +- Server must accept optional new fields. +- Unknown event types should be ignored safely by older servers. + +## Testing Plan +- Unit tests for trace context propagation and span emission. +- Integration tests for nested spans with async workloads. +- Golden tests for event JSON serialization and protocol versioning. + +## Risks +- High cardinality from unbounded attributes. +- Large payloads from agent inputs/outputs. +- Trace context leaks across tasks if not isolated. + +## Milestones +### M1: Protocol and Core SDK +- Add correlation fields and attributes map. +- Add SpanEvent and tracing API. +- Update documentation and examples. + +### M2: First Framework Integration +- Implement langchain or llamaindex integration. +- Validate span graph integrity. + +### M3: Broader Ecosystem +- Add autogen, crewai, dspy, litellm. +- Add configurable redaction and capture policies. + +## Open Questions +- Should SpanEvent share top-level shape with InferenceEvent or be its own payload? +- Should span.status allow more than ok/error? +- Should run_id be auto-generated when a trace starts without explicit id? +- Should attributes enforce a key whitelist or allow arbitrary JSON? + diff --git a/internal/research/Trace b/internal/research/Trace new file mode 100644 index 0000000..e69de29 diff --git a/internal/research/Trace Span API Tweaks.md b/internal/research/Trace Span API Tweaks.md new file mode 100644 index 0000000..826b93c --- /dev/null +++ b/internal/research/Trace Span API Tweaks.md @@ -0,0 +1,66 @@ +# API Tweak Proposal: Trace and Span UX + +## Background +The current API mixes module-level context managers (trace_context, span_context) with a client method for span emission (WildEdge.span). This works, but it is slightly asymmetric and makes usage feel less Pythonic. There is also easy confusion between correlation attributes (event-level attributes) and span attributes (span.attributes), because the API currently exposes both "attributes" and "span_attributes". + +## Goals +- Make the API feel symmetric and Pythonic for common workflows. +- Reduce cognitive overhead for users who only need basic tracing. +- Keep event schema and protocol unchanged. +- Preserve backward compatibility with minimal code changes. + +## Proposal +### 1) Add client-level trace context +Add a client method that mirrors WildEdge.span: + +- WildEdge.trace(...) -> context manager that sets trace context +- Optional return value: TraceContext (for advanced use) + +This makes usage consistent: + +with we.trace(agent_id=..., conversation_id=...): + with we.span(...): + ... + +### 2) Clarify attributes vs correlation +Rename or alias parameters to make intent explicit: + +- WildEdge.span(..., attributes=...) refers to span attributes (stored under span.attributes) +- WildEdge.span(..., context=...) refers to correlation attributes (stored at the event root) + +Backwards compatible behavior: +- span_attributes remains accepted as an alias for attributes +- attributes on trace_context remains accepted as an alias for context + +### 3) Optional client-level span_context +Expose WildEdge.span_context(...) as a thin wrapper over span_context for symmetry. This is a low-level API for correlation without emitting a span event. + +### 4) Optional decorator sugar (static cases) +Add @we.trace as an optional decorator for static metadata. This is explicit sugar only, not the recommended default for dynamic metadata: + +@we.trace(agent_id="agent-1") +def handle_message(...): + ... + +## Example (proposed usage) + +we = wildedge.WildEdge() +we.instrument("openai") + +with we.trace(agent_id="agent-a", conversation_id=channel): + with we.span( + kind="agent_step", + name="respond", + step_index=step_index, + attributes={"model": model}, + ): + response = client.chat.completions.create(...) + +## Migration and compatibility +- No protocol changes. +- Existing code using trace_context and span_context continues to work. +- Existing code using span_attributes continues to work. +- Add warnings only if we want to encourage migration; otherwise keep silent aliases. + +## Notes for agntr +Once WildEdge.trace exists, agntr can switch from wildedge.trace_context(...) to we.trace(...) for symmetry, and use attributes= for span attributes. diff --git a/tests/test_client.py b/tests/test_client.py index 3417ba8..e09cb2f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -42,7 +42,6 @@ def test_no_dsn_is_noop(monkeypatch, caplog): assert client.closed is True assert "no DSN configured" in caplog.text - def test_no_dsn_instrument_does_not_raise(monkeypatch): from wildedge.client import WildEdge @@ -62,7 +61,6 @@ def test_no_dsn_publish_does_not_enqueue(monkeypatch): client.publish({"event_type": "inference", "model_id": "m"}) client.queue.add.assert_not_called() - def test_batch_size_too_high(): from wildedge.client import WildEdge diff --git a/tests/test_event_serialization.py b/tests/test_event_serialization.py index 12028c7..cfd1655 100644 --- a/tests/test_event_serialization.py +++ b/tests/test_event_serialization.py @@ -4,6 +4,7 @@ from wildedge.events.inference import InferenceEvent, TextInputMeta from wildedge.events.model_download import AdapterDownload, ModelDownloadEvent from wildedge.events.model_load import AdapterLoad, ModelLoadEvent +from wildedge.events.span import SpanEvent def test_inference_event_to_dict_omits_none_fields(): @@ -72,3 +73,17 @@ def test_feedback_event_enum_and_string_forms(): ) assert enum_event.to_dict()["feedback"]["feedback_type"] == "accept" assert string_event.to_dict()["feedback"]["feedback_type"] == "reject" + + +def test_span_event_to_dict_includes_required_fields(): + event = SpanEvent( + kind="tool", + name="search", + duration_ms=250, + status="ok", + span_attributes={"provider": "custom"}, + ) + data = event.to_dict() + assert data["event_type"] == "span" + assert data["span"]["kind"] == "tool" + assert data["span"]["attributes"]["provider"] == "custom" diff --git a/tests/test_tracing.py b/tests/test_tracing.py new file mode 100644 index 0000000..dd72927 --- /dev/null +++ b/tests/test_tracing.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from wildedge.model import ModelHandle, ModelInfo +from wildedge.tracing import span_context, trace_context + + +def test_track_inference_uses_trace_context(): + events: list[dict] = [] + + def publish(event: dict) -> None: + events.append(event) + + handle = ModelHandle( + "model-1", + ModelInfo( + model_name="test", + model_version="1.0", + model_source="local", + model_format="onnx", + ), + publish, + ) + + with trace_context(trace_id="trace-123", run_id="run-1", agent_id="agent-1"): + with span_context(span_id="span-abc", step_index=2): + handle.track_inference(duration_ms=5) + + assert events[0]["trace_id"] == "trace-123" + assert events[0]["parent_span_id"] == "span-abc" + assert events[0]["run_id"] == "run-1" + assert events[0]["agent_id"] == "agent-1" + assert events[0]["step_index"] == 2 diff --git a/wildedge/__init__.py b/wildedge/__init__.py index f1f449b..5701608 100644 --- a/wildedge/__init__.py +++ b/wildedge/__init__.py @@ -15,6 +15,7 @@ GenerationConfig, GenerationOutputMeta, ImageInputMeta, + SpanEvent, TextInputMeta, ) from wildedge.platforms import capture_hardware @@ -22,6 +23,7 @@ from wildedge.platforms.hardware import HardwareContext, ThermalContext from wildedge.queue import QueuePolicy from wildedge.timing import Timer +from wildedge.tracing import span_context, trace_context __all__ = [ "WildEdge", @@ -42,7 +44,10 @@ "GenerationConfig", "AdapterLoad", "AdapterDownload", + "SpanEvent", "FeedbackType", "ErrorCode", "Timer", + "trace_context", + "span_context", ] diff --git a/wildedge/client.py b/wildedge/client.py index da5e052..5dbf275 100644 --- a/wildedge/client.py +++ b/wildedge/client.py @@ -12,6 +12,7 @@ from wildedge import constants from wildedge.consumer import Consumer from wildedge.dead_letters import DeadLetterStore +from wildedge.events import SpanEvent from wildedge.hubs.base import BaseHubTracker from wildedge.hubs.huggingface import HuggingFaceHubTracker from wildedge.hubs.registry import supported_hubs @@ -39,6 +40,13 @@ from wildedge.queue import EventQueue, QueuePolicy from wildedge.settings import read_client_env, resolve_app_identity from wildedge.timing import Timer, elapsed_ms +from wildedge.tracing import ( + SpanContext, + get_span_context, + merge_correlation_fields, + reset_span_context, + set_span_context, +) from wildedge.transmitter import Transmitter DSN_FORMAT = "'https://@ingest.wildedge.dev/'" @@ -381,6 +389,176 @@ def register_model( return handle + class _SpanContextManager: + def __init__( + self, + client: WildEdge, + *, + kind: str, + name: str, + status: str = "ok", + model_id: str | None = None, + input_summary: str | None = None, + output_summary: str | None = None, + span_attributes: dict[str, Any] | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, + ): + self._client = client + self._kind = kind + self._name = name + self._status = status + self._model_id = model_id + self._input_summary = input_summary + self._output_summary = output_summary + self._span_attributes = span_attributes + self._trace_id = trace_id + self._span_id = span_id + self._parent_span_id = parent_span_id + self._run_id = run_id + self._agent_id = agent_id + self._step_index = step_index + self._conversation_id = conversation_id + self._attributes = attributes + self._t0: float | None = None + self._span_token = None + + def __enter__(self): + self._t0 = time.perf_counter() + if self._span_id is None: + self._span_id = str(uuid.uuid4()) + if self._parent_span_id is None: + current = get_span_context() + self._parent_span_id = current.span_id if current else None + self._span_token = set_span_context( + SpanContext( + span_id=self._span_id, + parent_span_id=self._parent_span_id, + step_index=self._step_index, + attributes=self._attributes, + ) + ) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._t0 is None: + return False + duration_ms = elapsed_ms(self._t0) + status = "error" if exc_type else self._status + self._client.track_span( + kind=self._kind, + name=self._name, + duration_ms=duration_ms, + status=status, + model_id=self._model_id, + input_summary=self._input_summary, + output_summary=self._output_summary, + span_attributes=self._span_attributes, + trace_id=self._trace_id, + span_id=self._span_id, + parent_span_id=self._parent_span_id, + run_id=self._run_id, + agent_id=self._agent_id, + step_index=self._step_index, + conversation_id=self._conversation_id, + attributes=self._attributes, + ) + if self._span_token is not None: + reset_span_context(self._span_token) + return False + + def span( + self, + *, + kind: str, + name: str, + status: str = "ok", + model_id: str | None = None, + input_summary: str | None = None, + output_summary: str | None = None, + span_attributes: dict[str, Any] | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, + ) -> _SpanContextManager: + """Context manager that times and emits a span event.""" + return WildEdge._SpanContextManager( + self, + kind=kind, + name=name, + status=status, + model_id=model_id, + input_summary=input_summary, + output_summary=output_summary, + span_attributes=span_attributes, + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) + + def track_span( + self, + *, + kind: str, + name: str, + duration_ms: int, + status: str = "ok", + model_id: str | None = None, + input_summary: str | None = None, + output_summary: str | None = None, + span_attributes: dict[str, Any] | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, + ) -> str: + """Emit a span event for agentic workflows and tooling.""" + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) + if correlation["span_id"] is None: + correlation["span_id"] = str(uuid.uuid4()) + event = SpanEvent( + kind=kind, + name=name, + duration_ms=duration_ms, + status=status, + model_id=model_id, + input_summary=input_summary, + output_summary=output_summary, + span_attributes=span_attributes, + **correlation, + ) + self.publish(event.to_dict()) + return correlation["span_id"] + def _find_extractor(self, model_obj: object) -> BaseExtractor | None: for candidate in DEFAULT_EXTRACTORS: if candidate.can_handle(model_obj): diff --git a/wildedge/decorators.py b/wildedge/decorators.py index fe2e3ff..47e2fef 100644 --- a/wildedge/decorators.py +++ b/wildedge/decorators.py @@ -38,6 +38,14 @@ def __init__( input_meta: Any = None, output_meta: Any = None, generation_config: Any = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, ): self.handle = handle self.input_type = input_type @@ -46,6 +54,14 @@ def __init__( self.input_meta = input_meta self.output_meta = output_meta self.generation_config = generation_config + self.trace_id = trace_id + self.span_id = span_id + self.parent_span_id = parent_span_id + self.run_id = run_id + self.agent_id = agent_id + self.step_index = step_index + self.conversation_id = conversation_id + self.attributes = attributes self.start_time: float | None = None def __call__(self, func): @@ -62,6 +78,14 @@ def wrapper(*args, **kwargs): input_meta=self.input_meta, output_meta=self.output_meta, generation_config=self.generation_config, + trace_id=self.trace_id, + span_id=self.span_id, + parent_span_id=self.parent_span_id, + run_id=self.run_id, + agent_id=self.agent_id, + step_index=self.step_index, + conversation_id=self.conversation_id, + attributes=self.attributes, ) return result except Exception as exc: @@ -69,6 +93,14 @@ def wrapper(*args, **kwargs): self.handle.track_error( error_code="UNKNOWN", error_message=str(exc)[: constants.ERROR_MSG_MAX_LEN], + trace_id=self.trace_id, + span_id=self.span_id, + parent_span_id=self.parent_span_id, + run_id=self.run_id, + agent_id=self.agent_id, + step_index=self.step_index, + conversation_id=self.conversation_id, + attributes=self.attributes, ) raise @@ -89,6 +121,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): error_message=str(exc_val)[: constants.ERROR_MSG_MAX_LEN] if exc_val else None, + trace_id=self.trace_id, + span_id=self.span_id, + parent_span_id=self.parent_span_id, + run_id=self.run_id, + agent_id=self.agent_id, + step_index=self.step_index, + conversation_id=self.conversation_id, + attributes=self.attributes, ) else: self.handle.track_inference( @@ -99,5 +139,13 @@ def __exit__(self, exc_type, exc_val, exc_tb): input_meta=self.input_meta, output_meta=self.output_meta, generation_config=self.generation_config, + trace_id=self.trace_id, + span_id=self.span_id, + parent_span_id=self.parent_span_id, + run_id=self.run_id, + agent_id=self.agent_id, + step_index=self.step_index, + conversation_id=self.conversation_id, + attributes=self.attributes, ) return False diff --git a/wildedge/events/__init__.py b/wildedge/events/__init__.py index b08bc78..6648c40 100644 --- a/wildedge/events/__init__.py +++ b/wildedge/events/__init__.py @@ -19,6 +19,7 @@ from wildedge.events.model_download import AdapterDownload, ModelDownloadEvent from wildedge.events.model_load import AdapterLoad, ModelLoadEvent from wildedge.events.model_unload import ModelUnloadEvent +from wildedge.events.span import SpanEvent __all__ = [ "ApiMeta", @@ -40,6 +41,7 @@ "ModelDownloadEvent", "ModelLoadEvent", "ModelUnloadEvent", + "SpanEvent", "TextInputMeta", "TopKPrediction", ] diff --git a/wildedge/events/common.py b/wildedge/events/common.py new file mode 100644 index 0000000..399b55b --- /dev/null +++ b/wildedge/events/common.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from typing import Any + + +def add_optional_fields(event: dict, fields: dict[str, Any]) -> dict: + """Add non-None fields to an event payload.""" + for key, value in fields.items(): + if value is not None: + event[key] = value + return event diff --git a/wildedge/events/error.py b/wildedge/events/error.py index 13d29f1..56501b9 100644 --- a/wildedge/events/error.py +++ b/wildedge/events/error.py @@ -23,6 +23,14 @@ class ErrorEvent: error_message: str | None = None stack_trace_hash: str | None = None related_event_id: str | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -40,10 +48,26 @@ def to_dict(self) -> dict: if self.related_event_id is not None: error_data["related_event_id"] = self.related_event_id - return { + event = { "event_id": self.event_id, "event_type": "error", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "error": error_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/feedback.py b/wildedge/events/feedback.py index 650904b..3634123 100644 --- a/wildedge/events/feedback.py +++ b/wildedge/events/feedback.py @@ -24,6 +24,14 @@ class FeedbackEvent: feedback_type: str | FeedbackType delay_ms: int | None = None edit_distance: int | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -42,10 +50,26 @@ def to_dict(self) -> dict: if self.edit_distance is not None: feedback_data["edit_distance"] = self.edit_distance - return { + event = { "event_id": self.event_id, "event_type": "feedback", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "feedback": feedback_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/inference.py b/wildedge/events/inference.py index 0c02996..ac74a86 100644 --- a/wildedge/events/inference.py +++ b/wildedge/events/inference.py @@ -304,6 +304,14 @@ class InferenceEvent: generation_config: GenerationConfig | None = None hardware: HardwareContext | None = None api_meta: ApiMeta | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) inference_id: str = field(default_factory=lambda: str(uuid.uuid4())) @@ -333,10 +341,26 @@ def to_dict(self) -> dict: if self.api_meta is not None: inference_data["api_meta"] = self.api_meta.to_dict() - return { + event = { "event_id": self.event_id, "event_type": "inference", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "inference": inference_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/model_download.py b/wildedge/events/model_download.py index 784c62f..b889909 100644 --- a/wildedge/events/model_download.py +++ b/wildedge/events/model_download.py @@ -49,6 +49,14 @@ class ModelDownloadEvent: cdn_edge: str | None = None error_code: str | None = None adapter: AdapterDownload | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -83,10 +91,26 @@ def to_dict(self) -> dict: if self.adapter is not None: download_data["adapter"] = self.adapter.to_dict() - return { + event = { "event_id": self.event_id, "event_type": "model_download", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "download": download_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/model_load.py b/wildedge/events/model_load.py index 9fae742..d9d4849 100644 --- a/wildedge/events/model_load.py +++ b/wildedge/events/model_load.py @@ -55,6 +55,14 @@ class ModelLoadEvent: cold_start: bool | None = None compile_time_ms: int | None = None adapter: AdapterLoad | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -84,10 +92,26 @@ def to_dict(self) -> dict: if self.adapter is not None: load_data["adapter"] = self.adapter.to_dict() - return { + event = { "event_id": self.event_id, "event_type": "model_load", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "load": load_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/model_unload.py b/wildedge/events/model_unload.py index 9dab481..ec3b316 100644 --- a/wildedge/events/model_unload.py +++ b/wildedge/events/model_unload.py @@ -14,6 +14,14 @@ class ModelUnloadEvent: memory_freed_bytes: int | None = None peak_memory_bytes: int | None = None uptime_ms: int | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -30,10 +38,26 @@ def to_dict(self) -> dict: if v is not None: unload_data[k] = v - return { + event = { "event_id": self.event_id, "event_type": "model_unload", "timestamp": self.timestamp.isoformat(), "model_id": self.model_id, "unload": unload_data, } + from wildedge.events.common import add_optional_fields + + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/events/span.py b/wildedge/events/span.py new file mode 100644 index 0000000..c84e233 --- /dev/null +++ b/wildedge/events/span.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any + +from wildedge.events.common import add_optional_fields + + +@dataclass +class SpanEvent: + kind: str + name: str + duration_ms: int + status: str + model_id: str | None = None + input_summary: str | None = None + output_summary: str | None = None + span_attributes: dict[str, Any] | None = None + trace_id: str | None = None + span_id: str | None = None + parent_span_id: str | None = None + run_id: str | None = None + agent_id: str | None = None + step_index: int | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None + event_id: str = field(default_factory=lambda: str(uuid.uuid4())) + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict: + span_data: dict[str, Any] = { + "kind": self.kind, + "name": self.name, + "duration_ms": self.duration_ms, + "status": self.status, + } + if self.input_summary is not None: + span_data["input_summary"] = self.input_summary + if self.output_summary is not None: + span_data["output_summary"] = self.output_summary + if self.span_attributes is not None: + span_data["attributes"] = self.span_attributes + + event = { + "event_id": self.event_id, + "event_type": "span", + "timestamp": self.timestamp.isoformat(), + "model_id": self.model_id, + "span": span_data, + } + add_optional_fields( + event, + { + "trace_id": self.trace_id, + "span_id": self.span_id, + "parent_span_id": self.parent_span_id, + "run_id": self.run_id, + "agent_id": self.agent_id, + "step_index": self.step_index, + "conversation_id": self.conversation_id, + "attributes": self.attributes, + }, + ) + return event diff --git a/wildedge/model.py b/wildedge/model.py index de7e7b1..14f2c66 100644 --- a/wildedge/model.py +++ b/wildedge/model.py @@ -28,6 +28,7 @@ from wildedge.logging import logger from wildedge.platforms import capture_hardware, is_sampling from wildedge.platforms.hardware import HardwareContext +from wildedge.tracing import merge_correlation_fields @dataclass @@ -68,8 +69,26 @@ def track_load( accelerator: str | None = None, success: bool = True, error_code: str | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, **kwargs: Any, ) -> None: + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = ModelLoadEvent( model_id=self.model_id, duration_ms=duration_ms, @@ -77,6 +96,7 @@ def track_load( accelerator=accelerator or self.detected_accelerator, success=success, error_code=error_code, + **correlation, **kwargs, ) self.publish(event.to_dict()) @@ -89,7 +109,25 @@ def track_unload( memory_freed_bytes: int | None = None, peak_memory_bytes: int | None = None, uptime_ms: int | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, ) -> None: + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = ModelUnloadEvent( model_id=self.model_id, duration_ms=duration_ms, @@ -97,6 +135,7 @@ def track_unload( memory_freed_bytes=memory_freed_bytes, peak_memory_bytes=peak_memory_bytes, uptime_ms=uptime_ms, + **correlation, ) self.publish(event.to_dict()) @@ -111,8 +150,26 @@ def track_download( resumed: bool, cache_hit: bool, success: bool, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, **kwargs: Any, ) -> None: + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = ModelDownloadEvent( model_id=self.model_id, source_url=source_url, @@ -124,6 +181,7 @@ def track_download( resumed=resumed, cache_hit=cache_hit, success=success, + **correlation, **kwargs, ) self.publish(event.to_dict()) @@ -146,9 +204,27 @@ def track_inference( generation_config: GenerationConfig | None = None, hardware: HardwareContext | None = None, api_meta: ApiMeta | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, ) -> str: if hardware is None and is_sampling(): hardware = capture_hardware() + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = InferenceEvent( model_id=self.model_id, duration_ms=duration_ms, @@ -162,6 +238,7 @@ def track_inference( generation_config=generation_config, hardware=hardware, api_meta=api_meta, + **correlation, ) self.last_inference_id = event.inference_id self.publish(event.to_dict()) @@ -174,13 +251,32 @@ def track_feedback( *, delay_ms: int | None = None, edit_distance: int | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, ) -> None: + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = FeedbackEvent( model_id=self.model_id, related_inference_id=related_inference_id, feedback_type=feedback_type, delay_ms=delay_ms, edit_distance=edit_distance, + **correlation, ) self.publish(event.to_dict()) @@ -205,13 +301,32 @@ def track_error( error_message: str | None = None, stack_trace_hash: str | None = None, related_event_id: str | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, ) -> None: + correlation = merge_correlation_fields( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + attributes=attributes, + ) event = ErrorEvent( model_id=self.model_id, error_code=error_code, error_message=error_message, stack_trace_hash=stack_trace_hash, related_event_id=related_event_id, + **correlation, ) self.publish(event.to_dict()) diff --git a/wildedge/tracing.py b/wildedge/tracing.py new file mode 100644 index 0000000..5398e27 --- /dev/null +++ b/wildedge/tracing.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import contextlib +import contextvars +import uuid +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True) +class TraceContext: + trace_id: str + run_id: str | None = None + agent_id: str | None = None + conversation_id: str | None = None + attributes: dict[str, Any] | None = None + + +@dataclass(frozen=True) +class SpanContext: + span_id: str + parent_span_id: str | None = None + step_index: int | None = None + attributes: dict[str, Any] | None = None + + +_TRACE_CTX: contextvars.ContextVar[TraceContext | None] = contextvars.ContextVar( + "wildedge_trace_ctx", default=None +) +_SPAN_CTX: contextvars.ContextVar[SpanContext | None] = contextvars.ContextVar( + "wildedge_span_ctx", default=None +) + + +def get_trace_context() -> TraceContext | None: + return _TRACE_CTX.get() + + +def get_span_context() -> SpanContext | None: + return _SPAN_CTX.get() + + +def set_trace_context(ctx: TraceContext) -> contextvars.Token: + return _TRACE_CTX.set(ctx) + + +def reset_trace_context(token: contextvars.Token) -> None: + _TRACE_CTX.reset(token) + + +def set_span_context(ctx: SpanContext) -> contextvars.Token: + return _SPAN_CTX.set(ctx) + + +def reset_span_context(token: contextvars.Token) -> None: + _SPAN_CTX.reset(token) + + +@contextlib.contextmanager +def trace_context( + *, + trace_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, +): + if trace_id is None: + trace_id = str(uuid.uuid4()) + token = set_trace_context( + TraceContext( + trace_id=trace_id, + run_id=run_id, + agent_id=agent_id, + conversation_id=conversation_id, + attributes=attributes, + ) + ) + try: + yield get_trace_context() + finally: + reset_trace_context(token) + + +@contextlib.contextmanager +def span_context( + *, + span_id: str | None = None, + parent_span_id: str | None = None, + step_index: int | None = None, + attributes: dict[str, Any] | None = None, +): + if span_id is None: + span_id = str(uuid.uuid4()) + if parent_span_id is None: + current = get_span_context() + parent_span_id = current.span_id if current else None + token = set_span_context( + SpanContext( + span_id=span_id, + parent_span_id=parent_span_id, + step_index=step_index, + attributes=attributes, + ) + ) + try: + yield get_span_context() + finally: + reset_span_context(token) + + +def _merge_attributes(*candidates: dict[str, Any] | None) -> dict[str, Any] | None: + merged: dict[str, Any] = {} + for attrs in candidates: + if not attrs: + continue + merged.update(attrs) + return merged or None + + +def merge_correlation_fields( + *, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, +) -> dict[str, Any]: + trace = get_trace_context() + span = get_span_context() + + resolved_trace_id = trace_id or (trace.trace_id if trace else None) + resolved_span_id = span_id + resolved_parent_span_id = parent_span_id or (span.span_id if span else None) + resolved_run_id = run_id or (trace.run_id if trace else None) + resolved_agent_id = agent_id or (trace.agent_id if trace else None) + resolved_step_index = ( + step_index if step_index is not None else (span.step_index if span else None) + ) + resolved_conversation_id = conversation_id or ( + trace.conversation_id if trace else None + ) + resolved_attributes = _merge_attributes( + trace.attributes if trace else None, + span.attributes if span else None, + attributes, + ) + + return { + "trace_id": resolved_trace_id, + "span_id": resolved_span_id, + "parent_span_id": resolved_parent_span_id, + "run_id": resolved_run_id, + "agent_id": resolved_agent_id, + "step_index": resolved_step_index, + "conversation_id": resolved_conversation_id, + "attributes": resolved_attributes, + } From 1f9bac11d6a5241885de5af111c9411f6dff2e8e Mon Sep 17 00:00:00 2001 From: Piotr Duda Date: Sat, 21 Mar 2026 21:36:41 +0100 Subject: [PATCH 2/3] cleanup --- .gitignore | 3 +- docs/manual-tracking.md | 28 ++- examples/agentic_example.py | 154 ++++++++++++ .../research/Supporting Agentic Workflows.md | 150 ------------ internal/research/Trace | 0 internal/research/Trace Span API Tweaks.md | 66 ----- tests/test_client.py | 2 + tests/test_event_serialization.py | 29 ++- tests/test_tracing.py | 10 +- wildedge/__init__.py | 19 +- wildedge/client.py | 229 ++++++++++-------- wildedge/decorators.py | 56 ++--- wildedge/events/error.py | 4 +- wildedge/events/feedback.py | 4 +- wildedge/events/inference.py | 4 +- wildedge/events/model_download.py | 4 +- wildedge/events/model_load.py | 4 +- wildedge/events/model_unload.py | 4 +- wildedge/events/span.py | 31 ++- wildedge/model.py | 38 +-- wildedge/tracing.py | 34 +-- 21 files changed, 449 insertions(+), 424 deletions(-) create mode 100644 examples/agentic_example.py delete mode 100644 internal/research/Supporting Agentic Workflows.md delete mode 100644 internal/research/Trace delete mode 100644 internal/research/Trace Span API Tweaks.md diff --git a/.gitignore b/.gitignore index 64d49ae..468c518 100644 --- a/.gitignore +++ b/.gitignore @@ -213,4 +213,5 @@ marimo/_lsp/ __marimo__/ # Streamlit -.streamlit/secrets.toml \ No newline at end of file +.streamlit/secrets.toml +internal/ diff --git a/docs/manual-tracking.md b/docs/manual-tracking.md index d908d33..0fa227e 100644 --- a/docs/manual-tracking.md +++ b/docs/manual-tracking.md @@ -238,17 +238,35 @@ client.track_span( You can also attach optional correlation fields (`trace_id`, `span_id`, `parent_span_id`, `run_id`, `agent_id`, `step_index`, `conversation_id`) to any event by passing them into `track_inference`, `track_error`, `track_feedback`, -or `track_span`. +or `track_span`. Use `context=` for correlation attributes shared across events. ### Trace context helpers -Use `trace_context()` and `span_context()` to auto-populate correlation fields -for all events emitted inside the block: +Use `client.trace()` and `client.span()` to auto-populate correlation fields for +all events emitted inside the block. `client.span()` times the block and emits a +span event on exit: ```python import wildedge +from wildedge.timing import Timer + +client = wildedge.init() +handle = client.register_model(my_model, model_id="my-org/my-model") + +with client.trace(run_id="run-123", agent_id="agent-1"): + with client.span(kind="agent_step", name="plan", step_index=1): + with Timer() as t: + result = my_model(prompt) + handle.track_inference(duration_ms=t.elapsed_ms, input_modality="text", output_modality="generation") +``` + +If you need to set correlation fields without emitting a span event, use the +lower-level `span_context()` directly: -with wildedge.trace_context(run_id="run-123", agent_id="agent-1"): +```python +with client.trace(run_id="run-123", agent_id="agent-1"): with wildedge.span_context(step_index=1): - handle.track_inference(duration_ms=12) + with Timer() as t: + result = my_model(prompt) + handle.track_inference(duration_ms=t.elapsed_ms, input_modality="text", output_modality="generation") ``` diff --git a/examples/agentic_example.py b/examples/agentic_example.py new file mode 100644 index 0000000..6560b30 --- /dev/null +++ b/examples/agentic_example.py @@ -0,0 +1,154 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = ["wildedge-sdk", "openai"] +# +# [tool.uv.sources] +# wildedge-sdk = { path = "..", editable = true } +# /// +"""Agentic workflow example with tool use. + +Demonstrates WildEdge tracing for a simple agent that: + - Runs within a trace (one per agent session) + - Wraps each reasoning step in an agent_step span + - Wraps each tool call in a tool span + - Tracks LLM inference automatically via the OpenAI integration + +Run with: uv run agentic_example.py +Requires: OPENAI_API_KEY environment variable. Set WILDEDGE_DSN to send events. +""" + +import json + +from openai import OpenAI + +import wildedge + +we = wildedge.init( + app_version="1.0.0", + integrations="openai", +) + +openai_client = OpenAI() + +# --- Tools ------------------------------------------------------------------- + +TOOLS = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Return current weather for a city.", + "parameters": { + "type": "object", + "properties": { + "city": {"type": "string"}, + }, + "required": ["city"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "calculator", + "description": "Evaluate a simple arithmetic expression.", + "parameters": { + "type": "object", + "properties": { + "expression": {"type": "string"}, + }, + "required": ["expression"], + }, + }, + }, +] + + +def get_weather(city: str) -> str: + # Stub: replace with a real weather API call. + return json.dumps({"city": city, "temperature_c": 18, "condition": "partly cloudy"}) + + +def calculator(expression: str) -> str: + try: + result = eval(expression, {"__builtins__": {}}) # noqa: S307 + return json.dumps({"expression": expression, "result": result}) + except Exception as e: + return json.dumps({"error": str(e)}) + + +TOOL_HANDLERS = { + "get_weather": get_weather, + "calculator": calculator, +} + + +# --- Agent loop -------------------------------------------------------------- + + +def call_tool(name: str, arguments: dict) -> str: + with we.span( + kind="tool", + name=name, + input_summary=json.dumps(arguments)[:200], + ) as span: + result = TOOL_HANDLERS[name](**arguments) + span.output_summary = result[:200] + return result + + +def run_agent(task: str, step_index: int, messages: list) -> str: + messages.append({"role": "user", "content": task}) + + while True: + with we.span( + kind="agent_step", + name="reason", + step_index=step_index, + input_summary=task[:200], + ) as span: + response = openai_client.chat.completions.create( + model="gpt-4o", + messages=messages, + tools=TOOLS, + tool_choice="auto", + ) + choice = response.choices[0] + span.output_summary = choice.finish_reason + + messages.append(choice.message) + + if choice.finish_reason == "tool_calls": + step_index += 1 + for tool_call in choice.message.tool_calls: + arguments = json.loads(tool_call.function.arguments) + result = call_tool(tool_call.function.name, arguments) + messages.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": result, + } + ) + else: + return choice.message.content + + +# --- Main -------------------------------------------------------------------- + +TASKS = [ + "What's the weather like in Tokyo, and what is 42 * 18?", + "Is it warmer in Paris or Berlin right now?", +] + +system_prompt = "You are a helpful assistant. Use tools when needed." +messages = [{"role": "system", "content": system_prompt}] + +with we.trace(agent_id="demo-agent", run_id="example-run-001"): + for i, task in enumerate(TASKS, start=1): + print(f"\nTask {i}: {task}") + reply = run_agent(task, step_index=i, messages=messages) + print(f"Reply: {reply}") + +we.flush() +print("\nDone. Events flushed to WildEdge.") diff --git a/internal/research/Supporting Agentic Workflows.md b/internal/research/Supporting Agentic Workflows.md deleted file mode 100644 index c03d2e3..0000000 --- a/internal/research/Supporting Agentic Workflows.md +++ /dev/null @@ -1,150 +0,0 @@ -# Supporting Agentic Workflows - -## Summary -WildEdge already captures model load/download/inference events and basic errors. Agentic systems introduce multi-step plans, tool calls, retrieval, memory, and feedback loops that require traceability across steps and richer metadata. This document proposes a full, production-grade extension to the SDK and protocol to support agentic workflows while remaining backward compatible. - -## Goals -- Support end-to-end tracing of agent runs across LLM calls, tools, retrieval, and memory. -- Provide a minimal, ergonomic SDK API for trace/span creation and propagation. -- Preserve existing integrations and CLI behavior with no required app changes. -- Allow vendor-agnostic metadata attachment without frequent schema changes. -- Enable safe payload controls for privacy and compliance. - -## Non-Goals -- Build a full UI or backend visualization in this repository. -- Require user code changes for existing integrations. -- Enforce a single agent framework or orchestration style. - -## Current SDK Snapshot -- Events: model_load, model_unload, model_download, inference, error, feedback. -- Protocol envelope: batch with protocol_version, device, models, session_id, events. -- Integrations: onnx, gguf, openai, timm, tensorflow, transformers, ultralytics, mlx, torch/keras noop. -- Manual tracking: register_model + track_inference, track_error, track_feedback. -- CLI: wildedge run autoloads runtime and patches integrations. - -## Requirements For Agentic Monitoring -- Correlate events into a single run with a DAG of spans. -- Capture agent steps and tool calls that are not model inference. -- Associate retrieval results, memory writes, and guardrails with the same run. -- Support async workloads and nested spans safely. -- Keep payload sizes and sensitive data under control. - -## Proposed Protocol Extensions -### 1) Correlation Fields (Optional on All Events) -- trace_id: string -- span_id: string -- parent_span_id: string -- run_id: string -- agent_id: string -- step_index: integer -- conversation_id: string - -### 2) Generic Attributes Map (Optional on All Events) -- attributes: object (string keys, JSON-serializable values) - -### 3) New Span Event Type -- event_type: "span" -- span.kind: "agent_step" | "tool" | "retrieval" | "memory" | "router" | "guardrail" | "cache" | "eval" | "custom" -- span.name: string -- span.duration_ms: integer -- span.status: "ok" | "error" -- span.input_summary: string (optional, truncated) -- span.output_summary: string (optional, truncated) -- span.attributes: object - -### 4) Optional Usage Block For Inference Events -- inference.usage.tokens_in: integer -- inference.usage.tokens_out: integer -- inference.usage.cached_input_tokens: integer -- inference.usage.reasoning_tokens_out: integer -- inference.usage.cost_usd: number - -### 5) Protocol Versioning -- Bump PROTOCOL_VERSION to 1.1 -- All new fields remain optional and are ignored by older backends. - -## SDK API Additions -### Trace Context -- wildedge.trace(name: str, run_id: str | None, agent_id: str | None, conversation_id: str | None) -- wildedge.span(kind: str, name: str, attributes: dict | None) -- Context propagation via contextvars for async support. - -### Span Emission -- Span context managers emit a SpanEvent on exit with duration and status. -- Current trace/span context attaches to all events automatically. - -### Example -```python -import wildedge - -with wildedge.trace("puzzle_run", run_id="run-123", agent_id="puzzle-agent"): - with wildedge.span("agent_step", name="analyze_board", attributes={"iteration": 4}): - result = planner() - with wildedge.span("tool", name="rotate_cell", attributes={"col": 2, "row": 1}): - apply_move() -``` - -## Integration Strategy -### Phase 1: Core SDK -- Add trace/span context and SpanEvent. -- Attach correlation fields to all existing events. -- Add generic attributes map. - -### Phase 2: Framework Integrations -- langchain / langgraph -- llamaindex -- autogen -- crewai -- dspy -- litellm (covers many providers) - -### Integration Behavior -- Wrap framework callbacks to create spans for steps, tools, retrieval, memory. -- Reuse existing inference tracking when LLM calls happen under the hood. -- Map framework-specific fields into attributes without schema changes. - -## CLI Impact -- No changes required to wildedge run. -- New SDK features activate automatically when integrations emit spans. - -## Privacy, Security, and Payload Controls -- Add settings to disable input/output capture globally. -- Redaction hooks for PII or secrets. -- Truncation and hashing for large content fields. -- Clear documentation and defaults that favor safety. - -## Compatibility and Migration -- Existing clients continue to work unchanged. -- Server must accept optional new fields. -- Unknown event types should be ignored safely by older servers. - -## Testing Plan -- Unit tests for trace context propagation and span emission. -- Integration tests for nested spans with async workloads. -- Golden tests for event JSON serialization and protocol versioning. - -## Risks -- High cardinality from unbounded attributes. -- Large payloads from agent inputs/outputs. -- Trace context leaks across tasks if not isolated. - -## Milestones -### M1: Protocol and Core SDK -- Add correlation fields and attributes map. -- Add SpanEvent and tracing API. -- Update documentation and examples. - -### M2: First Framework Integration -- Implement langchain or llamaindex integration. -- Validate span graph integrity. - -### M3: Broader Ecosystem -- Add autogen, crewai, dspy, litellm. -- Add configurable redaction and capture policies. - -## Open Questions -- Should SpanEvent share top-level shape with InferenceEvent or be its own payload? -- Should span.status allow more than ok/error? -- Should run_id be auto-generated when a trace starts without explicit id? -- Should attributes enforce a key whitelist or allow arbitrary JSON? - diff --git a/internal/research/Trace b/internal/research/Trace deleted file mode 100644 index e69de29..0000000 diff --git a/internal/research/Trace Span API Tweaks.md b/internal/research/Trace Span API Tweaks.md deleted file mode 100644 index 826b93c..0000000 --- a/internal/research/Trace Span API Tweaks.md +++ /dev/null @@ -1,66 +0,0 @@ -# API Tweak Proposal: Trace and Span UX - -## Background -The current API mixes module-level context managers (trace_context, span_context) with a client method for span emission (WildEdge.span). This works, but it is slightly asymmetric and makes usage feel less Pythonic. There is also easy confusion between correlation attributes (event-level attributes) and span attributes (span.attributes), because the API currently exposes both "attributes" and "span_attributes". - -## Goals -- Make the API feel symmetric and Pythonic for common workflows. -- Reduce cognitive overhead for users who only need basic tracing. -- Keep event schema and protocol unchanged. -- Preserve backward compatibility with minimal code changes. - -## Proposal -### 1) Add client-level trace context -Add a client method that mirrors WildEdge.span: - -- WildEdge.trace(...) -> context manager that sets trace context -- Optional return value: TraceContext (for advanced use) - -This makes usage consistent: - -with we.trace(agent_id=..., conversation_id=...): - with we.span(...): - ... - -### 2) Clarify attributes vs correlation -Rename or alias parameters to make intent explicit: - -- WildEdge.span(..., attributes=...) refers to span attributes (stored under span.attributes) -- WildEdge.span(..., context=...) refers to correlation attributes (stored at the event root) - -Backwards compatible behavior: -- span_attributes remains accepted as an alias for attributes -- attributes on trace_context remains accepted as an alias for context - -### 3) Optional client-level span_context -Expose WildEdge.span_context(...) as a thin wrapper over span_context for symmetry. This is a low-level API for correlation without emitting a span event. - -### 4) Optional decorator sugar (static cases) -Add @we.trace as an optional decorator for static metadata. This is explicit sugar only, not the recommended default for dynamic metadata: - -@we.trace(agent_id="agent-1") -def handle_message(...): - ... - -## Example (proposed usage) - -we = wildedge.WildEdge() -we.instrument("openai") - -with we.trace(agent_id="agent-a", conversation_id=channel): - with we.span( - kind="agent_step", - name="respond", - step_index=step_index, - attributes={"model": model}, - ): - response = client.chat.completions.create(...) - -## Migration and compatibility -- No protocol changes. -- Existing code using trace_context and span_context continues to work. -- Existing code using span_attributes continues to work. -- Add warnings only if we want to encourage migration; otherwise keep silent aliases. - -## Notes for agntr -Once WildEdge.trace exists, agntr can switch from wildedge.trace_context(...) to we.trace(...) for symmetry, and use attributes= for span attributes. diff --git a/tests/test_client.py b/tests/test_client.py index e09cb2f..3417ba8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -42,6 +42,7 @@ def test_no_dsn_is_noop(monkeypatch, caplog): assert client.closed is True assert "no DSN configured" in caplog.text + def test_no_dsn_instrument_does_not_raise(monkeypatch): from wildedge.client import WildEdge @@ -61,6 +62,7 @@ def test_no_dsn_publish_does_not_enqueue(monkeypatch): client.publish({"event_type": "inference", "model_id": "m"}) client.queue.add.assert_not_called() + def test_batch_size_too_high(): from wildedge.client import WildEdge diff --git a/tests/test_event_serialization.py b/tests/test_event_serialization.py index cfd1655..587940b 100644 --- a/tests/test_event_serialization.py +++ b/tests/test_event_serialization.py @@ -81,9 +81,36 @@ def test_span_event_to_dict_includes_required_fields(): name="search", duration_ms=250, status="ok", - span_attributes={"provider": "custom"}, + attributes={"provider": "custom"}, ) data = event.to_dict() assert data["event_type"] == "span" assert data["span"]["kind"] == "tool" assert data["span"]["attributes"]["provider"] == "custom" + + +def test_span_event_context_serializes_under_context_key(): + event = SpanEvent( + kind="agent_step", + name="plan", + duration_ms=10, + status="ok", + context={"user_id": "u1"}, + ) + data = event.to_dict() + assert data["context"] == {"user_id": "u1"} + assert "attributes" not in data + + +def test_span_event_attributes_and_context_are_independent(): + event = SpanEvent( + kind="tool", + name="search", + duration_ms=50, + status="ok", + attributes={"provider": "custom"}, + context={"user_id": "u1"}, + ) + data = event.to_dict() + assert data["span"]["attributes"] == {"provider": "custom"} + assert data["context"] == {"user_id": "u1"} diff --git a/tests/test_tracing.py b/tests/test_tracing.py index dd72927..fef4531 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -21,8 +21,13 @@ def publish(event: dict) -> None: publish, ) - with trace_context(trace_id="trace-123", run_id="run-1", agent_id="agent-1"): - with span_context(span_id="span-abc", step_index=2): + with trace_context( + trace_id="trace-123", + run_id="run-1", + agent_id="agent-1", + attributes={"trace_key": "trace_val"}, + ): + with span_context(span_id="span-abc", step_index=2, attributes={"span_key": 2}): handle.track_inference(duration_ms=5) assert events[0]["trace_id"] == "trace-123" @@ -30,3 +35,4 @@ def publish(event: dict) -> None: assert events[0]["run_id"] == "run-1" assert events[0]["agent_id"] == "agent-1" assert events[0]["step_index"] == 2 + assert events[0]["attributes"] == {"trace_key": "trace_val", "span_key": 2} diff --git a/wildedge/__init__.py b/wildedge/__init__.py index 5701608..2b7993f 100644 --- a/wildedge/__init__.py +++ b/wildedge/__init__.py @@ -1,6 +1,6 @@ """WildEdge Python SDK.""" -from wildedge.client import WildEdge +from wildedge.client import SpanContextManager, WildEdge from wildedge.convenience import init from wildedge.decorators import track from wildedge.events import ( @@ -18,12 +18,19 @@ SpanEvent, TextInputMeta, ) +from wildedge.events.span import SpanKind, SpanStatus from wildedge.platforms import capture_hardware from wildedge.platforms.device_info import DeviceInfo from wildedge.platforms.hardware import HardwareContext, ThermalContext from wildedge.queue import QueuePolicy from wildedge.timing import Timer -from wildedge.tracing import span_context, trace_context +from wildedge.tracing import ( + SpanContext, + TraceContext, + get_span_context, + get_trace_context, + span_context, +) __all__ = [ "WildEdge", @@ -48,6 +55,12 @@ "FeedbackType", "ErrorCode", "Timer", - "trace_context", "span_context", + "TraceContext", + "SpanContext", + "get_trace_context", + "get_span_context", + "SpanKind", + "SpanStatus", + "SpanContextManager", ] diff --git a/wildedge/client.py b/wildedge/client.py index 5dbf275..e3a9aab 100644 --- a/wildedge/client.py +++ b/wildedge/client.py @@ -13,6 +13,7 @@ from wildedge.consumer import Consumer from wildedge.dead_letters import DeadLetterStore from wildedge.events import SpanEvent +from wildedge.events.span import SpanKind, SpanStatus from wildedge.hubs.base import BaseHubTracker from wildedge.hubs.huggingface import HuggingFaceHubTracker from wildedge.hubs.registry import supported_hubs @@ -42,10 +43,11 @@ from wildedge.timing import Timer, elapsed_ms from wildedge.tracing import ( SpanContext, + _merge_correlation_fields, + _reset_span_context, + _set_span_context, get_span_context, - merge_correlation_fields, - reset_span_context, - set_span_context, + trace_context, ) from wildedge.transmitter import Transmitter @@ -111,6 +113,97 @@ def parse_dsn(dsn: str) -> tuple[str, str, str]: ] +class SpanContextManager: + def __init__( + self, + client: WildEdge, + *, + kind: SpanKind, + name: str, + status: SpanStatus = "ok", + model_id: str | None = None, + input_summary: str | None = None, + output_summary: str | None = None, + attributes: dict[str, Any] | None = None, + trace_id: str | None = None, + span_id: str | None = None, + parent_span_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + step_index: int | None = None, + conversation_id: str | None = None, + context: dict[str, Any] | None = None, + ): + self._client = client + self.kind = kind + self.name = name + self.status = status + self.model_id = model_id + self.input_summary = input_summary + self.output_summary = output_summary + self.attributes = attributes + self.trace_id = trace_id + self.span_id = span_id + self.parent_span_id = parent_span_id + self.run_id = run_id + self.agent_id = agent_id + self.step_index = step_index + self.conversation_id = conversation_id + self.context = context + self._t0: float | None = None + self._span_token = None + + def __enter__(self): + self._t0 = time.perf_counter() + if self.span_id is None: + self.span_id = str(uuid.uuid4()) + if self.parent_span_id is None: + current = get_span_context() + self.parent_span_id = current.span_id if current else None + self._span_token = _set_span_context( + SpanContext( + span_id=self.span_id, + parent_span_id=self.parent_span_id, + step_index=self.step_index, + attributes=self.context, + ) + ) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._t0 is None: + return False + duration_ms = elapsed_ms(self._t0) + status = "error" if exc_type else self.status + self._client.track_span( + kind=self.kind, + name=self.name, + duration_ms=duration_ms, + status=status, + model_id=self.model_id, + input_summary=self.input_summary, + output_summary=self.output_summary, + attributes=self.attributes, + trace_id=self.trace_id, + span_id=self.span_id, + parent_span_id=self.parent_span_id, + run_id=self.run_id, + agent_id=self.agent_id, + step_index=self.step_index, + conversation_id=self.conversation_id, + context=self.context, + ) + if self._span_token is not None: + _reset_span_context(self._span_token) + return False + + async def __aenter__(self): + return self.__enter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + return self.__exit__(exc_type, exc_val, exc_tb) + + class WildEdge: """ WildEdge on-device ML monitoring client. @@ -389,100 +482,34 @@ def register_model( return handle - class _SpanContextManager: - def __init__( - self, - client: WildEdge, - *, - kind: str, - name: str, - status: str = "ok", - model_id: str | None = None, - input_summary: str | None = None, - output_summary: str | None = None, - span_attributes: dict[str, Any] | None = None, - trace_id: str | None = None, - span_id: str | None = None, - parent_span_id: str | None = None, - run_id: str | None = None, - agent_id: str | None = None, - step_index: int | None = None, - conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, - ): - self._client = client - self._kind = kind - self._name = name - self._status = status - self._model_id = model_id - self._input_summary = input_summary - self._output_summary = output_summary - self._span_attributes = span_attributes - self._trace_id = trace_id - self._span_id = span_id - self._parent_span_id = parent_span_id - self._run_id = run_id - self._agent_id = agent_id - self._step_index = step_index - self._conversation_id = conversation_id - self._attributes = attributes - self._t0: float | None = None - self._span_token = None - - def __enter__(self): - self._t0 = time.perf_counter() - if self._span_id is None: - self._span_id = str(uuid.uuid4()) - if self._parent_span_id is None: - current = get_span_context() - self._parent_span_id = current.span_id if current else None - self._span_token = set_span_context( - SpanContext( - span_id=self._span_id, - parent_span_id=self._parent_span_id, - step_index=self._step_index, - attributes=self._attributes, - ) - ) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if self._t0 is None: - return False - duration_ms = elapsed_ms(self._t0) - status = "error" if exc_type else self._status - self._client.track_span( - kind=self._kind, - name=self._name, - duration_ms=duration_ms, - status=status, - model_id=self._model_id, - input_summary=self._input_summary, - output_summary=self._output_summary, - span_attributes=self._span_attributes, - trace_id=self._trace_id, - span_id=self._span_id, - parent_span_id=self._parent_span_id, - run_id=self._run_id, - agent_id=self._agent_id, - step_index=self._step_index, - conversation_id=self._conversation_id, - attributes=self._attributes, - ) - if self._span_token is not None: - reset_span_context(self._span_token) - return False + def trace( + self, + *, + trace_id: str | None = None, + run_id: str | None = None, + agent_id: str | None = None, + conversation_id: str | None = None, + attributes: dict[str, Any] | None = None, + ): + """Context manager that sets trace correlation fields.""" + return trace_context( + trace_id=trace_id, + run_id=run_id, + agent_id=agent_id, + conversation_id=conversation_id, + attributes=attributes, + ) def span( self, *, - kind: str, + kind: SpanKind, name: str, - status: str = "ok", + status: SpanStatus = "ok", model_id: str | None = None, input_summary: str | None = None, output_summary: str | None = None, - span_attributes: dict[str, Any] | None = None, + attributes: dict[str, Any] | None = None, trace_id: str | None = None, span_id: str | None = None, parent_span_id: str | None = None, @@ -490,10 +517,10 @@ def span( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, - ) -> _SpanContextManager: + context: dict[str, Any] | None = None, + ) -> SpanContextManager: """Context manager that times and emits a span event.""" - return WildEdge._SpanContextManager( + return SpanContextManager( self, kind=kind, name=name, @@ -501,7 +528,7 @@ def span( model_id=model_id, input_summary=input_summary, output_summary=output_summary, - span_attributes=span_attributes, + attributes=attributes, trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -509,20 +536,20 @@ def span( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) def track_span( self, *, - kind: str, + kind: SpanKind, name: str, duration_ms: int, - status: str = "ok", + status: SpanStatus = "ok", model_id: str | None = None, input_summary: str | None = None, output_summary: str | None = None, - span_attributes: dict[str, Any] | None = None, + attributes: dict[str, Any] | None = None, trace_id: str | None = None, span_id: str | None = None, parent_span_id: str | None = None, @@ -530,10 +557,10 @@ def track_span( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> str: """Emit a span event for agentic workflows and tooling.""" - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -541,7 +568,7 @@ def track_span( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) if correlation["span_id"] is None: correlation["span_id"] = str(uuid.uuid4()) @@ -553,7 +580,7 @@ def track_span( model_id=model_id, input_summary=input_summary, output_summary=output_summary, - span_attributes=span_attributes, + attributes=attributes, **correlation, ) self.publish(event.to_dict()) diff --git a/wildedge/decorators.py b/wildedge/decorators.py index 47e2fef..f953159 100644 --- a/wildedge/decorators.py +++ b/wildedge/decorators.py @@ -45,7 +45,7 @@ def __init__( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ): self.handle = handle self.input_type = input_type @@ -54,14 +54,16 @@ def __init__( self.input_meta = input_meta self.output_meta = output_meta self.generation_config = generation_config - self.trace_id = trace_id - self.span_id = span_id - self.parent_span_id = parent_span_id - self.run_id = run_id - self.agent_id = agent_id - self.step_index = step_index - self.conversation_id = conversation_id - self.attributes = attributes + self._correlation = dict( + trace_id=trace_id, + span_id=span_id, + parent_span_id=parent_span_id, + run_id=run_id, + agent_id=agent_id, + step_index=step_index, + conversation_id=conversation_id, + context=context, + ) self.start_time: float | None = None def __call__(self, func): @@ -78,14 +80,7 @@ def wrapper(*args, **kwargs): input_meta=self.input_meta, output_meta=self.output_meta, generation_config=self.generation_config, - trace_id=self.trace_id, - span_id=self.span_id, - parent_span_id=self.parent_span_id, - run_id=self.run_id, - agent_id=self.agent_id, - step_index=self.step_index, - conversation_id=self.conversation_id, - attributes=self.attributes, + **self._correlation, ) return result except Exception as exc: @@ -93,14 +88,7 @@ def wrapper(*args, **kwargs): self.handle.track_error( error_code="UNKNOWN", error_message=str(exc)[: constants.ERROR_MSG_MAX_LEN], - trace_id=self.trace_id, - span_id=self.span_id, - parent_span_id=self.parent_span_id, - run_id=self.run_id, - agent_id=self.agent_id, - step_index=self.step_index, - conversation_id=self.conversation_id, - attributes=self.attributes, + **self._correlation, ) raise @@ -121,14 +109,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): error_message=str(exc_val)[: constants.ERROR_MSG_MAX_LEN] if exc_val else None, - trace_id=self.trace_id, - span_id=self.span_id, - parent_span_id=self.parent_span_id, - run_id=self.run_id, - agent_id=self.agent_id, - step_index=self.step_index, - conversation_id=self.conversation_id, - attributes=self.attributes, + **self._correlation, ) else: self.handle.track_inference( @@ -139,13 +120,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): input_meta=self.input_meta, output_meta=self.output_meta, generation_config=self.generation_config, - trace_id=self.trace_id, - span_id=self.span_id, - parent_span_id=self.parent_span_id, - run_id=self.run_id, - agent_id=self.agent_id, - step_index=self.step_index, - conversation_id=self.conversation_id, - attributes=self.attributes, + **self._correlation, ) return False diff --git a/wildedge/events/error.py b/wildedge/events/error.py index 56501b9..e814120 100644 --- a/wildedge/events/error.py +++ b/wildedge/events/error.py @@ -30,7 +30,7 @@ class ErrorEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -67,7 +67,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/feedback.py b/wildedge/events/feedback.py index 3634123..dfb6efa 100644 --- a/wildedge/events/feedback.py +++ b/wildedge/events/feedback.py @@ -31,7 +31,7 @@ class FeedbackEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -69,7 +69,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/inference.py b/wildedge/events/inference.py index ac74a86..ef76ae5 100644 --- a/wildedge/events/inference.py +++ b/wildedge/events/inference.py @@ -311,7 +311,7 @@ class InferenceEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) inference_id: str = field(default_factory=lambda: str(uuid.uuid4())) @@ -360,7 +360,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/model_download.py b/wildedge/events/model_download.py index b889909..2e68a3a 100644 --- a/wildedge/events/model_download.py +++ b/wildedge/events/model_download.py @@ -56,7 +56,7 @@ class ModelDownloadEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -110,7 +110,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/model_load.py b/wildedge/events/model_load.py index d9d4849..e058092 100644 --- a/wildedge/events/model_load.py +++ b/wildedge/events/model_load.py @@ -62,7 +62,7 @@ class ModelLoadEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -111,7 +111,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/model_unload.py b/wildedge/events/model_unload.py index ec3b316..16def90 100644 --- a/wildedge/events/model_unload.py +++ b/wildedge/events/model_unload.py @@ -21,7 +21,7 @@ class ModelUnloadEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -57,7 +57,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "attributes": self.context, }, ) return event diff --git a/wildedge/events/span.py b/wildedge/events/span.py index c84e233..9d5be3c 100644 --- a/wildedge/events/span.py +++ b/wildedge/events/span.py @@ -3,21 +3,34 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any +from typing import Any, Literal from wildedge.events.common import add_optional_fields +SpanKind = Literal[ + "agent_step", + "tool", + "retrieval", + "memory", + "router", + "guardrail", + "cache", + "eval", + "custom", +] +SpanStatus = Literal["ok", "error"] + @dataclass class SpanEvent: - kind: str + kind: SpanKind name: str duration_ms: int - status: str + status: SpanStatus model_id: str | None = None input_summary: str | None = None output_summary: str | None = None - span_attributes: dict[str, Any] | None = None + attributes: dict[str, Any] | None = None trace_id: str | None = None span_id: str | None = None parent_span_id: str | None = None @@ -25,7 +38,7 @@ class SpanEvent: agent_id: str | None = None step_index: int | None = None conversation_id: str | None = None - attributes: dict[str, Any] | None = None + context: dict[str, Any] | None = None event_id: str = field(default_factory=lambda: str(uuid.uuid4())) timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) @@ -40,19 +53,19 @@ def to_dict(self) -> dict: span_data["input_summary"] = self.input_summary if self.output_summary is not None: span_data["output_summary"] = self.output_summary - if self.span_attributes is not None: - span_data["attributes"] = self.span_attributes + if self.attributes is not None: + span_data["attributes"] = self.attributes event = { "event_id": self.event_id, "event_type": "span", "timestamp": self.timestamp.isoformat(), - "model_id": self.model_id, "span": span_data, } add_optional_fields( event, { + "model_id": self.model_id, "trace_id": self.trace_id, "span_id": self.span_id, "parent_span_id": self.parent_span_id, @@ -60,7 +73,7 @@ def to_dict(self) -> dict: "agent_id": self.agent_id, "step_index": self.step_index, "conversation_id": self.conversation_id, - "attributes": self.attributes, + "context": self.context, }, ) return event diff --git a/wildedge/model.py b/wildedge/model.py index 14f2c66..2502b14 100644 --- a/wildedge/model.py +++ b/wildedge/model.py @@ -28,7 +28,7 @@ from wildedge.logging import logger from wildedge.platforms import capture_hardware, is_sampling from wildedge.platforms.hardware import HardwareContext -from wildedge.tracing import merge_correlation_fields +from wildedge.tracing import _merge_correlation_fields @dataclass @@ -76,10 +76,10 @@ def track_load( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, **kwargs: Any, ) -> None: - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -87,7 +87,7 @@ def track_load( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = ModelLoadEvent( model_id=self.model_id, @@ -116,9 +116,9 @@ def track_unload( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> None: - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -126,7 +126,7 @@ def track_unload( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = ModelUnloadEvent( model_id=self.model_id, @@ -157,10 +157,10 @@ def track_download( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, **kwargs: Any, ) -> None: - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -168,7 +168,7 @@ def track_download( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = ModelDownloadEvent( model_id=self.model_id, @@ -211,11 +211,11 @@ def track_inference( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> str: if hardware is None and is_sampling(): hardware = capture_hardware() - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -223,7 +223,7 @@ def track_inference( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = InferenceEvent( model_id=self.model_id, @@ -258,9 +258,9 @@ def track_feedback( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> None: - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -268,7 +268,7 @@ def track_feedback( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = FeedbackEvent( model_id=self.model_id, @@ -308,9 +308,9 @@ def track_error( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> None: - correlation = merge_correlation_fields( + correlation = _merge_correlation_fields( trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id, @@ -318,7 +318,7 @@ def track_error( agent_id=agent_id, step_index=step_index, conversation_id=conversation_id, - attributes=attributes, + context=context, ) event = ErrorEvent( model_id=self.model_id, diff --git a/wildedge/tracing.py b/wildedge/tracing.py index 5398e27..a205543 100644 --- a/wildedge/tracing.py +++ b/wildedge/tracing.py @@ -40,19 +40,19 @@ def get_span_context() -> SpanContext | None: return _SPAN_CTX.get() -def set_trace_context(ctx: TraceContext) -> contextvars.Token: +def _set_trace_context(ctx: TraceContext) -> contextvars.Token: return _TRACE_CTX.set(ctx) -def reset_trace_context(token: contextvars.Token) -> None: +def _reset_trace_context(token: contextvars.Token) -> None: _TRACE_CTX.reset(token) -def set_span_context(ctx: SpanContext) -> contextvars.Token: +def _set_span_context(ctx: SpanContext) -> contextvars.Token: return _SPAN_CTX.set(ctx) -def reset_span_context(token: contextvars.Token) -> None: +def _reset_span_context(token: contextvars.Token) -> None: _SPAN_CTX.reset(token) @@ -67,7 +67,7 @@ def trace_context( ): if trace_id is None: trace_id = str(uuid.uuid4()) - token = set_trace_context( + token = _set_trace_context( TraceContext( trace_id=trace_id, run_id=run_id, @@ -79,7 +79,7 @@ def trace_context( try: yield get_trace_context() finally: - reset_trace_context(token) + _reset_trace_context(token) @contextlib.contextmanager @@ -90,12 +90,18 @@ def span_context( step_index: int | None = None, attributes: dict[str, Any] | None = None, ): + """Low-level context manager that sets span correlation fields without emitting a span event. + + Prefer client.span() for most use cases. Use this only when you need correlation + fields attached to auto-instrumented events (e.g. an OpenAI call) without emitting + a redundant span wrapper. + """ if span_id is None: span_id = str(uuid.uuid4()) if parent_span_id is None: current = get_span_context() parent_span_id = current.span_id if current else None - token = set_span_context( + token = _set_span_context( SpanContext( span_id=span_id, parent_span_id=parent_span_id, @@ -106,10 +112,10 @@ def span_context( try: yield get_span_context() finally: - reset_span_context(token) + _reset_span_context(token) -def _merge_attributes(*candidates: dict[str, Any] | None) -> dict[str, Any] | None: +def _merge_context(*candidates: dict[str, Any] | None) -> dict[str, Any] | None: merged: dict[str, Any] = {} for attrs in candidates: if not attrs: @@ -118,7 +124,7 @@ def _merge_attributes(*candidates: dict[str, Any] | None) -> dict[str, Any] | No return merged or None -def merge_correlation_fields( +def _merge_correlation_fields( *, trace_id: str | None = None, span_id: str | None = None, @@ -127,7 +133,7 @@ def merge_correlation_fields( agent_id: str | None = None, step_index: int | None = None, conversation_id: str | None = None, - attributes: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, ) -> dict[str, Any]: trace = get_trace_context() span = get_span_context() @@ -143,10 +149,10 @@ def merge_correlation_fields( resolved_conversation_id = conversation_id or ( trace.conversation_id if trace else None ) - resolved_attributes = _merge_attributes( + resolved_context = _merge_context( trace.attributes if trace else None, span.attributes if span else None, - attributes, + context, ) return { @@ -157,5 +163,5 @@ def merge_correlation_fields( "agent_id": resolved_agent_id, "step_index": resolved_step_index, "conversation_id": resolved_conversation_id, - "attributes": resolved_attributes, + "context": resolved_context, } From c498be019961e1fb9b683de3eea275526b21388a Mon Sep 17 00:00:00 2001 From: Piotr Duda Date: Sun, 22 Mar 2026 01:44:03 +0100 Subject: [PATCH 3/3] Nicer agentic example --- examples/agentic_example.py | 45 ++++++++++++++++++++------ tests/test_tracing.py | 57 ++++++++++++++++++++++++++++++++- wildedge/client.py | 8 +++-- wildedge/integrations/openai.py | 12 +++++-- 4 files changed, 108 insertions(+), 14 deletions(-) diff --git a/examples/agentic_example.py b/examples/agentic_example.py index 6560b30..446c812 100644 --- a/examples/agentic_example.py +++ b/examples/agentic_example.py @@ -14,10 +14,13 @@ - Tracks LLM inference automatically via the OpenAI integration Run with: uv run agentic_example.py -Requires: OPENAI_API_KEY environment variable. Set WILDEDGE_DSN to send events. +Requires: OPENROUTER_API_KEY environment variable. Set WILDEDGE_DSN to send events. """ import json +import os +import time +import uuid from openai import OpenAI @@ -28,7 +31,10 @@ integrations="openai", ) -openai_client = OpenAI() +openai_client = OpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=os.getenv("OPENROUTER_API_KEY"), +) # --- Tools ------------------------------------------------------------------- @@ -65,11 +71,14 @@ def get_weather(city: str) -> str: - # Stub: replace with a real weather API call. + # ~150ms to simulate a real weather API call. + time.sleep(0.15) return json.dumps({"city": city, "temperature_c": 18, "condition": "partly cloudy"}) def calculator(expression: str) -> str: + # ~60ms to simulate a remote computation call. + time.sleep(0.06) try: result = eval(expression, {"__builtins__": {}}) # noqa: S307 return json.dumps({"expression": expression, "result": result}) @@ -97,8 +106,23 @@ def call_tool(name: str, arguments: dict) -> str: return result +def retrieve_context(query: str) -> str: + """Fetch relevant context from the vector store (~120ms).""" + with we.span( + kind="retrieval", + name="vector_search", + input_summary=query[:200], + ) as span: + time.sleep(0.12) + result = f"[context: background knowledge relevant to '{query[:40]}']" + span.output_summary = result + return result + + def run_agent(task: str, step_index: int, messages: list) -> str: - messages.append({"role": "user", "content": task}) + # Fetch context before the first reasoning step, include it in the user turn. + context = retrieve_context(task) + messages.append({"role": "user", "content": f"{task}\n\nContext: {context}"}) while True: with we.span( @@ -108,15 +132,16 @@ def run_agent(task: str, step_index: int, messages: list) -> str: input_summary=task[:200], ) as span: response = openai_client.chat.completions.create( - model="gpt-4o", + model="qwen/qwen3.5-flash-02-23", messages=messages, tools=TOOLS, tool_choice="auto", + max_tokens=512, ) choice = response.choices[0] span.output_summary = choice.finish_reason - messages.append(choice.message) + messages.append(choice.message.model_dump(exclude_none=True)) if choice.finish_reason == "tool_calls": step_index += 1 @@ -130,8 +155,11 @@ def run_agent(task: str, step_index: int, messages: list) -> str: "content": result, } ) + # Not instrumented: context window update between tool calls (~80ms). + # Shows up as a gap stripe in the trace view. + time.sleep(0.08) else: - return choice.message.content + return choice.message.content or "" # --- Main -------------------------------------------------------------------- @@ -144,11 +172,10 @@ def run_agent(task: str, step_index: int, messages: list) -> str: system_prompt = "You are a helpful assistant. Use tools when needed." messages = [{"role": "system", "content": system_prompt}] -with we.trace(agent_id="demo-agent", run_id="example-run-001"): +with we.trace(agent_id="demo-agent", run_id=str(uuid.uuid4())): for i, task in enumerate(TASKS, start=1): print(f"\nTask {i}: {task}") reply = run_agent(task, step_index=i, messages=messages) print(f"Reply: {reply}") we.flush() -print("\nDone. Events flushed to WildEdge.") diff --git a/tests/test_tracing.py b/tests/test_tracing.py index fef4531..944f1e0 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1,7 +1,8 @@ from __future__ import annotations +from wildedge.client import SpanContextManager from wildedge.model import ModelHandle, ModelInfo -from wildedge.tracing import span_context, trace_context +from wildedge.tracing import get_span_context, span_context, trace_context def test_track_inference_uses_trace_context(): @@ -36,3 +37,57 @@ def publish(event: dict) -> None: assert events[0]["agent_id"] == "agent-1" assert events[0]["step_index"] == 2 assert events[0]["attributes"] == {"trace_key": "trace_val", "span_key": 2} + + +class _FakeClient: + def __init__(self, events: list[dict]) -> None: + self._events = events + + def track_span(self, **kwargs) -> str: + self._events.append(kwargs) + return kwargs.get("span_id", "") + + +def test_span_root_has_no_parent(): + """A root span must not reference itself as its own parent.""" + events: list[dict] = [] + client = _FakeClient(events) + + with SpanContextManager(client, kind="agent_step", name="root"): + pass + + assert len(events) == 1 + assert events[0]["parent_span_id"] is None + + +def test_span_context_restored_after_exit(): + """The active span context must revert to the parent after a span exits.""" + events: list[dict] = [] + client = _FakeClient(events) + + with span_context(span_id="parent-span"): + with SpanContextManager(client, kind="agent_step", name="child"): + inner_id = get_span_context().span_id + + assert get_span_context().span_id == "parent-span" + + assert inner_id != "parent-span" + assert events[0]["parent_span_id"] == "parent-span" + assert events[0]["span_id"] != "parent-span" + + +def test_nested_spans_correct_parent_chain(): + """Nested spans must each point to their direct parent, not themselves.""" + events: list[dict] = [] + client = _FakeClient(events) + + with SpanContextManager(client, kind="agent_step", name="outer") as outer: + with SpanContextManager(client, kind="tool", name="inner") as inner: + pass + + assert len(events) == 2 + inner_ev, outer_ev = events[0], events[1] + assert inner_ev["span_id"] == inner.span_id + assert inner_ev["parent_span_id"] == outer.span_id + assert outer_ev["span_id"] == outer.span_id + assert outer_ev["parent_span_id"] is None diff --git a/wildedge/client.py b/wildedge/client.py index e3a9aab..627cbc1 100644 --- a/wildedge/client.py +++ b/wildedge/client.py @@ -175,6 +175,12 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False duration_ms = elapsed_ms(self._t0) status = "error" if exc_type else self.status + # Restore parent span context before emitting, so _merge_correlation_fields + # sees the parent context rather than this span (which would make the span + # appear as its own parent). + if self._span_token is not None: + _reset_span_context(self._span_token) + self._span_token = None self._client.track_span( kind=self.kind, name=self.name, @@ -193,8 +199,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): conversation_id=self.conversation_id, context=self.context, ) - if self._span_token is not None: - _reset_span_context(self._span_token) return False async def __aenter__(self): diff --git a/wildedge/integrations/openai.py b/wildedge/integrations/openai.py index 31281e9..700aace 100644 --- a/wildedge/integrations/openai.py +++ b/wildedge/integrations/openai.py @@ -45,13 +45,21 @@ def source_from_base_url(base_url: str | None) -> str: return SOURCE_BY_HOSTNAME.get(hostname or "", hostname or "openai") +def _msg_role(m) -> str | None: + return m.get("role") if isinstance(m, dict) else getattr(m, "role", None) + + +def _msg_content(m) -> str | None: + return m.get("content") if isinstance(m, dict) else getattr(m, "content", None) + + def build_input_meta(messages: list, tokens_in: int | None) -> TextInputMeta | None: if not messages: return None - last_user = next((m for m in reversed(messages) if m.get("role") == "user"), None) + last_user = next((m for m in reversed(messages) if _msg_role(m) == "user"), None) if not last_user: return None - content = last_user.get("content", "") + content = _msg_content(last_user) or "" if not isinstance(content, str) or not content: return None return TextInputMeta(