Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- **buswatch-sdk**: Prometheus exposition format export (`prometheus` feature)
- HTTP server serving metrics at configurable endpoint
- All metrics include `module` and `topic` labels
- Health check endpoints (`/health`, `/healthz`) for Kubernetes probes
- Metrics: read/write counts, backlog, pending seconds, rates

## [0.1.0] - 2025-12-21

### Added
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ See [buswatch-types](/buswatch-types) for the full schema and serialization opti
- **Multiple data sources**: files, TCP streams, message bus subscriptions
- **Instrumentation SDK** for Rust applications
- **Adapters** for RabbitMQ, Kafka, and NATS
- **Prometheus** metrics endpoint for scraping
- **OpenTelemetry** export support
- **JSON and CBOR** serialization

Expand Down
6 changes: 6 additions & 0 deletions buswatch-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ categories = ["development-tools::debugging", "development-tools::profiling"]
default = ["tokio"]
tokio = ["dep:tokio"]
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "tokio"]
prometheus = ["tokio", "dep:hyper", "dep:hyper-util", "dep:http-body-util"]

[dependencies]
buswatch-types = { path = "../buswatch-types", features = ["serde"] }
Expand All @@ -30,5 +31,10 @@ opentelemetry = { version = "0.31", optional = true }
opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.31", features = ["metrics", "http-proto", "reqwest-blocking-client"], optional = true }

# Prometheus HTTP server (optional)
hyper = { version = "1", features = ["server", "http1"], optional = true }
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
http-body-util = { version = "0.1", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }
53 changes: 53 additions & 0 deletions buswatch-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,33 @@ use buswatch_sdk::Output;
let output = Output::otlp("http://localhost:4317");
```

### Prometheus

Serves metrics in Prometheus exposition format via HTTP (requires `prometheus` feature):

```rust
use buswatch_sdk::{Output, prometheus::PrometheusConfig};

let config = PrometheusConfig::builder()
.listen_addr("0.0.0.0:9090")
.metrics_path("/metrics")
.namespace("myapp") // optional prefix for all metrics
.build();

let output = Output::prometheus(config);
```

This starts an HTTP server that Prometheus can scrape. Metrics available:
- `buswatch_read_count` - Total messages read (counter)
- `buswatch_write_count` - Total messages written (counter)
- `buswatch_read_backlog` - Unread message count (gauge)
- `buswatch_read_pending_seconds` - Read wait time (gauge)
- `buswatch_write_pending_seconds` - Write wait time (gauge)
- `buswatch_read_rate_per_second` - Read throughput (gauge)
- `buswatch_write_rate_per_second` - Write throughput (gauge)

Health check endpoints (`/health`, `/healthz`) are also available for Kubernetes probes.

## Recording Metrics

### Basic Counting
Expand Down Expand Up @@ -145,6 +172,7 @@ let instrumentor = Instrumentor::builder()
|---------|-------------|
| `tokio` | Async runtime support (enabled by default) |
| `otel` | OpenTelemetry OTLP export |
| `prometheus` | Prometheus metrics endpoint |

### OpenTelemetry Integration

Expand All @@ -165,6 +193,31 @@ let instrumentor = Instrumentor::builder()

This allows buswatch metrics to flow into Grafana, Datadog, or any OTLP-compatible backend.

### Prometheus Integration

Enable the `prometheus` feature for Prometheus scraping:

```toml
[dependencies]
buswatch-sdk = { version = "0.1", features = ["prometheus"] }
```

```rust
use buswatch_sdk::{Instrumentor, Output, prometheus::PrometheusConfig};

let config = PrometheusConfig::builder()
.listen_addr("0.0.0.0:9090")
.metrics_path("/metrics")
.build();

let instrumentor = Instrumentor::builder()
.output(Output::prometheus(config))
.build();

instrumentor.start();
// Metrics now available at http://localhost:9090/metrics
```

## Thread Safety

The SDK is designed for concurrent use:
Expand Down
10 changes: 10 additions & 0 deletions buswatch-sdk/src/instrumentor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ impl Instrumentor {
/// This spawns a tokio task that periodically collects and emits
/// snapshots to all configured outputs.
///
/// For Prometheus outputs, this also starts the HTTP server to serve metrics.
///
/// Returns a handle that can be used to stop the emission.
#[cfg(feature = "tokio")]
pub fn start(&self) -> EmissionHandle {
Expand All @@ -102,6 +104,14 @@ impl Instrumentor {
let outputs = self.outputs.clone();
let interval = self.interval;

// Start Prometheus HTTP servers for any Prometheus outputs
#[cfg(feature = "prometheus")]
for output in outputs.iter() {
if let Output::Prometheus(exporter) = output {
exporter.start_server();
}
}

tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
let mut stop_rx = stop_rx;
Expand Down
3 changes: 3 additions & 0 deletions buswatch-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ mod state;
#[cfg(feature = "otel")]
pub mod otel;

#[cfg(feature = "prometheus")]
pub mod prometheus;

pub use handle::ModuleHandle;
pub use instrumentor::{Instrumentor, InstrumentorBuilder};
pub use output::Output;
Expand Down
40 changes: 39 additions & 1 deletion buswatch-sdk/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ use std::path::PathBuf;

use buswatch_types::Snapshot;

#[cfg(feature = "otel")]
#[cfg(any(feature = "otel", feature = "prometheus"))]
use std::sync::Arc;

#[cfg(feature = "otel")]
use crate::otel::{OtelConfig, OtelExporter};

#[cfg(feature = "prometheus")]
use crate::prometheus::{PrometheusConfig, PrometheusExporter};

/// Output destination for snapshots.
///
/// Configure where the instrumentor should emit snapshots.
Expand All @@ -36,6 +39,12 @@ pub enum Output {
/// Use `Output::otel()` to create this variant.
#[cfg(feature = "otel")]
Otel(Arc<OtelExporter>),

/// Serve metrics in Prometheus exposition format via HTTP.
///
/// Use `Output::prometheus()` to create this variant.
#[cfg(feature = "prometheus")]
Prometheus(Arc<PrometheusExporter>),
}

impl Output {
Expand Down Expand Up @@ -111,6 +120,30 @@ impl Output {
Ok(Output::Otel(Arc::new(exporter)))
}

/// Create a Prometheus HTTP endpoint output.
///
/// This serves metrics in Prometheus exposition format at the configured
/// HTTP endpoint, allowing Prometheus to scrape metrics.
///
/// # Example
///
/// ```rust,no_run
/// use buswatch_sdk::Output;
/// use buswatch_sdk::prometheus::PrometheusConfig;
///
/// let config = PrometheusConfig::builder()
/// .listen_addr("0.0.0.0:9090")
/// .metrics_path("/metrics")
/// .build();
///
/// let output = Output::prometheus(config);
/// ```
#[cfg(feature = "prometheus")]
pub fn prometheus(config: PrometheusConfig) -> Self {
let exporter = PrometheusExporter::new(config);
Output::Prometheus(Arc::new(exporter))
}

/// Emit a snapshot to this output.
#[cfg(feature = "tokio")]
pub(crate) async fn emit(&self, snapshot: &Snapshot) -> std::io::Result<()> {
Expand Down Expand Up @@ -139,6 +172,11 @@ impl Output {
// Record metrics to OpenTelemetry
exporter.record(snapshot);
}
#[cfg(feature = "prometheus")]
Output::Prometheus(exporter) => {
// Update the latest snapshot for Prometheus scraping
exporter.record(snapshot);
}
}
Ok(())
}
Expand Down
Loading