diff --git a/examples/otlp-export/README.md b/examples/otlp-export/README.md new file mode 100644 index 0000000..0a23113 --- /dev/null +++ b/examples/otlp-export/README.md @@ -0,0 +1,115 @@ +# BMasterAI — Native OTLP Export + +Send all bmasterai agent traces and metrics to any OpenTelemetry-compatible backend. + +## Supported backends + +| Backend | Transport | Notes | +|---|---|---| +| **Grafana Tempo** | gRPC or HTTP | Pairs with Grafana dashboards | +| **Jaeger** | gRPC | Open source distributed tracing | +| **Honeycomb** | HTTP | SaaS, needs API key header | +| **Datadog** | gRPC/HTTP | Via Datadog Agent OTLP ingestion | +| **New Relic** | HTTP | Needs license key header | +| **Prometheus** | HTTP | Via OTLP bridge receiver | +| **Local collector** | gRPC | `otel-collector` → any backend | + +## Install + +```bash +# gRPC (default, recommended) +pip install "bmasterai[otlp]" + +# HTTP/protobuf (Honeycomb, New Relic, some SaaS) +pip install "bmasterai[otlp-http]" +``` + +## Usage + +```python +from bmasterai import configure_otlp, get_monitor, configure_logging + +# 1. Configure OTLP first (before any monitor calls) +configure_otlp( + endpoint="http://localhost:4317", # your collector endpoint + service_name="my-ai-agent", +) + +# 2. 
Use bmasterai normally — OTLP export happens automatically
+configure_logging()
+monitor = get_monitor()
+monitor.start_monitoring()
+
+monitor.track_agent_start("researcher")
+monitor.track_llm_call("researcher", "claude-3-5-sonnet", tokens_used=1200, duration_ms=1840)
+monitor.track_task_duration("researcher", "web_search", 620)
+monitor.track_agent_stop("researcher")
+```
+
+## What gets exported
+
+### Traces (spans)
+| Span | Trigger | Attributes |
+|---|---|---|
+| `agent.<agent_id>` | `track_agent_start` / `track_agent_stop` | `bmasterai.agent_id`, `bmasterai.runtime_seconds` |
+| `llm.call` | `track_llm_call` | `bmasterai.model`, `bmasterai.tokens_used`, `bmasterai.duration_ms`, `bmasterai.reasoning_steps` |
+| `task.<task_name>` | `track_task_duration` | `bmasterai.agent_id`, `bmasterai.task_name`, `bmasterai.duration_ms` |
+
+### Metrics
+| Metric | Type | Labels |
+|---|---|---|
+| `bmasterai.llm.tokens_used` | Counter | `agent_id`, `model` |
+| `bmasterai.llm.call_duration` | Histogram | `agent_id`, `model` |
+| `bmasterai.task.duration` | Histogram | `agent_id`, `task_name` |
+| `bmasterai.agent.errors` | Counter | `agent_id`, `error_type` |
+| `bmasterai.custom.metric` | Counter | `metric_name` + any custom labels |
+
+## Examples
+
+### Local Jaeger (docker)
+```bash
+docker run -d --name jaeger \
+  -p 4317:4317 -p 16686:16686 \
+  jaegertracing/all-in-one:latest
+```
+```python
+configure_otlp(endpoint="http://localhost:4317", service_name="my-agent")
+```
+Open http://localhost:16686 to see traces.
+ +### Grafana Cloud +```python +configure_otlp( + endpoint="https://otlp-gateway-prod-us-central-0.grafana.net/otlp", + use_http=True, + headers={ + "Authorization": "Basic ", + }, + service_name="my-agent", + insecure=False, +) +``` + +### Honeycomb +```python +configure_otlp( + endpoint="https://api.honeycomb.io", + use_http=True, + headers={"x-honeycomb-team": "YOUR_API_KEY"}, + service_name="my-agent", +) +``` + +### Datadog +```python +configure_otlp( + endpoint="http://localhost:4317", # Datadog Agent running locally + service_name="my-agent", +) +``` + +## Run the example +```bash +pip install "bmasterai[otlp]" +python agent_with_otlp.py +``` diff --git a/examples/otlp-export/agent_with_otlp.py b/examples/otlp-export/agent_with_otlp.py new file mode 100644 index 0000000..9ef06b1 --- /dev/null +++ b/examples/otlp-export/agent_with_otlp.py @@ -0,0 +1,142 @@ +""" +BMasterAI — Native OTLP Export Example + +Demonstrates automatic trace + metric export to any OTel-compatible backend. +All you need is configure_otlp() before your first monitor call. + +Run with a local Jaeger instance: + docker run -d --name jaeger -p 4317:4317 -p 16686:16686 jaegertracing/all-in-one:latest + python agent_with_otlp.py + open http://localhost:16686 +""" + +import time +import os +from bmasterai import configure_otlp, get_monitor, configure_logging, LogLevel + +# ── 1. Configure OTLP ───────────────────────────────────────────────────────── +# All subsequent monitor calls will automatically emit spans + metrics. +otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317") +otlp_headers = {} +if os.getenv("OTLP_API_KEY"): + otlp_headers["x-api-key"] = os.getenv("OTLP_API_KEY") + +success = configure_otlp( + endpoint=otlp_endpoint, + service_name="bmasterai-example", + service_version="0.2.3", + headers=otlp_headers, +) + +if success: + print(f"✅ OTLP configured → {otlp_endpoint}") +else: + print("⚠️ OTLP not available (opentelemetry-sdk not installed). 
Continuing without export.") + print(" Install with: pip install 'bmasterai[otlp]'") + +# ── 2. Normal bmasterai setup ───────────────────────────────────────────────── +configure_logging(log_level=LogLevel.INFO) +monitor = get_monitor() +monitor.start_monitoring() + + +# ── 3. Simulate a multi-step research agent ─────────────────────────────────── +def run_research_agent(): + agent_id = "research-agent-01" + print(f"\n🤖 Starting agent: {agent_id}") + + monitor.track_agent_start(agent_id) + # → OTLP: opens root span "agent.research-agent-01" + + # Step 1: Web search task + print(" 🔎 Running web search...") + time.sleep(0.3) + monitor.track_task_duration(agent_id, "web_search", 312) + # → OTLP: child span "task.web_search" with duration_ms=312 + + # Step 2: LLM analysis call + print(" 🧠 Calling LLM for analysis...") + time.sleep(0.5) + monitor.track_llm_call( + agent_id=agent_id, + model="claude-3-5-sonnet-20241022", + tokens_used=1847, + duration_ms=1203, + reasoning_steps=4, + ) + # → OTLP: child span "llm.call" + increments bmasterai.llm.tokens_used counter + + # Step 3: Synthesis task + print(" 📝 Synthesizing results...") + time.sleep(0.2) + monitor.track_task_duration(agent_id, "synthesize", 198) + # → OTLP: child span "task.synthesize" + + # Step 4: Simulate an error + print(" ⚠️ Simulating transient error...") + monitor.track_error(agent_id, "rate_limit") + # → OTLP: increments bmasterai.agent.errors counter, adds event to root span + + # Retry + print(" 🔁 Retrying after backoff...") + time.sleep(0.1) + monitor.track_llm_call( + agent_id=agent_id, + model="claude-3-5-haiku-20241022", + tokens_used=924, + duration_ms=620, + ) + + # Custom metric + monitor.record_custom_metric("documents_processed", 12, {"agent_id": agent_id}) + # → OTLP: increments bmasterai.custom.metric{metric_name=documents_processed} + + monitor.track_agent_stop(agent_id) + # → OTLP: closes root span with runtime_seconds + + print(f" ✅ Agent {agent_id} complete\n") + + +def 
run_pipeline(): + """Run multiple agents to show multi-agent tracing.""" + agents = [ + ("planner-agent", "claude-3-5-haiku-20241022", 450, 380), + ("research-agent", "claude-3-5-sonnet-20241022", 2100, 1840), + ("writer-agent", "claude-3-5-sonnet-20241022", 3200, 2100), + ] + + print("\n🏭 Running multi-agent pipeline...") + for agent_id, model, tokens, latency in agents: + monitor.track_agent_start(agent_id) + time.sleep(0.1) + monitor.track_llm_call(agent_id, model, tokens, latency) + monitor.track_task_duration(agent_id, "main_task", latency * 1.2) + monitor.track_agent_stop(agent_id) + print(f" ✅ {agent_id} ({model}) — {tokens} tokens in {latency}ms") + + +if __name__ == "__main__": + run_research_agent() + run_pipeline() + + # Dashboard check + dashboard = monitor.get_agent_dashboard("research-agent-01") + health = monitor.get_system_health() + + print("=" * 50) + print("📊 BMasterAI Dashboard (research-agent-01)") + print(f" Status: {dashboard['status']}") + print(f" Tasks: {list(dashboard['performance'].keys())}") + print(f" Errors: {dashboard['metrics'].get('total_errors', 0)}") + print() + print(f"🖥️ System: CPU {health['system_metrics']['cpu'].get('latest', 0):.1f}% | " + f"Mem {health['system_metrics']['memory'].get('latest', 0):.1f}%") + print(f"🤖 Agents: {health['active_agents']} active / {health['total_agents']} total") + + if success: + print(f"\n🔭 View traces at your OTel backend") + print(f" (Jaeger: http://localhost:16686 → search 'bmasterai-example')") + + # Give batch exporter time to flush + time.sleep(2) + monitor.stop_monitoring() diff --git a/pyproject.toml b/pyproject.toml index 31ed0ee..292663c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,8 +47,20 @@ integrations = [ "redis>=4.0.0", "celery>=5.2.0", ] +otlp = [ + "opentelemetry-api>=1.20.0", + "opentelemetry-sdk>=1.20.0", + "opentelemetry-exporter-otlp-proto-grpc>=1.20.0", + "opentelemetry-semantic-conventions>=0.41b0", +] +otlp-http = [ + "opentelemetry-api>=1.20.0", + 
"opentelemetry-sdk>=1.20.0", + "opentelemetry-exporter-otlp-proto-http>=1.20.0", + "opentelemetry-semantic-conventions>=0.41b0", +] all = [ - "bmasterai[dev,integrations]" + "bmasterai[dev,integrations,otlp]" ] [project.urls] diff --git a/src/bmasterai/__init__.py b/src/bmasterai/__init__.py index ee900c3..d60c943 100644 --- a/src/bmasterai/__init__.py +++ b/src/bmasterai/__init__.py @@ -16,6 +16,7 @@ from .reasoning_logger import ( ReasoningSession, ChainOfThought, with_reasoning_logging, log_reasoning ) +from .otlp import configure_otlp __all__ = [ "configure_logging", @@ -28,5 +29,6 @@ "ReasoningSession", "ChainOfThought", "with_reasoning_logging", - "log_reasoning" + "log_reasoning", + "configure_otlp", ] \ No newline at end of file diff --git a/src/bmasterai/monitoring.py b/src/bmasterai/monitoring.py index b87f20c..e7d8164 100644 --- a/src/bmasterai/monitoring.py +++ b/src/bmasterai/monitoring.py @@ -9,6 +9,12 @@ from collections import defaultdict, deque import statistics +# Optional OTLP export — no-op if not configured or opentelemetry-sdk not installed +try: + from bmasterai import otlp as _otlp +except ImportError: # pragma: no cover + _otlp = None # type: ignore + @dataclass class MetricPoint: timestamp: datetime @@ -231,10 +237,13 @@ def track_agent_start(self, agent_id: str): self.agent_metrics[agent_id]['start_time'] = datetime.now(timezone.utc) self.agent_metrics[agent_id]['status'] = 'running' self.metrics_collector.record_custom_metric('agents_active', 1, {'agent_id': agent_id}) + if _otlp: + _otlp.on_agent_start(agent_id) def track_agent_stop(self, agent_id: str): if agent_id in self.agent_metrics: start_time = self.agent_metrics[agent_id].get('start_time') + runtime = None if start_time: runtime = (datetime.now(timezone.utc) - start_time).total_seconds() self.agent_metrics[agent_id]['total_runtime'] = runtime @@ -242,6 +251,8 @@ def track_agent_stop(self, agent_id: str): self.agent_metrics[agent_id]['status'] = 'stopped' 
self.agent_metrics[agent_id]['stop_time'] = datetime.now(timezone.utc) + if _otlp: + _otlp.on_agent_stop(agent_id, runtime_seconds=runtime) def track_task_duration(self, agent_id: str, task_name: str, duration_ms: float): self.task_timings[f"{agent_id}:{task_name}"].append(duration_ms) @@ -250,6 +261,8 @@ def track_task_duration(self, agent_id: str, task_name: str, duration_ms: float) duration_ms, {'agent_id': agent_id, 'task_name': task_name} ) + if _otlp: + _otlp.on_task_duration(agent_id, task_name, duration_ms) def track_error(self, agent_id: str, error_type: str = 'general'): key = f"{agent_id}:{error_type}" @@ -259,6 +272,8 @@ def track_error(self, agent_id: str, error_type: str = 'general'): 1, {'agent_id': agent_id, 'error_type': error_type} ) + if _otlp: + _otlp.on_error(agent_id, error_type) def track_llm_call(self, agent_id: str, model: str, tokens_used: int, duration_ms: float, reasoning_steps: Optional[int] = None, thinking_depth: Optional[int] = None): @@ -287,6 +302,13 @@ def track_llm_call(self, agent_id: str, model: str, tokens_used: int, duration_m thinking_depth, {'agent_id': agent_id, 'model': model} ) + + if _otlp: + _otlp.on_llm_call( + agent_id, model, tokens_used, duration_ms, + reasoning_steps=reasoning_steps, + thinking_depth=thinking_depth, + ) def track_reasoning_session(self, agent_id: str, session_id: str, total_steps: int, duration_ms: float, @@ -353,6 +375,8 @@ def get_system_health(self) -> Dict[str, Any]: def record_custom_metric(self, name: str, value: float, labels: Optional[Dict[str, str]] = None): """Record a custom metric""" self.metrics_collector.record_custom_metric(name, value, labels) + if _otlp: + _otlp.on_custom_metric(name, value, labels) def get_metric_stats(self, metric_name: str, duration_minutes: int = 60) -> Dict[str, float]: """Get statistics for a specific metric""" diff --git a/src/bmasterai/otlp.py b/src/bmasterai/otlp.py new file mode 100644 index 0000000..9ebae5a --- /dev/null +++ b/src/bmasterai/otlp.py @@ 
-0,0 +1,323 @@ +""" +BMasterAI — Native OTLP (OpenTelemetry Protocol) Export Layer + +Wraps bmasterai agent events into OTel traces and metrics, then exports via OTLP +to any compatible backend: Grafana Tempo, Jaeger, Honeycomb, Datadog, New Relic, +Prometheus (via OTLP bridge), or a local collector. + +Usage: + from bmasterai.otlp import configure_otlp + + # gRPC (default OTLP) + configure_otlp(endpoint="http://localhost:4317") + + # HTTP/protobuf + configure_otlp(endpoint="http://localhost:4318", use_http=True) + + # With auth headers (Honeycomb, Grafana Cloud, etc.) + configure_otlp( + endpoint="https://api.honeycomb.io", + headers={"x-honeycomb-team": "YOUR_API_KEY"}, + service_name="my-ai-agent", + ) + +After configure_otlp() is called, all bmasterai monitor calls automatically emit spans: + - agent_start / agent_stop → root span per agent lifecycle + - track_llm_call → child span with token/latency attributes + - track_task_duration → child span per task + - track_error → span event with error details + - record_custom_metric → OTel counter/histogram via metrics API + +Requires: + pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc + # or for HTTP: + pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http +""" + +from __future__ import annotations + +import logging +from typing import Dict, Optional, Any + +logger = logging.getLogger(__name__) + +# ── Availability check ───────────────────────────────────────────────────────── + +try: + from opentelemetry import trace, metrics as otel_metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + from opentelemetry.semconv.resource import ResourceAttributes + _OTEL_AVAILABLE = True +except ImportError: + _OTEL_AVAILABLE = False + 
+_otlp_configured = False +_tracer: Optional[Any] = None +_meter: Optional[Any] = None + +# Active spans keyed by agent_id for lifecycle tracking +_agent_spans: Dict[str, Any] = {} + +# Metric instruments (created once) +_instruments: Dict[str, Any] = {} + + +def is_available() -> bool: + """Return True if opentelemetry-sdk is installed.""" + return _OTEL_AVAILABLE + + +def configure_otlp( + endpoint: str = "http://localhost:4317", + service_name: str = "bmasterai", + service_version: str = "0.2.3", + use_http: bool = False, + headers: Optional[Dict[str, str]] = None, + export_interval_ms: int = 5000, + insecure: bool = True, +) -> bool: + """ + Configure native OTLP export for all bmasterai monitor calls. + + Parameters + ---------- + endpoint : str + OTLP collector endpoint. + gRPC default: ``http://localhost:4317`` + HTTP default: ``http://localhost:4318`` + service_name : str + ``service.name`` resource attribute sent with every span/metric. + service_version : str + ``service.version`` resource attribute. + use_http : bool + Use HTTP/protobuf transport instead of gRPC. + Requires ``opentelemetry-exporter-otlp-proto-http``. + headers : dict, optional + Extra headers for authenticated SaaS backends (Honeycomb, Grafana Cloud, etc.). + export_interval_ms : int + How often to flush metrics (milliseconds). Default 5 000. + insecure : bool + Allow plaintext gRPC (no TLS). Set False for production TLS endpoints. + + Returns + ------- + bool + True if configuration succeeded, False if opentelemetry-sdk is missing. + """ + global _otlp_configured, _tracer, _meter, _instruments + + if not _OTEL_AVAILABLE: + logger.warning( + "opentelemetry-sdk not installed. 
" + "Run: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc" + ) + return False + + resource = Resource.create({ + ResourceAttributes.SERVICE_NAME: service_name, + ResourceAttributes.SERVICE_VERSION: service_version, + "bmasterai.version": service_version, + }) + + # ── Tracer provider ──────────────────────────────────────────────────────── + if use_http: + try: + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + except ImportError: + logger.error( + "HTTP exporter not available. " + "Run: pip install opentelemetry-exporter-otlp-proto-http" + ) + return False + span_exporter = OTLPSpanExporter( + endpoint=f"{endpoint.rstrip('/')}/v1/traces", + headers=headers or {}, + ) + else: + try: + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + except ImportError: + logger.error( + "gRPC exporter not available. " + "Run: pip install opentelemetry-exporter-otlp-proto-grpc" + ) + return False + span_exporter = OTLPSpanExporter( + endpoint=endpoint, + headers=headers or {}, + insecure=insecure, + ) + + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter)) + trace.set_tracer_provider(tracer_provider) + _tracer = trace.get_tracer("bmasterai", service_version) + + # ── Meter provider ───────────────────────────────────────────────────────── + try: + if use_http: + from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter + metric_exporter = OTLPMetricExporter( + endpoint=f"{endpoint.rstrip('/')}/v1/metrics", + headers=headers or {}, + ) + else: + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter + metric_exporter = OTLPMetricExporter( + endpoint=endpoint, + headers=headers or {}, + insecure=insecure, + ) + + reader = PeriodicExportingMetricReader( + metric_exporter, + export_interval_millis=export_interval_ms, + ) + meter_provider = 
MeterProvider(resource=resource, metric_readers=[reader]) + otel_metrics.set_meter_provider(meter_provider) + _meter = otel_metrics.get_meter("bmasterai", service_version) + + # Pre-create instruments + _instruments["llm_tokens"] = _meter.create_counter( + "bmasterai.llm.tokens_used", + unit="tokens", + description="Total LLM tokens consumed per agent/model", + ) + _instruments["llm_duration"] = _meter.create_histogram( + "bmasterai.llm.call_duration", + unit="ms", + description="LLM call latency in milliseconds", + ) + _instruments["task_duration"] = _meter.create_histogram( + "bmasterai.task.duration", + unit="ms", + description="Agent task execution duration in milliseconds", + ) + _instruments["agent_errors"] = _meter.create_counter( + "bmasterai.agent.errors", + unit="1", + description="Total agent errors by type", + ) + _instruments["custom_counter"] = _meter.create_counter( + "bmasterai.custom.metric", + description="Custom metrics emitted via record_custom_metric", + ) + except Exception as exc: + logger.warning("Metrics OTLP setup failed (traces still active): %s", exc) + _meter = None + + _otlp_configured = True + logger.info("BMasterAI OTLP configured → %s (transport=%s)", endpoint, "http" if use_http else "grpc") + return True + + +# ── Instrumentation hooks (called by AgentMonitor) ──────────────────────────── + +def on_agent_start(agent_id: str, attributes: Optional[Dict[str, Any]] = None): + """Open a root span for an agent lifecycle. Called by AgentMonitor.track_agent_start.""" + if not _otlp_configured or _tracer is None: + return + span = _tracer.start_span( + f"agent.{agent_id}", + attributes={ + "bmasterai.agent_id": agent_id, + **(attributes or {}), + }, + ) + _agent_spans[agent_id] = span + + +def on_agent_stop(agent_id: str, runtime_seconds: Optional[float] = None): + """Close the root span for an agent. 
Called by AgentMonitor.track_agent_stop.""" + if not _otlp_configured: + return + span = _agent_spans.pop(agent_id, None) + if span is not None: + if runtime_seconds is not None: + span.set_attribute("bmasterai.runtime_seconds", runtime_seconds) + span.end() + + +def on_llm_call( + agent_id: str, + model: str, + tokens_used: int, + duration_ms: float, + reasoning_steps: Optional[int] = None, + thinking_depth: Optional[int] = None, +): + """Emit a child span + metrics for an LLM call.""" + if not _otlp_configured: + return + + attrs = { + "bmasterai.agent_id": agent_id, + "bmasterai.model": model, + "bmasterai.tokens_used": tokens_used, + "bmasterai.duration_ms": duration_ms, + } + if reasoning_steps is not None: + attrs["bmasterai.reasoning_steps"] = reasoning_steps + if thinking_depth is not None: + attrs["bmasterai.thinking_depth"] = thinking_depth + + if _tracer: + parent_span = _agent_spans.get(agent_id) + ctx = trace.set_span_in_context(parent_span) if parent_span else None + with _tracer.start_as_current_span("llm.call", context=ctx, attributes=attrs): + pass # span closes immediately — duration captured as attribute + + if _meter and _instruments: + label = {"agent_id": agent_id, "model": model} + _instruments["llm_tokens"].add(tokens_used, label) + _instruments["llm_duration"].record(duration_ms, label) + + +def on_task_duration(agent_id: str, task_name: str, duration_ms: float): + """Emit a child span + histogram for a task.""" + if not _otlp_configured: + return + + attrs = { + "bmasterai.agent_id": agent_id, + "bmasterai.task_name": task_name, + "bmasterai.duration_ms": duration_ms, + } + + if _tracer: + parent_span = _agent_spans.get(agent_id) + ctx = trace.set_span_in_context(parent_span) if parent_span else None + with _tracer.start_as_current_span(f"task.{task_name}", context=ctx, attributes=attrs): + pass + + if _meter and _instruments: + _instruments["task_duration"].record( + duration_ms, {"agent_id": agent_id, "task_name": task_name} + ) + + +def 
on_error(agent_id: str, error_type: str): + """Increment error counter and add event to active agent span.""" + if not _otlp_configured: + return + + if _meter and _instruments: + _instruments["agent_errors"].add(1, {"agent_id": agent_id, "error_type": error_type}) + + span = _agent_spans.get(agent_id) + if span: + span.add_event("error", {"bmasterai.error_type": error_type}) + + +def on_custom_metric(name: str, value: float, labels: Optional[Dict[str, str]] = None): + """Forward a custom metric to OTel as a counter increment.""" + if not _otlp_configured or _meter is None or not _instruments: + return + try: + _instruments["custom_counter"].add(value, {**(labels or {}), "metric_name": name}) + except Exception: + pass