Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- All metrics include `module` and `topic` labels
- Health check endpoints (`/health`, `/healthz`) for Kubernetes probes
- Metrics: read/write counts, backlog, pending seconds, rates
- **buswatch-sdk**: Tracing integration for structured logging (`tracing` feature)
- TRACE level logs for record_read/record_write operations
- DEBUG level logs for module registration and snapshot collection
- INFO/WARN level logs for background emission lifecycle and errors
- Zero overhead when feature is disabled

## [0.1.0] - 2025-12-21

Expand Down
4 changes: 4 additions & 0 deletions buswatch-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ default = ["tokio"]
tokio = ["dep:tokio"]
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "tokio"]
prometheus = ["tokio", "dep:hyper", "dep:hyper-util", "dep:http-body-util"]
tracing = ["dep:tracing"]

[dependencies]
buswatch-types = { path = "../buswatch-types", features = ["serde"] }
Expand All @@ -36,5 +37,8 @@ hyper = { version = "1", features = ["server", "http1"], optional = true }
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
http-body-util = { version = "0.1", optional = true }

# Tracing (optional, for structured logging)
tracing = { version = "0.1", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
35 changes: 35 additions & 0 deletions buswatch-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ let instrumentor = Instrumentor::builder()
| `tokio` | Async runtime support (enabled by default) |
| `otel` | OpenTelemetry OTLP export |
| `prometheus` | Prometheus metrics endpoint |
| `tracing` | Structured logging via tracing crate |

### OpenTelemetry Integration

Expand Down Expand Up @@ -218,6 +219,40 @@ instrumentor.start();
// Metrics now available at http://localhost:9090/metrics
```

### Tracing Integration

Enable the `tracing` feature for structured logging:

```toml
[dependencies]
buswatch-sdk = { version = "0.1", features = ["tracing"] }
```

```rust
use tracing_subscriber;

fn main() {
// Initialize a tracing subscriber
tracing_subscriber::fmt::init();

// Now buswatch operations emit structured logs
let instrumentor = buswatch_sdk::Instrumentor::new();
let handle = instrumentor.register("my-service"); // DEBUG log
handle.record_read("events", 10); // TRACE log
}
```

**Log levels:**
- `TRACE`: Individual `record_read`/`record_write` calls (high volume)
- `DEBUG`: Module registration, snapshot collection
- `INFO`: Background emission start/stop
- `WARN`: Emission failures

Control verbosity with the `RUST_LOG` environment variable:
```bash
RUST_LOG=buswatch_sdk=debug cargo run
```

## Thread Safety

The SDK is designed for concurrent use:
Expand Down
6 changes: 6 additions & 0 deletions buswatch-sdk/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ impl ModuleHandle {
/// * `topic` - The topic name
/// * `count` - Number of messages read
pub fn record_read(&self, topic: &str, count: u64) {
#[cfg(feature = "tracing")]
tracing::trace!(module = %self.name, topic = topic, count = count, "record_read");

let read_state = self.state.get_or_create_read(topic);
read_state.count.fetch_add(count, Ordering::Relaxed);
}
Expand All @@ -56,6 +59,9 @@ impl ModuleHandle {
/// * `topic` - The topic name
/// * `count` - Number of messages written
pub fn record_write(&self, topic: &str, count: u64) {
#[cfg(feature = "tracing")]
tracing::trace!(module = %self.name, topic = topic, count = count, "record_write");

let write_state = self.state.get_or_create_write(topic);
write_state.count.fetch_add(count, Ordering::Relaxed);

Expand Down
21 changes: 20 additions & 1 deletion buswatch-sdk/src/instrumentor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ impl Instrumentor {
///
/// * `name` - The module name (e.g., "order-processor", "notification-sender")
pub fn register(&self, name: &str) -> ModuleHandle {
#[cfg(feature = "tracing")]
tracing::debug!(module = name, "Registering module");

let module_state = self.state.register_module(name);
ModuleHandle {
state: module_state,
Expand All @@ -84,6 +87,9 @@ impl Instrumentor {
/// This is useful if you want to emit snapshots manually rather than
/// using the background emission.
pub fn collect(&self) -> buswatch_types::Snapshot {
#[cfg(feature = "tracing")]
let _span = tracing::debug_span!("collect_snapshot").entered();

self.state.collect()
}

Expand Down Expand Up @@ -113,19 +119,32 @@ impl Instrumentor {
}

tokio::spawn(async move {
#[cfg(feature = "tracing")]
tracing::info!("Background emission started");

let mut interval_timer = tokio::time::interval(interval);
let mut stop_rx = stop_rx;

loop {
tokio::select! {
_ = interval_timer.tick() => {
let snapshot = state.collect();

#[cfg(feature = "tracing")]
tracing::debug!(modules = snapshot.modules.len(), "Emitting snapshot");

for output in outputs.iter() {
let _ = output.emit(&snapshot).await;
if let Err(e) = output.emit(&snapshot).await {
#[cfg(feature = "tracing")]
tracing::warn!(error = %e, "Failed to emit snapshot");
let _ = e; // suppress unused warning when tracing disabled
}
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
#[cfg(feature = "tracing")]
tracing::info!("Background emission stopped");
break;
}
}
Expand Down