115 changes: 115 additions & 0 deletions examples/otlp-export/README.md
@@ -0,0 +1,115 @@
# BMasterAI — Native OTLP Export

Send all bmasterai agent traces and metrics to any OpenTelemetry-compatible backend.

## Supported backends

| Backend | Transport | Notes |
|---|---|---|
| **Grafana Tempo** | gRPC or HTTP | Pairs with Grafana dashboards |
| **Jaeger** | gRPC | Open source distributed tracing |
| **Honeycomb** | HTTP | SaaS; requires API key header |
| **Datadog** | gRPC/HTTP | Via Datadog Agent OTLP ingestion |
| **New Relic** | HTTP | Requires license key header |
| **Prometheus** | HTTP | Via OTLP bridge receiver |
| **Local collector** | gRPC | `otel-collector` → any backend |

## Install

```bash
# gRPC (default, recommended)
pip install "bmasterai[otlp]"

# HTTP/protobuf (Honeycomb, New Relic, some SaaS)
pip install "bmasterai[otlp-http]"
```

## Usage

```python
from bmasterai import configure_otlp, get_monitor, configure_logging

# 1. Configure OTLP first (before any monitor calls)
configure_otlp(
    endpoint="http://localhost:4317",  # your collector endpoint
    service_name="my-ai-agent",
)

# 2. Use bmasterai normally — OTLP export happens automatically
configure_logging()
monitor = get_monitor()
monitor.start_monitoring()

monitor.track_agent_start("researcher")
monitor.track_llm_call("researcher", "claude-3-5-sonnet", tokens_used=1200, duration_ms=1840)
monitor.track_task_duration("researcher", "web_search", 620)
monitor.track_agent_stop("researcher")
```
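If you prefer to drive configuration through the standard OTel environment variables (`OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_EXPORTER_OTLP_HEADERS`, `OTEL_SERVICE_NAME`), a small adapter can translate them into `configure_otlp` keyword arguments. The wrapper below is a sketch, not a bmasterai API:

```python
import os

def otlp_kwargs_from_env(default_endpoint: str = "http://localhost:4317") -> dict:
    """Translate standard OTEL_* env vars into configure_otlp kwargs."""
    headers = {}
    # Spec format for the headers variable: "key1=value1,key2=value2"
    raw = os.getenv("OTEL_EXPORTER_OTLP_HEADERS", "")
    for pair in filter(None, (p.strip() for p in raw.split(","))):
        key, _, value = pair.partition("=")
        headers[key.strip()] = value.strip()
    return {
        "endpoint": os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", default_endpoint),
        "service_name": os.getenv("OTEL_SERVICE_NAME", "my-ai-agent"),
        "headers": headers,
    }

# Usage: configure_otlp(**otlp_kwargs_from_env())
```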

## What gets exported

### Traces (spans)
| Span | Trigger | Attributes |
|---|---|---|
| `agent.<agent_id>` | `track_agent_start` / `track_agent_stop` | `bmasterai.agent_id`, `bmasterai.runtime_seconds` |
| `llm.call` | `track_llm_call` | `bmasterai.model`, `bmasterai.tokens_used`, `bmasterai.duration_ms`, `bmasterai.reasoning_steps` |
| `task.<task_name>` | `track_task_duration` | `bmasterai.agent_id`, `bmasterai.task_name`, `bmasterai.duration_ms` |

### Metrics
| Metric | Type | Labels |
|---|---|---|
| `bmasterai.llm.tokens_used` | Counter | `agent_id`, `model` |
| `bmasterai.llm.call_duration` | Histogram | `agent_id`, `model` |
| `bmasterai.task.duration` | Histogram | `agent_id`, `task_name` |
| `bmasterai.agent.errors` | Counter | `agent_id`, `error_type` |
| `bmasterai.custom.metric` | Counter | `metric_name` + any custom labels |
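The Counter/Histogram column above maps to two aggregation behaviors: counters only accumulate a running total per label set, while histograms keep the distribution of observed values. As a plain-Python illustration (a toy model, not the bmasterai or OpenTelemetry implementation):

```python
from collections import defaultdict

class MetricStore:
    """Toy model of the exported metrics: counters accumulate a running
    total per label set; histograms record every observation."""
    def __init__(self):
        self.counters = defaultdict(float)
        self.histograms = defaultdict(list)

    @staticmethod
    def _key(name, labels):
        # One time series per (name, sorted label set)
        return (name, tuple(sorted((labels or {}).items())))

    def add(self, name, value, labels=None):      # Counter semantics
        self.counters[self._key(name, labels)] += value

    def record(self, name, value, labels=None):   # Histogram semantics
        self.histograms[self._key(name, labels)].append(value)

store = MetricStore()
store.add("bmasterai.llm.tokens_used", 1200,
          {"agent_id": "researcher", "model": "claude-3-5-sonnet"})
store.add("bmasterai.llm.tokens_used", 800,
          {"agent_id": "researcher", "model": "claude-3-5-sonnet"})
store.record("bmasterai.task.duration", 620,
             {"agent_id": "researcher", "task_name": "web_search"})
```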

## Examples

### Local Jaeger (docker)
```bash
docker run -d --name jaeger \
  -p 4317:4317 -p 16686:16686 \
  jaegertracing/all-in-one:latest
```
```python
configure_otlp(endpoint="http://localhost:4317", service_name="my-agent")
```
Open http://localhost:16686 to see traces.
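Before pointing the agent at the collector, it can be handy to confirm the gRPC port is actually listening. The helper below is a plain TCP probe in the stdlib, not part of bmasterai:

```python
import socket

def collector_reachable(endpoint: str = "localhost:4317", timeout: float = 1.0) -> bool:
    """Plain TCP probe of the collector port; tolerates an http:// prefix."""
    host, _, port = endpoint.rpartition(":")
    host = host.split("//")[-1] or "localhost"
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):
        return False
```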

### Grafana Cloud
```python
configure_otlp(
    endpoint="https://otlp-gateway-prod-us-central-0.grafana.net/otlp",
    use_http=True,
    headers={
        "Authorization": "Basic <base64(instanceID:apikey)>",
    },
    service_name="my-agent",
    insecure=False,
)
```
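The `Authorization` value above is the base64 encoding of `instanceID:apikey`. A small stdlib helper (a hypothetical name, not a bmasterai API) builds it:

```python
import base64

def grafana_basic_auth(instance_id: str, api_key: str) -> str:
    """Return the 'Basic <base64(instanceID:apikey)>' header value."""
    token = base64.b64encode(f"{instance_id}:{api_key}".encode()).decode()
    return f"Basic {token}"

# headers={"Authorization": grafana_basic_auth(instance_id, api_key)}
```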

### Honeycomb
```python
configure_otlp(
    endpoint="https://api.honeycomb.io",
    use_http=True,
    headers={"x-honeycomb-team": "YOUR_API_KEY"},
    service_name="my-agent",
)
```

### Datadog
```python
configure_otlp(
    endpoint="http://localhost:4317",  # Datadog Agent running locally
    service_name="my-agent",
)
```

## Run the example
```bash
pip install "bmasterai[otlp]"
python agent_with_otlp.py
```
142 changes: 142 additions & 0 deletions examples/otlp-export/agent_with_otlp.py
@@ -0,0 +1,142 @@
"""
BMasterAI — Native OTLP Export Example

Demonstrates automatic trace + metric export to any OTel-compatible backend.
All you need is configure_otlp() before your first monitor call.

Run with a local Jaeger instance:
    docker run -d --name jaeger -p 4317:4317 -p 16686:16686 jaegertracing/all-in-one:latest
    python agent_with_otlp.py
    open http://localhost:16686
"""

import time
import os
from bmasterai import configure_otlp, get_monitor, configure_logging, LogLevel

# ── 1. Configure OTLP ─────────────────────────────────────────────────────────
# All subsequent monitor calls will automatically emit spans + metrics.
otlp_endpoint = os.getenv("OTLP_ENDPOINT", "http://localhost:4317")
otlp_headers = {}
if os.getenv("OTLP_API_KEY"):
    otlp_headers["x-api-key"] = os.getenv("OTLP_API_KEY")

success = configure_otlp(
    endpoint=otlp_endpoint,
    service_name="bmasterai-example",
    service_version="0.2.3",
    headers=otlp_headers,
)

if success:
    print(f"✅ OTLP configured → {otlp_endpoint}")
else:
    print("⚠️ OTLP not available (opentelemetry-sdk not installed). Continuing without export.")
    print("   Install with: pip install 'bmasterai[otlp]'")

# ── 2. Normal bmasterai setup ─────────────────────────────────────────────────
configure_logging(log_level=LogLevel.INFO)
monitor = get_monitor()
monitor.start_monitoring()


# ── 3. Simulate a multi-step research agent ───────────────────────────────────
def run_research_agent():
    agent_id = "research-agent-01"
    print(f"\n🤖 Starting agent: {agent_id}")

    monitor.track_agent_start(agent_id)
    # → OTLP: opens root span "agent.research-agent-01"

    # Step 1: Web search task
    print("  🔎 Running web search...")
    time.sleep(0.3)
    monitor.track_task_duration(agent_id, "web_search", 312)
    # → OTLP: child span "task.web_search" with duration_ms=312

    # Step 2: LLM analysis call
    print("  🧠 Calling LLM for analysis...")
    time.sleep(0.5)
    monitor.track_llm_call(
        agent_id=agent_id,
        model="claude-3-5-sonnet-20241022",
        tokens_used=1847,
        duration_ms=1203,
        reasoning_steps=4,
    )
    # → OTLP: child span "llm.call" + increments bmasterai.llm.tokens_used counter

    # Step 3: Synthesis task
    print("  📝 Synthesizing results...")
    time.sleep(0.2)
    monitor.track_task_duration(agent_id, "synthesize", 198)
    # → OTLP: child span "task.synthesize"

    # Step 4: Simulate an error
    print("  ⚠️ Simulating transient error...")
    monitor.track_error(agent_id, "rate_limit")
    # → OTLP: increments bmasterai.agent.errors counter, adds event to root span

    # Retry
    print("  🔁 Retrying after backoff...")
    time.sleep(0.1)
    monitor.track_llm_call(
        agent_id=agent_id,
        model="claude-3-5-haiku-20241022",
        tokens_used=924,
        duration_ms=620,
    )

    # Custom metric
    monitor.record_custom_metric("documents_processed", 12, {"agent_id": agent_id})
    # → OTLP: increments bmasterai.custom.metric{metric_name=documents_processed}

    monitor.track_agent_stop(agent_id)
    # → OTLP: closes root span with runtime_seconds

    print(f"  ✅ Agent {agent_id} complete\n")


def run_pipeline():
    """Run multiple agents to show multi-agent tracing."""
    agents = [
        ("planner-agent", "claude-3-5-haiku-20241022", 450, 380),
        ("research-agent", "claude-3-5-sonnet-20241022", 2100, 1840),
        ("writer-agent", "claude-3-5-sonnet-20241022", 3200, 2100),
    ]

    print("\n🏭 Running multi-agent pipeline...")
    for agent_id, model, tokens, latency in agents:
        monitor.track_agent_start(agent_id)
        time.sleep(0.1)
        monitor.track_llm_call(agent_id, model, tokens, latency)
        monitor.track_task_duration(agent_id, "main_task", latency * 1.2)
        monitor.track_agent_stop(agent_id)
        print(f"  ✅ {agent_id} ({model}) — {tokens} tokens in {latency}ms")


if __name__ == "__main__":
    run_research_agent()
    run_pipeline()

    # Dashboard check
    dashboard = monitor.get_agent_dashboard("research-agent-01")
    health = monitor.get_system_health()

    print("=" * 50)
    print("📊 BMasterAI Dashboard (research-agent-01)")
    print(f"  Status: {dashboard['status']}")
    print(f"  Tasks: {list(dashboard['performance'].keys())}")
    print(f"  Errors: {dashboard['metrics'].get('total_errors', 0)}")
    print()
    print(f"🖥️ System: CPU {health['system_metrics']['cpu'].get('latest', 0):.1f}% | "
          f"Mem {health['system_metrics']['memory'].get('latest', 0):.1f}%")
    print(f"🤖 Agents: {health['active_agents']} active / {health['total_agents']} total")

    if success:
        print("\n🔭 View traces at your OTel backend")
        print("   (Jaeger: http://localhost:16686 → search 'bmasterai-example')")

    # Give batch exporter time to flush
    time.sleep(2)
    monitor.stop_monitoring()
14 changes: 13 additions & 1 deletion pyproject.toml
@@ -47,8 +47,20 @@ integrations = [
"redis>=4.0.0",
"celery>=5.2.0",
]
otlp = [
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp-proto-grpc>=1.20.0",
"opentelemetry-semantic-conventions>=0.41b0",
]
otlp-http = [
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp-proto-http>=1.20.0",
"opentelemetry-semantic-conventions>=0.41b0",
]
all = [
"bmasterai[dev,integrations]"
"bmasterai[dev,integrations,otlp]"
]

[project.urls]
4 changes: 3 additions & 1 deletion src/bmasterai/__init__.py
@@ -16,6 +16,7 @@
from .reasoning_logger import (
    ReasoningSession, ChainOfThought, with_reasoning_logging, log_reasoning
)
from .otlp import configure_otlp

__all__ = [
"configure_logging",
@@ -28,5 +29,6 @@
"ReasoningSession",
"ChainOfThought",
"with_reasoning_logging",
"log_reasoning"
"log_reasoning",
"configure_otlp",
]
24 changes: 24 additions & 0 deletions src/bmasterai/monitoring.py
@@ -9,6 +9,12 @@
from collections import defaultdict, deque
import statistics

# Optional OTLP export — no-op if not configured or opentelemetry-sdk not installed
try:
    from bmasterai import otlp as _otlp
except ImportError:  # pragma: no cover
    _otlp = None  # type: ignore

@dataclass
class MetricPoint:
    timestamp: datetime
@@ -231,17 +237,22 @@ def track_agent_start(self, agent_id: str):
        self.agent_metrics[agent_id]['start_time'] = datetime.now(timezone.utc)
        self.agent_metrics[agent_id]['status'] = 'running'
        self.metrics_collector.record_custom_metric('agents_active', 1, {'agent_id': agent_id})
        if _otlp:
            _otlp.on_agent_start(agent_id)

    def track_agent_stop(self, agent_id: str):
        if agent_id in self.agent_metrics:
            start_time = self.agent_metrics[agent_id].get('start_time')
            runtime = None
            if start_time:
                runtime = (datetime.now(timezone.utc) - start_time).total_seconds()
                self.agent_metrics[agent_id]['total_runtime'] = runtime
                self.metrics_collector.record_custom_metric('agent_runtime_seconds', runtime, {'agent_id': agent_id})

            self.agent_metrics[agent_id]['status'] = 'stopped'
            self.agent_metrics[agent_id]['stop_time'] = datetime.now(timezone.utc)
            if _otlp:
                _otlp.on_agent_stop(agent_id, runtime_seconds=runtime)

    def track_task_duration(self, agent_id: str, task_name: str, duration_ms: float):
        self.task_timings[f"{agent_id}:{task_name}"].append(duration_ms)
@@ -250,6 +261,8 @@ def track_task_duration(self, agent_id: str, task_name: str, duration_ms: float)
            duration_ms,
            {'agent_id': agent_id, 'task_name': task_name}
        )
        if _otlp:
            _otlp.on_task_duration(agent_id, task_name, duration_ms)

    def track_error(self, agent_id: str, error_type: str = 'general'):
        key = f"{agent_id}:{error_type}"
@@ -259,6 +272,8 @@ def track_error(self, agent_id: str, error_type: str = 'general'):
            1,
            {'agent_id': agent_id, 'error_type': error_type}
        )
        if _otlp:
            _otlp.on_error(agent_id, error_type)

    def track_llm_call(self, agent_id: str, model: str, tokens_used: int, duration_ms: float,
                       reasoning_steps: Optional[int] = None, thinking_depth: Optional[int] = None):
@@ -287,6 +302,13 @@ def track_llm_call(self, agent_id: str, model: str, tokens_used: int, duration_m
                thinking_depth,
                {'agent_id': agent_id, 'model': model}
            )

        if _otlp:
            _otlp.on_llm_call(
                agent_id, model, tokens_used, duration_ms,
                reasoning_steps=reasoning_steps,
                thinking_depth=thinking_depth,
            )

    def track_reasoning_session(self, agent_id: str, session_id: str,
                                total_steps: int, duration_ms: float,
@@ -353,6 +375,8 @@ def get_system_health(self) -> Dict[str, Any]:
    def record_custom_metric(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a custom metric"""
        self.metrics_collector.record_custom_metric(name, value, labels)
        if _otlp:
            _otlp.on_custom_metric(name, value, labels)

    def get_metric_stats(self, metric_name: str, duration_minutes: int = 60) -> Dict[str, float]:
        """Get statistics for a specific metric"""