From fa236930833390e83268d8800a1b8173f84f94e0 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:29:32 +0200 Subject: [PATCH 01/24] docs(metrics): add feature 006 spec for expvar metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Define metrics catalog (14 counters), HTTP endpoint design, architecture overview, CLI flag (-metrics-addr), security considerations, and usage examples. Zero external dependencies — uses stdlib expvar + net/http only. --- specs/006-expvar-metrics/spec.md | 488 +++++++++++++++++++++++++++++++ 1 file changed, 488 insertions(+) create mode 100644 specs/006-expvar-metrics/spec.md diff --git a/specs/006-expvar-metrics/spec.md b/specs/006-expvar-metrics/spec.md new file mode 100644 index 0000000..ec6b9f8 --- /dev/null +++ b/specs/006-expvar-metrics/spec.md @@ -0,0 +1,488 @@ +# Feature 006: Expvar Metrics — Live Counters for Connections, Publishers, Subscribers + +**Feature**: 006-expvar-metrics +**Status**: Draft +**Date**: 2026-03-04 +**Branch**: `feature/006-expvar-metrics` + +## Overview + +Add real-time server metrics via Go's standard `expvar` package. An optional HTTP +endpoint exposes live JSON counters for connections, publishers, subscribers, +streams, media throughput, and relay health. Zero external dependencies. + +### Design Constraints + +- **Zero external dependencies** (stdlib `expvar` + `net/http` only) +- **Opt-in**: disabled by default; enabled via `-metrics-addr` CLI flag +- **Goroutine-safe**: `expvar.Int` uses `sync/atomic` internally — no additional locking +- **Minimal footprint**: one new package, ~15 one-liner instrumentation points in existing code +- **Non-intrusive**: metrics calls never block or affect RTMP data path performance + +--- + +## What It Provides + +### Metrics Endpoint + +When enabled, an HTTP server listens on the configured address and serves: + +- `GET /debug/vars` — standard expvar JSON output (all Go runtime + RTMP counters) + +This endpoint is compatible with: +- **curl / jq** — direct JSON inspection +- **Prometheus** — via `expvar_exporter` or similar scrapers +- **Grafana** — via Prometheus or JSON data sources +- **Custom dashboards** — any HTTP/JSON consumer + +### Metrics Catalog + +| Metric Name | Type | Description | Source | +|---|---|---|---| +| `rtmp_connections_active` | Gauge | Currently open TCP connections | `Server.conns` map | +| `rtmp_connections_total` | Counter | Total connections accepted since startup | `Server.acceptLoop` | +| `rtmp_streams_active` | Gauge | Currently active publish streams | `Registry.streams` map | +| `rtmp_publishers_active` | Gauge | Currently active publishers | `Stream.SetPublisher` / `PublisherDisconnected` | +| `rtmp_publishers_total` | Counter | Total publish sessions since startup | `Stream.SetPublisher` | +| `rtmp_subscribers_active` | Gauge | Currently active subscribers | `Stream.AddSubscriber` / `RemoveSubscriber` | +| `rtmp_subscribers_total` | Counter | Total play sessions since startup | `Stream.AddSubscriber` | +| `rtmp_messages_audio` | Counter | Total audio messages received | `MediaLogger.ProcessMessage` | +| `rtmp_messages_video` | Counter | Total video messages received | `MediaLogger.ProcessMessage` | +| `rtmp_bytes_ingested` | Counter | Total media bytes received (audio + video) | `MediaLogger.ProcessMessage` | +| `rtmp_relay_messages_sent` | Counter | Total relay messages delivered | `Destination.SendMessage` | +| `rtmp_relay_messages_dropped` | Counter | Total relay messages dropped | `Destination.SendMessage` | +| `rtmp_relay_bytes_sent` | Counter | Total relay bytes transmitted | `Destination.SendMessage` | +| `rtmp_uptime_seconds` | Gauge (func) | Seconds since server start | Computed on each request | +| `rtmp_server_info` | Map (func) | Static info: listen addr, Go version | Computed on each request | + +**Gauge** = value goes up and down (active counts). +**Counter** = monotonically increasing (totals). +Both are implemented as `expvar.Int` (atomic int64). `Add(1)` to increment, `Add(-1)` to decrement gauges. + +--- + +## Protocol / API + +### HTTP Response Format + +`GET /debug/vars` returns JSON (standard expvar format): + +```json +{ + "cmdline": ["./rtmp-server", "-listen", ":1935", "-metrics-addr", ":8080"], + "memstats": { ... }, + "rtmp_connections_active": 3, + "rtmp_connections_total": 47, + "rtmp_streams_active": 1, + "rtmp_publishers_active": 1, + "rtmp_publishers_total": 5, + "rtmp_subscribers_active": 2, + "rtmp_subscribers_total": 12, + "rtmp_messages_audio": 145023, + "rtmp_messages_video": 87412, + "rtmp_bytes_ingested": 523948172, + "rtmp_relay_messages_sent": 87410, + "rtmp_relay_messages_dropped": 2, + "rtmp_relay_bytes_sent": 261974086, + "rtmp_uptime_seconds": 3847, + "rtmp_server_info": { + "listen_addr": ":1935", + "go_version": "go1.21.0" + } +} +``` + +### CLI Usage + +```bash +# Start with metrics disabled (default — no change to existing behavior) +./rtmp-server -listen :1935 + +# Start with metrics enabled on port 8080 +./rtmp-server -listen :1935 -metrics-addr :8080 + +# Start with metrics on localhost only (production hardening) +./rtmp-server -listen :1935 -metrics-addr 127.0.0.1:8080 + +# Query live metrics +curl -s http://localhost:8080/debug/vars | jq '{ + connections: .rtmp_connections_active, + publishers: .rtmp_publishers_active, + subscribers: .rtmp_subscribers_active, + streams: .rtmp_streams_active, + uptime: .rtmp_uptime_seconds +}' +``` + +--- + +## Architecture + +### New Package + +``` +internal/rtmp/metrics/ +├── metrics.go # Package-level expvar variable declarations, init, helper funcs +└── metrics_test.go # Unit tests for counter behavior +``` + +The metrics package is a thin, stateless wrapper around `expvar`. It declares +package-level variables and provides no business logic — the instrumentation +is done at the call sites in existing packages. + +### Modified Files + +| File | Change | +|------|--------| +| `internal/rtmp/server/server.go` | Instrument `acceptLoop` (connection accepted), `RemoveConnection` (connection removed) | +| `internal/rtmp/server/registry.go` | Instrument `CreateStream`, `DeleteStream`, `SetPublisher`, `AddSubscriber`, `RemoveSubscriber` | +| `internal/rtmp/server/publish_handler.go` | Instrument `PublisherDisconnected` | +| `internal/rtmp/server/media_logger.go` | Instrument `ProcessMessage` for audio/video/byte counters | +| `internal/rtmp/relay/destination.go` | Instrument `SendMessage` for relay sent/dropped/bytes | +| `cmd/rtmp-server/flags.go` | Add `-metrics-addr` flag | +| `cmd/rtmp-server/main.go` | Start HTTP metrics server when flag is set | + +### Data Flow + +``` + ┌─────────────────────┐ + RTMP Client ──TCP──► │ RTMP Server │ + │ │ + acceptLoop ─────────► │ metrics.Connections │◄── expvar.Int (atomic) + registry ─────────► │ metrics.Streams │◄── expvar.Int (atomic) + SetPublisher ───────► │ metrics.Publishers │◄── expvar.Int (atomic) + AddSubscriber ──────► │ metrics.Subscribers │◄── expvar.Int (atomic) + ProcessMessage ─────► │ metrics.Messages │◄── expvar.Int (atomic) + SendMessage ───────► │ metrics.Relay │◄── expvar.Int (atomic) + │ │ + └─────────┬────────────┘ + │ + HTTP :8080│ /debug/vars + ▼ + ┌─────────────────────┐ + │ JSON Response │ + │ (curl / Prometheus) │ + └─────────────────────┘ +``` + +All instrumentation points call `expvar.Int.Add()` which is a single atomic +operation — no mutexes, no allocations, no blocking. The RTMP data path +remains unaffected. + +--- + +## Detailed Design + +### T001: Create metrics package with expvar declarations + +**File**: `internal/rtmp/metrics/metrics.go` + +```go +package metrics + +import ( + "expvar" + "runtime" + "time" +) + +// Server start time — set once during init. +var startTime time.Time + +func init() { + startTime = time.Now() +} + +// ── Connection metrics ────────────────────────────────────────────── +var ( + ConnectionsActive = expvar.NewInt("rtmp_connections_active") + ConnectionsTotal = expvar.NewInt("rtmp_connections_total") +) + +// ── Stream metrics ────────────────────────────────────────────────── +var ( + StreamsActive = expvar.NewInt("rtmp_streams_active") +) + +// ── Publisher metrics ─────────────────────────────────────────────── +var ( + PublishersActive = expvar.NewInt("rtmp_publishers_active") + PublishersTotal = expvar.NewInt("rtmp_publishers_total") +) + +// ── Subscriber metrics ────────────────────────────────────────────── +var ( + SubscribersActive = expvar.NewInt("rtmp_subscribers_active") + SubscribersTotal = expvar.NewInt("rtmp_subscribers_total") +) + +// ── Media metrics ─────────────────────────────────────────────────── +var ( + MessagesAudio = expvar.NewInt("rtmp_messages_audio") + MessagesVideo = expvar.NewInt("rtmp_messages_video") + BytesIngested = expvar.NewInt("rtmp_bytes_ingested") +) + +// ── Relay metrics ─────────────────────────────────────────────────── +var ( + RelayMessagesSent = expvar.NewInt("rtmp_relay_messages_sent") + RelayMessagesDropped = expvar.NewInt("rtmp_relay_messages_dropped") + RelayBytesSent = expvar.NewInt("rtmp_relay_bytes_sent") +) + +func init() { + // Uptime: computed on each request, always current + expvar.Publish("rtmp_uptime_seconds", expvar.Func(func() interface{} { + return int64(time.Since(startTime).Seconds()) + })) + + // Server info: static metadata + expvar.Publish("rtmp_server_info", expvar.Func(func() interface{} { + return map[string]string{ + "go_version": runtime.Version(), + } + })) +} +``` + +**Why package-level vars**: Allows any package to import `metrics` and call +`metrics.ConnectionsActive.Add(1)` without passing state around. The expvar +registry is process-global by design. + +**Tests** (`metrics_test.go`): +- Verify initial values are 0 +- Verify Add(1), Add(-1) for gauges +- Verify monotonic Add(1) for counters +- Verify uptime is > 0 +- Verify server_info returns a map with go_version + +--- + +### T002: Add `-metrics-addr` CLI flag + +**File**: `cmd/rtmp-server/flags.go` + +Add field to `cliConfig`: +```go +metricsAddr string // HTTP address for expvar/pprof (default "" = disabled) +``` + +Add flag registration: +```go +fs.StringVar(&cfg.metricsAddr, "metrics-addr", "", + "HTTP address for metrics endpoint (e.g. :8080 or 127.0.0.1:8080). Empty = disabled") +``` + +**File**: `cmd/rtmp-server/main.go` + +After `server.Start()`, start the metrics HTTP listener: +```go +if cfg.metricsAddr != "" { + go func() { + // Import expvar for side-effect: registers /debug/vars on DefaultServeMux + log.Info("metrics HTTP server listening", "addr", cfg.metricsAddr) + if err := http.ListenAndServe(cfg.metricsAddr, nil); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error("metrics HTTP server error", "error", err) + } + }() +} +``` + +**Tests**: Verify flag parsing includes new field, verify default is empty string. + +--- + +### T003: Instrument connection lifecycle + +**File**: `internal/rtmp/server/server.go` + +In `acceptLoop`, after `s.conns[c.ID()] = c`: +```go +metrics.ConnectionsActive.Add(1) +metrics.ConnectionsTotal.Add(1) +``` + +In `RemoveConnection`, after `delete(s.conns, id)`: +```go +metrics.ConnectionsActive.Add(-1) +``` + +**Tests**: Table-driven integration test — accept N connections, verify +`ConnectionsActive == N`, disconnect one, verify `ConnectionsActive == N-1`. +Verify `ConnectionsTotal` only increases. + +--- + +### T004: Instrument stream registry + +**File**: `internal/rtmp/server/registry.go` + +In `CreateStream`, when a new stream is created (the `created == true` path): +```go +metrics.StreamsActive.Add(1) +``` + +In `DeleteStream`, after `delete(r.streams, key)`: +```go +metrics.StreamsActive.Add(-1) +``` + +In `SetPublisher`, after `s.Publisher = pub`: +```go +metrics.PublishersActive.Add(1) +metrics.PublishersTotal.Add(1) +``` + +In `AddSubscriber`, after `append`: +```go +metrics.SubscribersActive.Add(1) +metrics.SubscribersTotal.Add(1) +``` + +In `RemoveSubscriber`, after the swap-delete succeeds (inside the `if existing == sub` block): +```go +metrics.SubscribersActive.Add(-1) +``` + +**Tests**: Create stream → verify `StreamsActive == 1`. Set publisher → verify +`PublishersActive == 1`. Add 3 subscribers → verify `SubscribersActive == 3`. +Remove 1 → verify `SubscribersActive == 2`. Delete stream → verify +`StreamsActive == 0`. + +--- + +### T005: Instrument publisher disconnect + +**File**: `internal/rtmp/server/publish_handler.go` + +In `PublisherDisconnected`, after `s.Publisher = nil` (inside the `if s.Publisher == pub` block): +```go +metrics.PublishersActive.Add(-1) +``` + +**Tests**: Publish → disconnect → verify `PublishersActive == 0`, +`PublishersTotal == 1`. + +--- + +### T006: Instrument media logger + +**File**: `internal/rtmp/server/media_logger.go` + +In `ProcessMessage`, after the existing counter increments: +```go +metrics.BytesIngested.Add(int64(len(msg.Payload))) + +if msg.TypeID == 8 { + metrics.MessagesAudio.Add(1) +} else { + metrics.MessagesVideo.Add(1) +} +``` + +These calls are placed after existing `ml.audioCount++` / `ml.videoCount++` +to keep the two counter systems consistent. + +**Tests**: Feed N audio + M video messages → verify `MessagesAudio == N`, +`MessagesVideo == M`, `BytesIngested == sum(payload sizes)`. + +--- + +### T007: Instrument relay destination + +**File**: `internal/rtmp/relay/destination.go` + +In `SendMessage`, after the existing `d.Metrics.MessagesSent++`: +```go +metrics.RelayMessagesSent.Add(1) +metrics.RelayBytesSent.Add(int64(len(msg.Payload))) +``` + +In the two drop paths (not connected + send error), after `d.Metrics.MessagesDropped++`: +```go +metrics.RelayMessagesDropped.Add(1) +``` + +**Tests**: Mock client, send messages → verify `RelayMessagesSent` and +`RelayBytesSent`. Force error → verify `RelayMessagesDropped`. + +--- + +### T008: Integration test — full metrics endpoint + +**File**: `tests/integration/metrics_test.go` + +End-to-end test: +1. Start server with `-metrics-addr` on a random port +2. Verify `GET /debug/vars` returns 200 with valid JSON +3. Verify all `rtmp_*` keys are present +4. Verify initial values are 0 +5. Verify `rtmp_uptime_seconds > 0` +6. Verify `rtmp_server_info` contains `go_version` + +--- + +## Scope Boundaries + +### In Scope +- `expvar` package-level counters (atomic int64) +- HTTP endpoint for JSON metrics export +- CLI flag to enable/disable +- Per-category counters (connections, streams, publishers, subscribers, media, relay) +- Uptime and server info computed vars +- Unit tests for each instrumentation point +- Integration test for HTTP endpoint + +### Out of Scope (future enhancements) +- **Prometheus `/metrics` endpoint** — use prometheus expvar exporter instead +- **Per-stream metric breakdowns** — would require `expvar.Map` nesting +- **Histograms / percentiles** — expvar only supports counters/gauges +- **Authentication on metrics endpoint** — bind to localhost for security +- **pprof endpoint** — separate feature, could share the same HTTP listener +- **Metric retention / history** — expvar is live-only, no time series storage +- **Dashboard / UI** — consumers use their own tools (Grafana, etc.) + +--- + +## Security Considerations + +- **Metrics endpoint binds to configurable address** — users should bind to + `127.0.0.1:8080` in production to prevent external access +- **No sensitive data exposed** — only numeric counters and Go version string +- **No authentication** — standard for internal debug endpoints; network-level + access control is the appropriate mitigation +- **Disabled by default** — no listener starts unless `-metrics-addr` is explicitly set + +--- + +## Usage Examples + +### Monitor during load test +```bash +# Terminal 1: Start server with metrics +./rtmp-server -listen :1935 -metrics-addr :8080 -log-level info + +# Terminal 2: Publish a test stream +ffmpeg -re -i test.mp4 -c copy -f flv rtmp://localhost:1935/live/test + +# Terminal 3: Watch metrics live (refresh every 2s) +watch -n2 'curl -s http://localhost:8080/debug/vars | jq "{ + conns: .rtmp_connections_active, + publishers: .rtmp_publishers_active, + subscribers: .rtmp_subscribers_active, + streams: .rtmp_streams_active, + audio_msgs: .rtmp_messages_audio, + video_msgs: .rtmp_messages_video, + bytes_in: .rtmp_bytes_ingested, + uptime: .rtmp_uptime_seconds +}"' +``` + +### Alerting on subscriber count +```bash +# Simple threshold check +SUBS=$(curl -s http://localhost:8080/debug/vars | jq .rtmp_subscribers_active) +if [ "$SUBS" -gt 100 ]; then + echo "WARNING: $SUBS subscribers active" +fi +``` From de10704ec61907f79313e9f707d61601244778a5 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:29:39 +0200 Subject: [PATCH 02/24] docs(metrics): add implementation plan with 8 tasks Task breakdown: T001 metrics package, T002 CLI flag/HTTP listener, T003-T007 instrumentation points (connections, registry, publisher disconnect, media logger, relay), T008 integration test. Includes dependency graph, acceptance criteria, and test strategy. --- specs/006-expvar-metrics/plan.md | 358 +++++++++++++++++++++++++++++++ 1 file changed, 358 insertions(+) create mode 100644 specs/006-expvar-metrics/plan.md diff --git a/specs/006-expvar-metrics/plan.md b/specs/006-expvar-metrics/plan.md new file mode 100644 index 0000000..2243b07 --- /dev/null +++ b/specs/006-expvar-metrics/plan.md @@ -0,0 +1,358 @@ +# Feature 006: Expvar Metrics — Implementation Plan + +**Branch**: `feature/006-expvar-metrics` +**Date**: 2026-03-04 + +## Task Dependency Graph + +``` +T001 (metrics package) + ├──► T002 (CLI flag + HTTP listener) + ├──► T003 (instrument connections) + ├──► T004 (instrument registry) + │ └──► T005 (instrument publisher disconnect) + ├──► T006 (instrument media logger) + └──► T007 (instrument relay) + └──► T008 (integration test) +``` + +T001 is the foundation — all other tasks depend on it. T003–T007 are +independent of each other and can be implemented in any order. T008 depends +on all instrumentation being complete. + +--- + +## Tasks + +### T001: Create `internal/rtmp/metrics` package + +**Priority**: 1 (blocking) +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/metrics/metrics.go` — Create new +- `internal/rtmp/metrics/metrics_test.go` — Create new + +**What to do**: +1. Create package with all `expvar.Int` declarations (14 counters) +2. Register `expvar.Func` for `rtmp_uptime_seconds` and `rtmp_server_info` +3. Write tests: + - All counters initialize to 0 + - `Add(1)` / `Add(-1)` work correctly for gauge-style metrics + - `rtmp_uptime_seconds` returns `> 0` + - `rtmp_server_info` returns map with `go_version` key + - Counters appear in `expvar.Handler` JSON output + +**Acceptance criteria**: +- `go test -race ./internal/rtmp/metrics/...` passes +- All 14 `rtmp_*` variables registered and queryable + +**Commit**: `feat(metrics): add expvar metrics package with counter declarations` + +--- + +### T002: Add `-metrics-addr` CLI flag and HTTP listener + +**Priority**: 2 +**Estimated complexity**: Low +**Files**: +- `cmd/rtmp-server/flags.go` — Edit +- `cmd/rtmp-server/main.go` — Edit + +**What to do**: +1. Add `metricsAddr string` to `cliConfig` struct +2. Register `-metrics-addr` flag with `fs.StringVar`, default `""` (disabled) +3. In `main()`, after `server.Start()`, if `metricsAddr != ""`: + - Import `net/http` and blank-import `expvar` for handler registration + - Start `http.ListenAndServe(cfg.metricsAddr, nil)` in a goroutine + - Log the metrics address on startup +4. On shutdown signal, no special cleanup needed (HTTP server stops with process) + +**Key decisions**: +- Use `http.DefaultServeMux` — `expvar` auto-registers on it +- Run HTTP server in a goroutine — non-blocking, dies with process +- No TLS or auth — this is an internal debug endpoint + +**Acceptance criteria**: +- `./rtmp-server -metrics-addr :8080` starts HTTP listener +- `curl http://localhost:8080/debug/vars` returns valid JSON with runtime vars +- Default (no flag) = no HTTP listener started +- `./rtmp-server -h` shows the new flag + +**Commit**: `feat(metrics): add -metrics-addr CLI flag and HTTP metrics endpoint` + +--- + +### T003: Instrument connection lifecycle in server.go + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/server.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `acceptLoop()`, after line `s.conns[c.ID()] = c` (inside `s.mu.Lock()` block): + ```go + metrics.ConnectionsActive.Add(1) + metrics.ConnectionsTotal.Add(1) + ``` +3. In `RemoveConnection()`, after `delete(s.conns, id)`: + ```go + metrics.ConnectionsActive.Add(-1) + ``` + +**Edge cases**: +- `RemoveConnection` is called exactly once per connection (from disconnect handler) +- The `Stop()` method calls `clear(s.conns)` but doesn't go through + `RemoveConnection` — this is fine because metrics don't need to be accurate + during shutdown (the process is ending) + +**Acceptance criteria**: +- Accept 3 connections → `ConnectionsActive == 3`, `ConnectionsTotal == 3` +- Disconnect 1 → `ConnectionsActive == 2`, `ConnectionsTotal == 3` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument connection accept and remove` + +--- + +### T004: Instrument stream registry + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/registry.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `CreateStream()`, after the new stream is inserted into the map + (the `created == true` path, after `r.streams[key] = s`): + ```go + metrics.StreamsActive.Add(1) + ``` +3. In `DeleteStream()`, after `delete(r.streams, key)` inside the `if ok` block: + ```go + metrics.StreamsActive.Add(-1) + ``` +4. In `SetPublisher()`, after `s.Publisher = pub`: + ```go + metrics.PublishersActive.Add(1) + metrics.PublishersTotal.Add(1) + ``` +5. In `AddSubscriber()`, after `s.Subscribers = append(...)`: + ```go + metrics.SubscribersActive.Add(1) + metrics.SubscribersTotal.Add(1) + ``` +6. In `RemoveSubscriber()`, inside the `if existing == sub` block, + after the swap-delete: + ```go + metrics.SubscribersActive.Add(-1) + ``` + +**Edge cases**: +- `CreateStream` has a double-check pattern (fast RLock read, then Lock write). + Only increment when `created == true` (second return value). +- `RemoveSubscriber` only decrements when a match is actually found. +- `SetPublisher` returns `ErrPublisherExists` without incrementing when + a publisher is already set. + +**Acceptance criteria**: +- Create 2 streams → `StreamsActive == 2` +- Delete 1 → `StreamsActive == 1` +- Set publisher → `PublishersActive == 1`, `PublishersTotal == 1` +- Add 3 subs → `SubscribersActive == 3` +- Remove 1 sub → `SubscribersActive == 2`, `SubscribersTotal == 3` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument stream registry (streams, publishers, subscribers)` + +--- + +### T005: Instrument publisher disconnect + +**Priority**: 4 (depends on T004) +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/publish_handler.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `PublisherDisconnected()`, inside the `if s.Publisher == pub` block, + after `s.Publisher = nil`: + ```go + metrics.PublishersActive.Add(-1) + ``` + +**Edge cases**: +- Only decrement when `s.Publisher == pub` — if a different publisher took + over, the old one's disconnect should not affect the counter. +- `PublisherDisconnected` is called from the connection's `onDisconnect` + handler, which fires exactly once when the readLoop exits. + +**Acceptance criteria**: +- Publish → `PublishersActive == 1` +- Disconnect publisher → `PublishersActive == 0` +- `PublishersTotal` remains unchanged +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument publisher disconnect` + +--- + +### T006: Instrument media logger + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/media_logger.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `ProcessMessage()`, after the existing counter increments + (`ml.audioCount++` or `ml.videoCount++`), add: + ```go + metrics.BytesIngested.Add(int64(len(msg.Payload))) + ``` +3. In the `if msg.TypeID == 8` audio branch, after `ml.audioCount++`: + ```go + metrics.MessagesAudio.Add(1) + ``` +4. In the `else` (video) path, after `ml.videoCount++` (for TypeID == 9): + ```go + metrics.MessagesVideo.Add(1) + ``` + +**Placement note**: The `ProcessMessage` method is already guarded by +`ml.mu.Lock()`. The `expvar.Int.Add()` call is atomic and does not need +this lock — it works correctly whether called inside or outside the lock. +We place it inside the existing lock scope to keep the code co-located +with the existing counter increments. + +**Acceptance criteria**: +- Feed 10 audio + 5 video messages → `MessagesAudio == 10`, `MessagesVideo == 5` +- `BytesIngested == sum of all payload sizes` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument media logger for audio/video/byte counters` + +--- + +### T007: Instrument relay destination + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/relay/destination.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `SendMessage()`, in the "not connected" early-return path, + after `d.Metrics.MessagesDropped++`: + ```go + metrics.RelayMessagesDropped.Add(1) + ``` +3. In `SendMessage()`, in the send-error path, + after `d.Metrics.MessagesDropped++`: + ```go + metrics.RelayMessagesDropped.Add(1) + ``` +4. In `SendMessage()`, in the success path, + after `d.Metrics.MessagesSent++` and `d.Metrics.BytesSent += ...`: + ```go + metrics.RelayMessagesSent.Add(1) + metrics.RelayBytesSent.Add(int64(len(msg.Payload))) + ``` + +**Note**: The expvar counters are global aggregates across all destinations. +Per-destination metrics remain available via the existing +`DestinationManager.GetMetrics()` method. The expvar counters provide a +quick global overview without needing to enumerate destinations. + +**Acceptance criteria**: +- Send 5 messages successfully → `RelayMessagesSent == 5` +- Force 2 drops → `RelayMessagesDropped == 2` +- `RelayBytesSent == sum of successful payload sizes` +- `go test -race ./internal/rtmp/relay/...` passes + +**Commit**: `feat(metrics): instrument relay destination send/drop counters` + +--- + +### T008: Integration test — metrics HTTP endpoint + +**Priority**: 5 (depends on all above) +**Estimated complexity**: Medium +**Files**: +- `tests/integration/metrics_test.go` — Create new + +**What to do**: +1. Start a server with `-metrics-addr` on port 0 (OS-assigned) +2. HTTP GET `/debug/vars` → assert 200 +3. Parse JSON response +4. Assert all 14 `rtmp_*` keys exist +5. Assert initial gauge values are 0 +6. Assert `rtmp_uptime_seconds > 0` +7. Assert `rtmp_server_info` contains `go_version` + +**Acceptance criteria**: +- `go test -race ./tests/integration/ -run TestMetrics -count=1` passes +- No flaky timing issues (uptime check uses `> 0`, not exact value) + +**Commit**: `test(metrics): add integration test for metrics HTTP endpoint` + +--- + +## Execution Order + +| Step | Task | Description | +|------|------|-------------| +| 1 | T001 | Create metrics package (foundation) | +| 2 | T002 | CLI flag + HTTP listener | +| 3 | T003 | Instrument connections | +| 4 | T004 | Instrument registry | +| 5 | T005 | Instrument publisher disconnect | +| 6 | T006 | Instrument media logger | +| 7 | T007 | Instrument relay | +| 8 | T008 | Integration test | + +Each task gets its own commit. This allows easy bisection and clean review. + +--- + +## Test Strategy + +### Unit Tests (per task) +Each instrumentation point gets tested by: +1. Resetting relevant expvar counters (via `Set(0)`) to isolate tests +2. Performing the action that should increment +3. Asserting the expected counter value + +### Integration Test (T008) +Full HTTP endpoint test that validates the complete pipeline: server → expvar → +HTTP → JSON → all expected keys present. + +### Race Detection +All tests run with `-race` to verify the atomic operations are correct. + +### Manual Verification +```bash +go build -o rtmp-server.exe ./cmd/rtmp-server +./rtmp-server.exe -listen :1935 -metrics-addr :8080 -log-level debug +# In another terminal: +ffmpeg -re -i test.mp4 -c copy -f flv rtmp://localhost:1935/live/test +# In another terminal: +curl -s http://localhost:8080/debug/vars | jq . | grep rtmp_ +``` + +--- + +## Risk Assessment + +| Risk | Likelihood | Impact | Mitigation | +|------|-----------|--------|------------| +| Counter drift (inc without matching dec) | Low | Low | Each inc/dec pair is co-located; reviewed in T003-T005 | +| Race conditions on expvar access | Very Low | Medium | `expvar.Int` uses `sync/atomic` internally | +| HTTP listener port conflict | Low | Low | Disabled by default; user chooses port | +| Performance impact on hot path | Very Low | Medium | `atomic.AddInt64` is ~1ns; negligible vs I/O | +| Shutdown metrics inaccuracy | Low | None | Metrics don't need to be accurate during shutdown | From 671f2a6c45b51baad00d840e548221a1e663fc83 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:36:07 +0200 Subject: [PATCH 03/24] feat(metrics): add expvar metrics package with counter declarations Declare 14 expvar.Int counters for connections, streams, publishers, subscribers, media throughput, and relay stats. Register expvar.Func for uptime_seconds and server_info. All counters use atomic int64. --- internal/rtmp/metrics/metrics.go | 79 ++++++++++++++++ internal/rtmp/metrics/metrics_test.go | 128 ++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 internal/rtmp/metrics/metrics.go create mode 100644 internal/rtmp/metrics/metrics_test.go diff --git a/internal/rtmp/metrics/metrics.go b/internal/rtmp/metrics/metrics.go new file mode 100644 index 0000000..4de169f --- /dev/null +++ b/internal/rtmp/metrics/metrics.go @@ -0,0 +1,79 @@ +package metrics + +// Expvar Metrics +// ============== +// Package-level expvar counters for live server observability. +// All variables use atomic int64 internally (via expvar.Int) and are +// safe for concurrent access from any goroutine. +// +// Gauges (values go up and down): +// - ConnectionsActive, StreamsActive, PublishersActive, SubscribersActive +// +// Counters (monotonically increasing): +// - ConnectionsTotal, PublishersTotal, SubscribersTotal +// - MessagesAudio, MessagesVideo, BytesIngested +// - RelayMessagesSent, RelayMessagesDropped, RelayBytesSent + +import ( + "expvar" + "runtime" + "time" +) + +// startTime records when the package was initialized (≈ process start). +var startTime = time.Now() + +// ── Connection metrics ────────────────────────────────────────────── + +var ( + ConnectionsActive = expvar.NewInt("rtmp_connections_active") + ConnectionsTotal = expvar.NewInt("rtmp_connections_total") +) + +// ── Stream metrics ────────────────────────────────────────────────── + +var ( + StreamsActive = expvar.NewInt("rtmp_streams_active") +) + +// ── Publisher metrics ─────────────────────────────────────────────── + +var ( + PublishersActive = expvar.NewInt("rtmp_publishers_active") + PublishersTotal = expvar.NewInt("rtmp_publishers_total") +) + +// ── Subscriber metrics ────────────────────────────────────────────── + +var ( + SubscribersActive = expvar.NewInt("rtmp_subscribers_active") + SubscribersTotal = expvar.NewInt("rtmp_subscribers_total") +) + +// ── Media metrics ─────────────────────────────────────────────────── + +var ( + MessagesAudio = expvar.NewInt("rtmp_messages_audio") + MessagesVideo = expvar.NewInt("rtmp_messages_video") + BytesIngested = expvar.NewInt("rtmp_bytes_ingested") +) + +// ── Relay metrics ─────────────────────────────────────────────────── + +var ( + RelayMessagesSent = expvar.NewInt("rtmp_relay_messages_sent") + RelayMessagesDropped = expvar.NewInt("rtmp_relay_messages_dropped") + RelayBytesSent = expvar.NewInt("rtmp_relay_bytes_sent") +) + +func init() { + expvar.Publish("rtmp_uptime_seconds", expvar.Func(func() interface{} { + return int64(time.Since(startTime).Seconds()) + })) + + expvar.Publish("rtmp_server_info", expvar.Func(func() interface{} { + return map[string]string{ + "go_version": runtime.Version(), + } + })) +} diff --git a/internal/rtmp/metrics/metrics_test.go b/internal/rtmp/metrics/metrics_test.go new file mode 100644 index 0000000..8077296 --- /dev/null +++ b/internal/rtmp/metrics/metrics_test.go @@ -0,0 +1,128 @@ +package metrics + +import ( + "encoding/json" + "expvar" + "net/http" + "net/http/httptest" + "runtime" + "strings" + "testing" +) + +func TestCountersInitializedToZero(t *testing.T) { + counters := []*expvar.Int{ + ConnectionsActive, ConnectionsTotal, + StreamsActive, + PublishersActive, PublishersTotal, + SubscribersActive, SubscribersTotal, + MessagesAudio, MessagesVideo, BytesIngested, + RelayMessagesSent, RelayMessagesDropped, RelayBytesSent, + } + for _, c := range counters { + if v := c.Value(); v != 0 { + t.Errorf("counter should be 0, got %d", v) + } + } +} + +func TestGaugeAddAndSubtract(t *testing.T) { + // Use ConnectionsActive as a representative gauge. + defer ConnectionsActive.Set(0) + + ConnectionsActive.Add(1) + ConnectionsActive.Add(1) + ConnectionsActive.Add(1) + if v := ConnectionsActive.Value(); v != 3 { + t.Fatalf("expected 3, got %d", v) + } + + ConnectionsActive.Add(-1) + if v := ConnectionsActive.Value(); v != 2 { + t.Fatalf("expected 2, got %d", v) + } +} + +func TestCounterMonotonic(t *testing.T) { + defer ConnectionsTotal.Set(0) + + for i := 0; i < 100; i++ { + ConnectionsTotal.Add(1) + } + if v := ConnectionsTotal.Value(); v != 100 { + t.Fatalf("expected 100, got %d", v) + } +} + +func TestUptimePositive(t *testing.T) { + v := expvar.Get("rtmp_uptime_seconds") + if v == nil { + t.Fatal("rtmp_uptime_seconds not registered") + } + // The func returns int64; expvar serializes it as a number. + raw := v.String() + if raw == "" { + t.Fatal("empty uptime string") + } + // Uptime should be ≥ 0 (could be 0 if test runs within the first second) + var uptime int64 + if err := json.Unmarshal([]byte(raw), &uptime); err != nil { + t.Fatalf("failed to parse uptime: %v", err) + } + if uptime < 0 { + t.Fatalf("uptime should be >= 0, got %d", uptime) + } +} + +func TestServerInfoContainsGoVersion(t *testing.T) { + v := expvar.Get("rtmp_server_info") + if v == nil { + t.Fatal("rtmp_server_info not registered") + } + raw := v.String() + var info map[string]string + if err := json.Unmarshal([]byte(raw), &info); err != nil { + t.Fatalf("failed to parse server_info: %v", err) + } + goVer, ok := info["go_version"] + if !ok { + t.Fatal("go_version key missing from server_info") + } + if goVer != runtime.Version() { + t.Fatalf("expected %s, got %s", runtime.Version(), goVer) + } +} + +func TestExpvarHandlerContainsRTMPKeys(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/debug/vars", nil) + expvar.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rec.Code) + } + + body := rec.Body.String() + expectedKeys := []string{ + "rtmp_connections_active", + "rtmp_connections_total", + "rtmp_streams_active", + "rtmp_publishers_active", + "rtmp_publishers_total", + "rtmp_subscribers_active", + "rtmp_subscribers_total", + "rtmp_messages_audio", + "rtmp_messages_video", + "rtmp_bytes_ingested", + "rtmp_relay_messages_sent", + "rtmp_relay_messages_dropped", + "rtmp_relay_bytes_sent", + "rtmp_uptime_seconds", + "rtmp_server_info", + } + for _, key := range expectedKeys { + if !strings.Contains(body, key) { + t.Errorf("expvar output missing key %q", key) + } + } +} From 7f446c531f621fa6aed3b9d4fb48427a76dce1a0 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:36:52 +0200 Subject: [PATCH 04/24] feat(metrics): add -metrics-addr CLI flag and HTTP metrics endpoint Add -metrics-addr flag (default empty = disabled). When set, starts an HTTP server on the given address serving expvar data at /debug/vars. Blank-import expvar and metrics packages to register handlers. --- cmd/rtmp-server/flags.go | 6 ++++++ cmd/rtmp-server/main.go | 13 +++++++++++++ 2 files changed, 19 insertions(+) diff --git a/cmd/rtmp-server/flags.go b/cmd/rtmp-server/flags.go index 4c28e8f..de02af4 100644 --- a/cmd/rtmp-server/flags.go +++ b/cmd/rtmp-server/flags.go @@ -31,6 +31,9 @@ type cliConfig struct { hookTimeout string // hook execution timeout (e.g. "30s") hookConcurrency int // max concurrent hook executions + // Metrics + metricsAddr string // HTTP address for expvar metrics (e.g. ":8080"); empty = disabled + // Authentication authMode string // "none", "token", "file", "callback" authTokens []string // "streamKey=token" pairs (for mode=token) @@ -62,6 +65,9 @@ func parseFlags(args []string) (*cliConfig, error) { fs.StringVar(&cfg.hookTimeout, "hook-timeout", "30s", "Hook execution timeout") fs.IntVar(&cfg.hookConcurrency, "hook-concurrency", 10, "Max concurrent hook executions") + // Metrics + fs.StringVar(&cfg.metricsAddr, "metrics-addr", "", "HTTP address for metrics endpoint (e.g. :8080 or 127.0.0.1:8080). Empty = disabled") + // Authentication flags fs.StringVar(&cfg.authMode, "auth-mode", "none", "Authentication mode: none|token|file|callback") fs.Var(&authTokens, "auth-token", `Stream token: "streamKey=token" (repeatable, for -auth-mode=token)`) diff --git a/cmd/rtmp-server/main.go b/cmd/rtmp-server/main.go index 7e25570..a64a44b 100644 --- a/cmd/rtmp-server/main.go +++ b/cmd/rtmp-server/main.go @@ -2,7 +2,9 @@ package main import ( "context" + _ "expvar" // Register /debug/vars handler on DefaultServeMux "fmt" + "net/http" "os" "os/signal" "strings" @@ -10,6 +12,7 @@ import ( "time" "github.com/alxayo/go-rtmp/internal/logger" + _ "github.com/alxayo/go-rtmp/internal/rtmp/metrics" // Register expvar RTMP counters srv "github.com/alxayo/go-rtmp/internal/rtmp/server" "github.com/alxayo/go-rtmp/internal/rtmp/server/auth" ) @@ -62,6 +65,16 @@ func main() { log.Info("server started", "addr", server.Addr().String(), "version", version, "auth_mode", cfg.authMode) + // Start HTTP metrics server if configured + if cfg.metricsAddr != "" { + go func() { + log.Info("metrics HTTP server listening", "addr", cfg.metricsAddr) + if err := http.ListenAndServe(cfg.metricsAddr, nil); err != nil && err != http.ErrServerClosed { + log.Error("metrics HTTP server error", "error", err) + } + }() + } + // If using file-based auth, listen for SIGHUP to reload the token file if cfg.authMode == "file" { if fv, ok := authValidator.(*auth.FileValidator); ok { From 6ae163196545516ef1df971f2879f3f7c5787710 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:37:38 +0200 Subject: [PATCH 05/24] feat(metrics): instrument connection accept and remove Increment ConnectionsActive and ConnectionsTotal on successful handshake in acceptLoop. Decrement ConnectionsActive in RemoveConnection on disconnect. --- internal/rtmp/server/server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/rtmp/server/server.go b/internal/rtmp/server/server.go index c133ae0..53ed624 100644 --- a/internal/rtmp/server/server.go +++ b/internal/rtmp/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/alxayo/go-rtmp/internal/logger" "github.com/alxayo/go-rtmp/internal/rtmp/client" iconn "github.com/alxayo/go-rtmp/internal/rtmp/conn" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" "github.com/alxayo/go-rtmp/internal/rtmp/relay" "github.com/alxayo/go-rtmp/internal/rtmp/server/auth" "github.com/alxayo/go-rtmp/internal/rtmp/server/hooks" @@ -173,6 +174,8 @@ func (s *Server) acceptLoop() { s.mu.Lock() s.conns[c.ID()] = c s.mu.Unlock() + metrics.ConnectionsActive.Add(1) + metrics.ConnectionsTotal.Add(1) s.log.Info("connection registered", "conn_id", c.ID(), "remote", raw.RemoteAddr().String()) // Trigger connection accept hook event @@ -265,6 +268,7 @@ func (s *Server) RemoveConnection(id string) { s.mu.Lock() delete(s.conns, id) s.mu.Unlock() + metrics.ConnectionsActive.Add(-1) } // singleConnListener wraps a single pre-accepted net.Conn as a net.Listener. From 5e165fdf2041f6a010233be17ae7919dfb8a95a8 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:38:18 +0200 Subject: [PATCH 06/24] feat(metrics): instrument stream registry (streams, publishers, subscribers) Increment/decrement StreamsActive on CreateStream/DeleteStream. Increment PublishersActive and PublishersTotal on SetPublisher. Increment/decrement SubscribersActive on AddSubscriber/RemoveSubscriber. Increment SubscribersTotal on AddSubscriber. --- internal/rtmp/server/registry.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/rtmp/server/registry.go b/internal/rtmp/server/registry.go index c697a39..5ad43b3 100644 --- a/internal/rtmp/server/registry.go +++ b/internal/rtmp/server/registry.go @@ -19,6 +19,7 @@ import ( "github.com/alxayo/go-rtmp/internal/rtmp/chunk" "github.com/alxayo/go-rtmp/internal/rtmp/media" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // ErrPublisherExists is returned when trying to set a second publisher. @@ -80,6 +81,7 @@ func (r *Registry) CreateStream(key string) (*Stream, bool) { } s := &Stream{Key: key, StartTime: time.Now(), Subscribers: make([]media.Subscriber, 0)} r.streams[key] = s + metrics.StreamsActive.Add(1) return s, true } @@ -99,6 +101,7 @@ func (r *Registry) DeleteStream(key string) bool { defer r.mu.Unlock() if _, ok := r.streams[key]; ok { delete(r.streams, key) + metrics.StreamsActive.Add(-1) return true } return false @@ -115,6 +118,8 @@ func (s *Stream) SetPublisher(pub interface{}) error { return ErrPublisherExists } s.Publisher = pub + metrics.PublishersActive.Add(1) + metrics.PublishersTotal.Add(1) return nil } @@ -125,6 +130,8 @@ func (s *Stream) AddSubscriber(sub media.Subscriber) { } s.mu.Lock() s.Subscribers = append(s.Subscribers, sub) + metrics.SubscribersActive.Add(1) + metrics.SubscribersTotal.Add(1) s.mu.Unlock() } @@ -144,6 +151,7 @@ func (s *Stream) RemoveSubscriber(sub media.Subscriber) { s.Subscribers[i] = s.Subscribers[last] s.Subscribers[last] = nil s.Subscribers = s.Subscribers[:last] + metrics.SubscribersActive.Add(-1) break } } From f6204bbfbdd5e4b0d95daf3a043cfa508609a2db Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:38:49 +0200 Subject: [PATCH 07/24] feat(metrics): instrument publisher disconnect Decrement PublishersActive when a publisher disconnects and its reference is cleared from the stream. Only fires when the disconnecting connection matches the current publisher. --- internal/rtmp/server/publish_handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/rtmp/server/publish_handler.go b/internal/rtmp/server/publish_handler.go index e8df508..eec00b2 100644 --- a/internal/rtmp/server/publish_handler.go +++ b/internal/rtmp/server/publish_handler.go @@ -14,6 +14,7 @@ import ( rtmperrors "github.com/alxayo/go-rtmp/internal/errors" "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) @@ -76,6 +77,7 @@ func PublisherDisconnected(reg *Registry, streamKey string, pub sender) { s.mu.Lock() if s.Publisher == pub { s.Publisher = nil + metrics.PublishersActive.Add(-1) } s.mu.Unlock() } From 627f21369f3d97662bd9115740567d981762635a Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:39:23 +0200 Subject: [PATCH 08/24] feat(metrics): instrument media logger for audio/video/byte counters Increment MessagesAudio/MessagesVideo on each media packet. Accumulate BytesIngested from payload sizes. Counters are updated inside the existing mu.Lock scope alongside per-connection counters. --- internal/rtmp/server/media_logger.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/rtmp/server/media_logger.go b/internal/rtmp/server/media_logger.go index 7bcb8e9..d7bf542 100644 --- a/internal/rtmp/server/media_logger.go +++ b/internal/rtmp/server/media_logger.go @@ -14,6 +14,7 @@ import ( "github.com/alxayo/go-rtmp/internal/rtmp/chunk" "github.com/alxayo/go-rtmp/internal/rtmp/media" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // MediaLogger tracks and logs media packet statistics for a connection. @@ -87,9 +88,11 @@ func (ml *MediaLogger) ProcessMessage(msg *chunk.Message) { // Update counters ml.totalBytes += uint64(len(msg.Payload)) + metrics.BytesIngested.Add(int64(len(msg.Payload))) if msg.TypeID == 8 { ml.audioCount++ + metrics.MessagesAudio.Add(1) // Detect audio codec on first packet if ml.audioCodec == "" && len(msg.Payload) > 0 { if am, err := media.ParseAudioMessage(msg.Payload); err == nil { @@ -101,6 +104,7 @@ func (ml *MediaLogger) ProcessMessage(msg *chunk.Message) { } } else if msg.TypeID == 9 { ml.videoCount++ + metrics.MessagesVideo.Add(1) // Detect video codec on first packet if ml.videoCodec == "" && len(msg.Payload) > 0 { if vm, err := media.ParseVideoMessage(msg.Payload); err == nil { From 2d10d839d1f68003033eaa523f39785bf2129698 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:40:01 +0200 Subject: [PATCH 09/24] feat(metrics): instrument relay destination send/drop counters Increment RelayMessagesSent and RelayBytesSent on successful relay. Increment RelayMessagesDropped on connection-down and send-error paths. These are global aggregates across all destinations. --- internal/rtmp/relay/destination.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/rtmp/relay/destination.go b/internal/rtmp/relay/destination.go index 664c397..367a708 100644 --- a/internal/rtmp/relay/destination.go +++ b/internal/rtmp/relay/destination.go @@ -9,6 +9,7 @@ import ( "time" "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // RTMPClient defines the interface for connecting to a remote RTMP server @@ -160,6 +161,7 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.mu.Lock() d.Metrics.MessagesDropped++ d.mu.Unlock() + metrics.RelayMessagesDropped.Add(1) return fmt.Errorf("destination not connected (status: %v)", status) } @@ -179,6 +181,7 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.LastError = err d.Metrics.MessagesDropped++ d.mu.Unlock() + metrics.RelayMessagesDropped.Add(1) d.logger.Error("relay send failed", "type_id", msg.TypeID, "error", err) return fmt.Errorf("send message: %w", err) } @@ -188,6 +191,8 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.Metrics.BytesSent += uint64(len(msg.Payload)) d.Metrics.LastSentTime = time.Now() d.mu.Unlock() + metrics.RelayMessagesSent.Add(1) + metrics.RelayBytesSent.Add(int64(len(msg.Payload))) return nil } From a22a35d7bbebcd0279a492397396f573ce5c3568 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 11:44:04 +0200 Subject: [PATCH 10/24] test(metrics): add integration test for metrics HTTP endpoint Start RTMP server + HTTP metrics server on ephemeral ports, query /debug/vars, and verify all 15 rtmp_* keys are present in the JSON response. Assert uptime >= 0 and server_info contains go_version. --- tests/integration/metrics_test.go | 111 ++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 tests/integration/metrics_test.go diff --git a/tests/integration/metrics_test.go b/tests/integration/metrics_test.go new file mode 100644 index 0000000..04ddd77 --- /dev/null +++ b/tests/integration/metrics_test.go @@ -0,0 +1,111 @@ +// Package integration – end-to-end tests for the RTMP server. +// +// metrics_test.go validates the expvar metrics HTTP endpoint. +// It starts an HTTP listener (mirroring what main.go does with -metrics-addr), +// then queries /debug/vars and verifies all rtmp_* keys are present with +// correct initial values. +package integration + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "testing" + "time" + + _ "github.com/alxayo/go-rtmp/internal/rtmp/metrics" // Register expvar counters + srv "github.com/alxayo/go-rtmp/internal/rtmp/server" +) + +// TestMetricsEndpoint verifies the expvar HTTP endpoint serves all RTMP counters. +func TestMetricsEndpoint(t *testing.T) { + // Start RTMP server on ephemeral port + s := srv.New(srv.Config{ + ListenAddr: "127.0.0.1:0", + ChunkSize: 4096, + }) + if err := s.Start(); err != nil { + t.Fatalf("server start: %v", err) + } + defer s.Stop() + + // Start HTTP metrics server on ephemeral port (mirrors main.go behavior) + metricsLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("metrics listen: %v", err) + } + defer metricsLn.Close() + metricsAddr := metricsLn.Addr().String() + + httpSrv := &http.Server{Handler: http.DefaultServeMux} + go func() { _ = httpSrv.Serve(metricsLn) }() + defer httpSrv.Close() + + // Give the HTTP server a moment to start + time.Sleep(50 * time.Millisecond) + + // Query /debug/vars + resp, err := http.Get(fmt.Sprintf("http://%s/debug/vars", metricsAddr)) + if err != nil { + t.Fatalf("GET /debug/vars: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + + var vars map[string]json.RawMessage + if err := json.Unmarshal(body, &vars); err != nil { + t.Fatalf("parse JSON: %v", err) + } + + // Verify all RTMP counter keys exist + expectedKeys := []string{ + "rtmp_connections_active", + "rtmp_connections_total", + "rtmp_streams_active", + "rtmp_publishers_active", + "rtmp_publishers_total", + "rtmp_subscribers_active", + "rtmp_subscribers_total", + "rtmp_messages_audio", + "rtmp_messages_video", + "rtmp_bytes_ingested", + "rtmp_relay_messages_sent", + "rtmp_relay_messages_dropped", + "rtmp_relay_bytes_sent", + "rtmp_uptime_seconds", + "rtmp_server_info", + } + for _, key := range expectedKeys { + if _, ok := vars[key]; !ok { + t.Errorf("missing key %q in /debug/vars output", key) + } + } + + // Verify uptime is >= 0 + var uptime int64 + if err := json.Unmarshal(vars["rtmp_uptime_seconds"], &uptime); err != nil { + t.Fatalf("parse uptime: %v", err) + } + if uptime < 0 { + t.Errorf("uptime should be >= 0, got %d", uptime) + } + + // Verify server_info contains go_version + var serverInfo map[string]string + if err := json.Unmarshal(vars["rtmp_server_info"], &serverInfo); err != nil { + t.Fatalf("parse server_info: %v", err) + } + if _, ok := serverInfo["go_version"]; !ok { + t.Error("server_info missing go_version key") + } +} From 8eaa72e1c04ee010a6388f2c3626d08bd6b24955 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 12:22:56 +0200 Subject: [PATCH 11/24] refactor(auth): remove unused ErrForbidden sentinel error and fix cross-package comment --- internal/rtmp/server/auth/auth.go | 1 - internal/rtmp/server/auth/auth_test.go | 1 - internal/rtmp/server/publish_handler.go | 3 ++- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/rtmp/server/auth/auth.go b/internal/rtmp/server/auth/auth.go index cb324bc..feecfcd 100644 --- a/internal/rtmp/server/auth/auth.go +++ b/internal/rtmp/server/auth/auth.go @@ -60,5 +60,4 @@ type Request struct { var ( ErrUnauthorized = errors.New("authentication failed: invalid credentials") ErrTokenMissing = errors.New("authentication failed: token missing") - ErrForbidden = errors.New("authentication failed: access denied") ) diff --git a/internal/rtmp/server/auth/auth_test.go b/internal/rtmp/server/auth/auth_test.go index d60bc88..7d2c315 100644 --- a/internal/rtmp/server/auth/auth_test.go +++ b/internal/rtmp/server/auth/auth_test.go @@ -16,7 +16,6 @@ func TestSentinelErrors(t *testing.T) { }{ {"ErrUnauthorized", ErrUnauthorized}, {"ErrTokenMissing", ErrTokenMissing}, - {"ErrForbidden", ErrForbidden}, } for _, s := range sentinels { t.Run(s.name, func(t *testing.T) { diff --git a/internal/rtmp/server/publish_handler.go b/internal/rtmp/server/publish_handler.go index eec00b2..e172093 100644 --- a/internal/rtmp/server/publish_handler.go +++ b/internal/rtmp/server/publish_handler.go @@ -19,7 +19,8 @@ import ( ) // sender is the minimal interface required from a connection for this task. -// *conn.Connection satisfies it. We keep it tiny so tests can use a stub. +// The connection type from internal/rtmp/conn satisfies it. We keep it tiny +// so tests can use a stub. type sender interface { SendMessage(*chunk.Message) error } From 6f6c08eeadd3b550a5c4e5127d7e3814f28a44b3 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 12:23:02 +0200 Subject: [PATCH 12/24] docs: add metrics package to architecture trees, CLI flags, and design decisions --- .github/copilot-instructions.md | 1 + README.md | 5 ++++- docs/architecture.md | 1 + docs/design.md | 11 +++++++++++ docs/getting-started.md | 10 ++++++++++ docs/implementation.md | 1 + 6 files changed, 28 insertions(+), 1 deletion(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index f0ebf4f..8411cbd 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -16,6 +16,7 @@ internal/rtmp/ │ ├── auth/ # Token-based authentication (Validator interface + backends) │ └── hooks/ # Event hooks (webhooks, shell scripts, stdio) ├── relay/ # Multi-destination relay with late-join support +├── metrics/ # Expvar counters (connections, publishers, subscribers, media) └── media/ # Audio/video message handling + FLV recording ``` diff --git a/README.md b/README.md index 284c6ad..343afcd 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ See [docs/getting-started.md](docs/getting-started.md) for the full guide with C | **Media Logging** | Per-connection codec detection and bitrate stats | | **Event Hooks** | Webhooks, shell scripts, and stdio notifications on RTMP events | | **Authentication** | Pluggable token-based validation for publish/play (static tokens, file, webhook) | +| **Metrics** | Expvar counters for connections, publishers, subscribers, media (HTTP `/debug/vars`) | | **Connection Cleanup** | TCP deadline enforcement (read 90s, write 30s), disconnect handlers, zombie detection | ## Architecture @@ -64,6 +65,7 @@ internal/rtmp/ │ └── hooks/ Event hook system (webhooks, shell, stdio) ├── media/ Audio/video parsing, codec detection, FLV recording ├── relay/ Multi-destination forwarding +├── metrics/ Expvar counters for live monitoring └── client/ Minimal test client ``` @@ -116,6 +118,7 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib -hook-stdio-format Stdio hook output: json | env (default disabled) -hook-timeout Hook execution timeout (default 30s) -hook-concurrency Max concurrent hook executions (default 10) +-metrics-addr HTTP address for metrics endpoint (e.g. :8080). Empty = disabled -version Print version and exit ``` @@ -130,13 +133,13 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib ### Recently Completed - Enhanced error handling: disconnect handlers, TCP deadline enforcement (read 90s, write 30s), relay client cleanup - Performance benchmarks for chunk parsing, AMF0 encoding, and array operations +- Expvar metrics: live counters for connections, publishers, subscribers, media bytes (HTTP `/debug/vars`) ### In Progress - Fuzz testing for AMF0 and chunk parsing (bounds safety) ### Planned - **RTMPS** — TLS/SSL encrypted connections -- **Expvar metrics** — live counters for connections, publishers, subscribers - **Configurable backpressure** — drop or disconnect policy for slow subscribers - **DVR / time-shift** — seek into live stream history - **Transcoding** — server-side codec conversion (e.g. H.265 → H.264) diff --git a/docs/architecture.md b/docs/architecture.md index eb32d2b..1ee1882 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -126,6 +126,7 @@ Each incoming media message is routed through three paths: | `internal/rtmp/server/hooks` | Event notification (webhooks, shell, stdio) | `HookManager`, `Event`, `Hook` | | `internal/rtmp/media` | Audio/video parsing, codec detection, FLV recording | `Recorder`, `CodecDetector`, `Stream` | | `internal/rtmp/relay` | Multi-destination relay to external servers | `DestinationManager`, `Destination` | +| `internal/rtmp/metrics` | Expvar counters for live monitoring | `ConnectionsActive`, `ConnectionsTotal`, `BytesIngested` | | `internal/rtmp/client` | Minimal RTMP client for testing | `Client` | | `internal/bufpool` | Memory pool for chunk buffers | `Pool` | | `internal/errors` | Domain-specific error types | `ProtocolError`, `ChunkError`, `AMFError` | diff --git a/docs/design.md b/docs/design.md index f8d4f3b..2af14b1 100644 --- a/docs/design.md +++ b/docs/design.md @@ -83,6 +83,17 @@ Tokens are passed by clients via URL query parameters in the stream name field ( The default mode is `none` (accept all requests), preserving backward compatibility. +### Expvar Metrics + +The server uses Go's `expvar` package for live monitoring. Expvar was chosen because: + +- **Zero dependencies**: part of the standard library +- **Thread-safe**: `expvar.Int` uses atomic int64 internally +- **HTTP-ready**: registers a handler on `DefaultServeMux` at `/debug/vars` +- **JSON output**: human- and machine-readable + +Metrics are organized as gauges (go up and down: active connections, publishers, subscribers, streams) and counters (monotonically increasing: total connections, media messages, bytes ingested, relay stats). The HTTP endpoint is opt-in via `-metrics-addr` so it has zero overhead when disabled. + ## Concurrency Model | Resource | Protection | Why | diff --git a/docs/getting-started.md b/docs/getting-started.md index 83e1125..63c998d 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -65,6 +65,15 @@ The server will forward all incoming media to the specified destinations. Available event types: `connection_accept`, `connection_close`, `publish_start`, `play_start`, `codec_detected`, `auth_failed`. +### With Metrics + +```bash +# Enable metrics endpoint on port 8080 +./rtmp-server -listen :1935 -metrics-addr :8080 +``` + +Query metrics at `http://localhost:8080/debug/vars` — returns JSON with all RTMP counters (connections, publishers, subscribers, media bytes, relay stats, uptime). + ### With Authentication ```bash @@ -112,6 +121,7 @@ OBS Studio: set **Server** to `rtmp://localhost:1935/live` and **Stream Key** to | `-hook-stdio-format` | (disabled) | Stdio output format: `json` or `env` | | `-hook-timeout` | `30s` | Hook execution timeout | | `-hook-concurrency` | `10` | Max concurrent hook executions | +| `-metrics-addr` | (disabled) | HTTP address for metrics endpoint (e.g. `:8080`). Empty = disabled | | `-version` | | Print version and exit | ## Test with FFmpeg diff --git a/docs/implementation.md b/docs/implementation.md index f718acb..fa2fdba 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -21,6 +21,7 @@ internal/ │ └── hooks/ Event hooks (webhooks, shell scripts, stdio output) ├── media/ Audio/video parsing, codec detection, FLV recording ├── relay/ Multi-destination forwarding to external RTMP servers + ├── metrics/ Expvar counters for connections, publishers, subscribers └── client/ Minimal RTMP client for testing ``` From 215aa96ed10b7e4eeb74af195bd97eeb99e4c18f Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:05:10 +0200 Subject: [PATCH 13/24] perf(chunk): reuse scratch buffer in Writer to reduce per-chunk allocations writeChunk() previously allocated a new buffer on every call via make([]byte, 0, len(header)+len(payload)). For 30fps video with 4096-byte chunks this created ~1000 allocations/sec. Move writeChunk to a method on Writer and reuse a scratch buffer that grows as needed but is never shrunk, eliminating steady-state allocations. --- internal/rtmp/chunk/writer.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/rtmp/chunk/writer.go b/internal/rtmp/chunk/writer.go index ccb0fa4..e363bf3 100644 --- a/internal/rtmp/chunk/writer.go +++ b/internal/rtmp/chunk/writer.go @@ -145,6 +145,7 @@ type Writer struct { w io.Writer chunkSize uint32 // outbound chunk size (default 128 if zero) lastHeaders map[uint32]*ChunkHeader // per-CSID state for FMT compression + scratch []byte // reusable buffer for writeChunk to avoid per-call allocations } // NewWriter creates a new chunk Writer. @@ -247,7 +248,7 @@ func (w *Writer) WriteMessage(msg *Message) error { if uint32(len(toSend)) > cs { toSend = toSend[:cs] } - if err := writeChunk(w.w, hdr, toSend); err != nil { + if err := w.writeChunk(hdr, toSend); err != nil { return err } written := uint32(len(toSend)) @@ -282,7 +283,7 @@ func (w *Writer) WriteMessage(msg *Message) error { if end > uint32(len(msg.Payload)) { return fmt.Errorf("writer: bounds (end=%d > len=%d)", end, len(msg.Payload)) } - if err := writeChunk(w.w, hdr3, msg.Payload[start:end]); err != nil { + if err := w.writeChunk(hdr3, msg.Payload[start:end]); err != nil { return err } written = end @@ -290,13 +291,19 @@ func (w *Writer) WriteMessage(msg *Message) error { return nil } -// writeChunk concatenates the header and payload into a single buffer and writes -// it in one call. This ensures the chunk is sent atomically — the header and -// payload bytes won't be split across separate TCP packets. -func writeChunk(w io.Writer, header []byte, payload []byte) error { - buf := make([]byte, 0, len(header)+len(payload)) - buf = append(buf, header...) - buf = append(buf, payload...) - _, err := w.Write(buf) +// writeChunk concatenates the header and payload into the Writer's scratch +// buffer and writes it in one call. This ensures the chunk is sent atomically +// (header and payload bytes won't be split across separate TCP packets) while +// reusing the buffer across calls to avoid per-chunk allocations. +func (w *Writer) writeChunk(header []byte, payload []byte) error { + need := len(header) + len(payload) + if cap(w.scratch) < need { + w.scratch = make([]byte, need) + } else { + w.scratch = w.scratch[:need] + } + copy(w.scratch, header) + copy(w.scratch[len(header):], payload) + _, err := w.w.Write(w.scratch) return err } From e4b37aa48c17cf765554e77c78f0f5498cb1c276 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:05:28 +0200 Subject: [PATCH 14/24] refactor: remove unused bufpool package The bufpool package (sync.Pool-backed byte slice recycler) was implemented but never imported by any production or test code. Removing dead code to reduce maintenance surface. --- internal/bufpool/pool.go | 93 ------------------- internal/bufpool/pool_test.go | 170 ---------------------------------- 2 files changed, 263 deletions(-) delete mode 100644 internal/bufpool/pool.go delete mode 100644 internal/bufpool/pool_test.go diff --git a/internal/bufpool/pool.go b/internal/bufpool/pool.go deleted file mode 100644 index b7f9ceb..0000000 --- a/internal/bufpool/pool.go +++ /dev/null @@ -1,93 +0,0 @@ -package bufpool - -import "sync" - -// sizeClasses defines the predefined buffer sizes available in the pool. -// These are tuned for RTMP workloads: -// - 128 bytes: default RTMP chunk size (spec default) -// - 4096 bytes: recommended chunk size after server negotiation -// - 65536 bytes: maximum chunk size allowed by the protocol -var sizeClasses = []int{128, 4096, 65536} - -// classPool pairs a size class with its Go sync.Pool. -// sync.Pool is a built-in Go structure that caches allocated objects -// for reuse, reducing garbage collector pressure. -type classPool struct { - size int - pool *sync.Pool -} - -// Pool provides sized byte slices backed by reusable buffers to reduce GC churn. -type Pool struct { - pools []classPool -} - -// defaultPool is the package-level pool used by the convenience Get/Put functions. -// Most callers should use the top-level Get() and Put() instead of creating their own pool. -var defaultPool = New() - -// Get acquires a buffer from the package-level default pool. -func Get(size int) []byte { - return defaultPool.Get(size) -} - -// Put releases a buffer back to the package-level default pool. -func Put(buf []byte) { - defaultPool.Put(buf) -} - -// New creates a buffer pool with predefined size classes tailored for RTMP chunking workloads. -func New() *Pool { - pools := make([]classPool, len(sizeClasses)) - for i, classSize := range sizeClasses { - size := classSize - pools[i] = classPool{ - size: size, - pool: &sync.Pool{ - New: func() any { - return make([]byte, size) - }, - }, - } - } - return &Pool{pools: pools} -} - -// Get returns a byte slice whose length matches the requested size and whose capacity is the -// nearest predefined size class that can accommodate the request. Requests larger than the -// maximum size class allocate a fresh slice without pooling. -func (p *Pool) Get(size int) []byte { - if p == nil || size <= 0 { - return nil - } - - for i := range p.pools { - class := &p.pools[i] - if size <= class.size { - buf := class.pool.Get().([]byte) - return buf[:size] - } - } - - return make([]byte, size) -} - -// Put returns the provided buffer to the pool if its capacity matches a predefined size class. -// Buffers that do not match any class are discarded. The buffer is zeroed before reuse to avoid -// leaking data across callers. -func (p *Pool) Put(buf []byte) { - if p == nil || buf == nil { - return - } - - capBuf := cap(buf) - for i := range p.pools { - class := &p.pools[i] - if capBuf == class.size { - full := buf[:class.size] - clear(full) - class.pool.Put(full) - return - } - } -} diff --git a/internal/bufpool/pool_test.go b/internal/bufpool/pool_test.go deleted file mode 100644 index 4ac1b2d..0000000 --- a/internal/bufpool/pool_test.go +++ /dev/null @@ -1,170 +0,0 @@ -// Package bufpool tests verify the byte-slice pool used to reduce garbage-collection -// pressure when allocating temporary buffers for RTMP chunk reading/writing. -// -// Key concepts for beginners: -// - A "pool" recycles byte slices so the Go garbage collector doesn't have to -// allocate and free them on every RTMP message. -// - Buffers are bucketed into size classes (128, 4096, 65536, …). A request for -// 64 bytes returns a buffer whose *capacity* is rounded up to the next bucket (128). -// - sync.Pool (used internally) is safe for concurrent access. -package bufpool - -import ( - "fmt" - "sync" - "testing" -) - -// TestPoolGetReturnsSizedBuffer uses a table-driven pattern to verify that -// Pool.Get returns a byte slice whose length matches the requested size and -// whose capacity is rounded up to the nearest size-class bucket. -// -// Table-driven tests are a Go convention: each row in the "tests" slice -// describes one scenario (input + expected output). The loop runs each row -// as a named sub-test via t.Run so failures show exactly which case broke. -func TestPoolGetReturnsSizedBuffer(t *testing.T) { - // t.Parallel() allows this test to run concurrently with other tests, - // speeding up the overall test suite. - t.Parallel() - - p := New() - - // Each entry defines the requested size and the expected capacity after - // rounding up to the pool's size-class bucket. - tests := []struct { - name string - requestSize int - expectCap int - }{ - {name: "small", requestSize: 64, expectCap: 128}, - {name: "exact small", requestSize: 128, expectCap: 128}, - {name: "medium", requestSize: 1024, expectCap: 4096}, - {name: "large", requestSize: 5000, expectCap: 65536}, - {name: "oversized", requestSize: 131072, expectCap: 131072}, - {name: "zero", requestSize: 0, expectCap: 0}, - } - - for _, tc := range tests { - tc := tc // capture loop variable for parallel sub-tests (required in Go < 1.22) - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - buf := p.Get(tc.requestSize) - - // Zero-size is a special case: the pool returns an empty (nil) slice. - if tc.requestSize == 0 { - if len(buf) != 0 || cap(buf) != 0 { - t.Fatalf("expected zero-length buffer, got len=%d cap=%d", len(buf), cap(buf)) - } - return - } - - // len must equal the requested size (usable bytes). - if len(buf) != tc.requestSize { - t.Fatalf("expected len=%d, got %d", tc.requestSize, len(buf)) - } - - // cap must match the size-class bucket. - if cap(buf) != tc.expectCap { - t.Fatalf("expected cap=%d, got %d", tc.expectCap, cap(buf)) - } - }) - } -} - -// TestPoolPutReusesBuffer verifies that returning a buffer to the pool and -// requesting the same size again yields the *same* underlying memory (pointer -// equality). It also checks that the buffer is zeroed on return so stale data -// from one RTMP message doesn't leak into another. -func TestPoolPutReusesBuffer(t *testing.T) { - t.Parallel() - - p := New() - - // Get a 200-byte buffer (capacity will be 4096 – the next size class). - buf := p.Get(200) - if len(buf) != 200 { - t.Fatalf("expected len=200, got %d", len(buf)) - } - buf[0] = 42 // write sentinel value to prove zeroing later - - // Save the pointer to the first byte so we can compare it after reuse. - ptr := &buf[:1][0] - p.Put(buf) // return to pool - - // Request the same size again – we expect the pool to hand back the same slice. - reused := p.Get(200) - if len(reused) != 200 { - t.Fatalf("expected len=200, got %d", len(reused)) - } - - if cap(reused) != 4096 { - t.Fatalf("expected cap=4096, got %d", cap(reused)) - } - - // Pointer comparison: both should reference the same backing array. - if &reused[:1][0] != ptr { - t.Fatalf("expected to get the same buffer pointer back from pool") - } - - // Ensure every byte is zero (pool must clear before recycling). - for i, v := range reused { - if v != 0 { - t.Fatalf("expected buffer to be zeroed, found value %d at index %d", v, i) - } - } -} - -// TestPoolConcurrentAccess is a stress/race-detector test. It spawns 5 -// goroutines that each Get/Put 1000 times with different buffer sizes. -// Running with `go test -race` ensures no data races exist in the pool. -// -// Pattern explanation: -// - sync.WaitGroup tracks when all goroutines have finished. -// - A buffered error channel collects per-goroutine failures without -// blocking, so the main goroutine can report them after wg.Wait(). -func TestPoolConcurrentAccess(t *testing.T) { - t.Parallel() - - p := New() - var wg sync.WaitGroup - errCh := make(chan error, 5) // one slot per worker goroutine - - // worker performs 1000 Get→fill→Put cycles for a given buffer size. - worker := func(size int) { - defer wg.Done() - for i := 0; i < 1000; i++ { - buf := p.Get(size) - if len(buf) != size { - errCh <- fmt.Errorf("expected len=%d, got %d", size, len(buf)) - return - } - if cap(buf) < size { - errCh <- fmt.Errorf("expected cap >= %d, got %d", size, cap(buf)) - return - } - // Fill buffer to exercise memory – helps the race detector - // catch unsynchronized access. - for j := range buf { - buf[j] = byte(i) - } - p.Put(buf) - } - } - - // Spawn one goroutine per size class. - sizes := []int{64, 512, 2048, 8192, 40000} - for _, size := range sizes { - size := size // capture for goroutine closure - wg.Add(1) - go worker(size) - } - - wg.Wait() - close(errCh) - - // Report any errors collected from workers. - for err := range errCh { - t.Fatal(err) - } -} From b72b83a8378e7509b6fe7fa5f62efef6930302ad Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:06:07 +0200 Subject: [PATCH 15/24] refactor(rpc): lazy-init ConnectCommand.Extra map Only allocate the Extra map when there are actually extra fields in the connect object. In the common case (no extra fields), this avoids one map allocation per connect command. --- internal/rtmp/rpc/connect.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/rtmp/rpc/connect.go b/internal/rtmp/rpc/connect.go index ce691ef..dbc121b 100644 --- a/internal/rtmp/rpc/connect.go +++ b/internal/rtmp/rpc/connect.go @@ -91,12 +91,15 @@ func ParseConnectCommand(msg *chunk.Message) (*ConnectCommand, error) { // Capture any extra fields from the connect object (useful for auth tokens, // custom parameters, etc.) that we don't explicitly parse above. - extra := make(map[string]interface{}) + var extra map[string]interface{} for k, v := range obj { switch k { case "app", "flashVer", "tcUrl", "objectEncoding": continue // already extracted default: + if extra == nil { + extra = make(map[string]interface{}) + } extra[k] = v } } From 29e31f83a8576146d0c83d1032fad592b575ff94 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:07:05 +0200 Subject: [PATCH 16/24] test(amf): standardize golden file helpers to use t.Helper and t.Fatalf Replace panic(err) with t.Fatalf in number_test.go and string_test.go golden helpers. Add t.Helper() to array_test.go helper. Remove duplicate goldenDir constants (goldenDirBoolean, goldenDirNull, goldenDirString) that shadowed the canonical constant from number_test.go. --- internal/rtmp/amf/array_test.go | 2 +- internal/rtmp/amf/boolean_test.go | 5 +---- internal/rtmp/amf/null_test.go | 4 +--- internal/rtmp/amf/number_test.go | 8 +++----- internal/rtmp/amf/string_test.go | 10 ++++------ 5 files changed, 10 insertions(+), 19 deletions(-) diff --git a/internal/rtmp/amf/array_test.go b/internal/rtmp/amf/array_test.go index c04e7a4..01d1929 100644 --- a/internal/rtmp/amf/array_test.go +++ b/internal/rtmp/amf/array_test.go @@ -19,8 +19,8 @@ import ( ) // readGoldenArray loads a golden binary vector for array tests. -// Reuses the goldenDir constant from number_test.go. func readGoldenArray(t *testing.T, name string) []byte { + t.Helper() p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { diff --git a/internal/rtmp/amf/boolean_test.go b/internal/rtmp/amf/boolean_test.go index 8cf0115..64a72b1 100644 --- a/internal/rtmp/amf/boolean_test.go +++ b/internal/rtmp/amf/boolean_test.go @@ -12,13 +12,10 @@ import ( "testing" ) -// Reuse goldenDir constant pattern from number_test.go (keep consistency even if duplicated). -const goldenDirBoolean = "../../../tests/golden" - // readGoldenBoolean loads a golden binary vector. func readGoldenBoolean(t *testing.T, name string) []byte { t.Helper() - p := filepath.Join(goldenDirBoolean, name) + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { t.Fatalf("read golden %s: %v", name, err) diff --git a/internal/rtmp/amf/null_test.go b/internal/rtmp/amf/null_test.go index 55a3c93..3617aac 100644 --- a/internal/rtmp/amf/null_test.go +++ b/internal/rtmp/amf/null_test.go @@ -11,12 +11,10 @@ import ( "testing" ) -const goldenDirNull = "../../../tests/golden" - // readGoldenNull loads the golden file for null tests. func readGoldenNull(t *testing.T, name string) []byte { t.Helper() - p := filepath.Join(goldenDirNull, name) + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { t.Fatalf("read golden %s: %v", name, err) diff --git a/internal/rtmp/amf/number_test.go b/internal/rtmp/amf/number_test.go index 8247dc3..2852223 100644 --- a/internal/rtmp/amf/number_test.go +++ b/internal/rtmp/amf/number_test.go @@ -23,15 +23,13 @@ import ( const goldenDir = "../../../tests/golden" // relative to this test file directory // readGolden loads a golden binary vector from tests/golden/. -// It panics on failure because missing golden files indicate a broken -// test environment, not a test failure. +// Missing golden files indicate a broken test environment. func readGolden(t *testing.T, name string) []byte { - // Using filepath.Join for Windows compatibility. + t.Helper() p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { - // Provide context but fail fast; golden vectors are required. - panic(err) + t.Fatalf("read golden %s: %v", name, err) } return b } diff --git a/internal/rtmp/amf/string_test.go b/internal/rtmp/amf/string_test.go index 612ac0c..be0237b 100644 --- a/internal/rtmp/amf/string_test.go +++ b/internal/rtmp/amf/string_test.go @@ -13,15 +13,13 @@ import ( "testing" ) -const goldenDirString = "../../../tests/golden" - -// readGoldenString loads a golden vector; mirrors the helper in number_test.go. +// readGoldenString loads a golden vector; reuses goldenDir from number_test.go. func readGoldenString(t *testing.T, name string) []byte { - // mirror helper pattern used in other tests - p := filepath.Join(goldenDirString, name) + t.Helper() + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { - panic(err) + t.Fatalf("read golden %s: %v", name, err) } return b } From d3f722fe6b3833d29c3005eb4ea739876103fc86 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:08:12 +0200 Subject: [PATCH 17/24] test(server): consolidate duplicate test doubles into helpers_test.go Move stubConn, capturingConn, stubPublisher, buildPublishMessage, and buildPlayMessage into a shared helpers_test.go file. These types were duplicated across publish_handler_test.go and play_handler_test.go. --- internal/rtmp/server/helpers_test.go | 37 ++++++++++++++++++++ internal/rtmp/server/play_handler_test.go | 20 ++--------- internal/rtmp/server/publish_handler_test.go | 16 +-------- 3 files changed, 40 insertions(+), 33 deletions(-) create mode 100644 internal/rtmp/server/helpers_test.go diff --git a/internal/rtmp/server/helpers_test.go b/internal/rtmp/server/helpers_test.go new file mode 100644 index 0000000..a2ddfd0 --- /dev/null +++ b/internal/rtmp/server/helpers_test.go @@ -0,0 +1,37 @@ +// helpers_test.go – shared test doubles and builders for the server package tests. +// +// These helpers are used across publish_handler_test.go, play_handler_test.go, +// and other server tests to avoid duplicating stub types and message builders. +package server + +import ( + "github.com/alxayo/go-rtmp/internal/rtmp/amf" + "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/rpc" +) + +// stubConn captures the last message sent via SendMessage for assertions. +type stubConn struct{ last *chunk.Message } + +func (s *stubConn) SendMessage(m *chunk.Message) error { s.last = m; return nil } + +// capturingConn records every message sent via SendMessage. +type capturingConn struct{ sent []*chunk.Message } + +func (c *capturingConn) SendMessage(m *chunk.Message) error { c.sent = append(c.sent, m); return nil } + +// stubPublisher is a minimal placeholder to mark a stream as published. +type stubPublisher struct{} + +// buildPublishMessage builds a minimal AMF0 "publish" command message +// for the given stream name. +func buildPublishMessage(streamName string) *chunk.Message { + payload, _ := amf.EncodeAll("publish", float64(0), nil, streamName, "live") + return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} +} + +// buildPlayMessage constructs a minimal AMF0 "play" command message. +func buildPlayMessage(streamName string) *chunk.Message { + payload, _ := amf.EncodeAll("play", float64(0), nil, streamName) + return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} +} diff --git a/internal/rtmp/server/play_handler_test.go b/internal/rtmp/server/play_handler_test.go index 3e098be..58cac82 100644 --- a/internal/rtmp/server/play_handler_test.go +++ b/internal/rtmp/server/play_handler_test.go @@ -7,32 +7,16 @@ // 4. If not found: sends onStatus Play.StreamNotFound. // // Key Go concepts: -// - capturingConn: records all sent messages in a slice for ordering assertions. -// - stubPublisher: minimal type to mark a stream as having a publisher. +// - capturingConn (helpers_test.go): records all sent messages. +// - stubPublisher (helpers_test.go): marks a stream as published. package server import ( "testing" "github.com/alxayo/go-rtmp/internal/rtmp/amf" - "github.com/alxayo/go-rtmp/internal/rtmp/chunk" - "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) -// capturingConn records every message sent via SendMessage. -type capturingConn struct{ sent []*chunk.Message } - -func (c *capturingConn) SendMessage(m *chunk.Message) error { c.sent = append(c.sent, m); return nil } - -// buildPlayMessage constructs a minimal AMF0 "play" command message. -func buildPlayMessage(streamName string) *chunk.Message { - payload, _ := amf.EncodeAll("play", float64(0), nil, streamName) - return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} -} - -// stubPublisher is a minimal placeholder to mark a stream as published. -type stubPublisher struct{} - // TestHandlePlaySuccess creates a stream with a publisher, then plays it. // Expects 2 messages sent (StreamBegin + onStatus Play.Start) and 1 subscriber. func TestHandlePlaySuccess(t *testing.T) { diff --git a/internal/rtmp/server/publish_handler_test.go b/internal/rtmp/server/publish_handler_test.go index 6386b06..08ddaf9 100644 --- a/internal/rtmp/server/publish_handler_test.go +++ b/internal/rtmp/server/publish_handler_test.go @@ -7,7 +7,7 @@ // 4. Sends an "onStatus" NetStream.Publish.Start response. // // Key Go concepts: -// - stubConn: captures the last message sent, simulating a real connection. +// - stubConn (helpers_test.go): captures the last message sent. // - AMF decode of the onStatus payload to verify the response code. package server @@ -15,22 +15,8 @@ import ( "testing" "github.com/alxayo/go-rtmp/internal/rtmp/amf" - "github.com/alxayo/go-rtmp/internal/rtmp/chunk" - "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) -// stubConn captures the last message sent via SendMessage for assertions. -type stubConn struct{ last *chunk.Message } - -func (s *stubConn) SendMessage(m *chunk.Message) error { s.last = m; return nil } - -// buildPublishMessage builds a minimal AMF0 "publish" command message -// for the given stream name. -func buildPublishMessage(streamName string) *chunk.Message { - payload, _ := amf.EncodeAll("publish", float64(0), nil, streamName, "live") - return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} -} - // TestHandlePublishSuccess publishes a stream and verifies: // the stream is registered, the publisher is set, and the onStatus // response contains NetStream.Publish.Start. From e791988319d78925053752752f1a13efc0d31e5b Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:08:58 +0200 Subject: [PATCH 18/24] test(amf): add t.Run for named subtests in edge case and marker tests TestNumber_EdgeCases_RoundTrip and TestDecodeValue_UnsupportedMarkers used bare loops without t.Run, producing unnamed subtest output on failure. Wrap each iteration in t.Run with descriptive names. --- internal/rtmp/amf/amf_test.go | 10 ++++++---- internal/rtmp/amf/number_test.go | 25 ++++++++++++++----------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/internal/rtmp/amf/amf_test.go b/internal/rtmp/amf/amf_test.go index 4297264..7208655 100644 --- a/internal/rtmp/amf/amf_test.go +++ b/internal/rtmp/amf/amf_test.go @@ -104,10 +104,12 @@ func TestDecodeValue_UnsupportedMarkers(t *testing.T) { // Markers explicitly rejected: 0x06 (Undefined), 0x07 (Reference), 0x0B (Date), 0x11 (AMF3 switch) markers := []byte{0x06, 0x07, 0x0B, 0x11} for _, m := range markers { - _, err := DecodeValue(bytes.NewReader([]byte{m})) - if err == nil { - t.Fatalf("marker 0x%02x expected error", m) - } + t.Run(fmt.Sprintf("marker_0x%02x", m), func(t *testing.T) { + _, err := DecodeValue(bytes.NewReader([]byte{m})) + if err == nil { + t.Fatalf("marker 0x%02x expected error", m) + } + }) } } diff --git a/internal/rtmp/amf/number_test.go b/internal/rtmp/amf/number_test.go index 2852223..a240573 100644 --- a/internal/rtmp/amf/number_test.go +++ b/internal/rtmp/amf/number_test.go @@ -14,6 +14,7 @@ package amf import ( "bytes" + "fmt" "math" "os" "path/filepath" @@ -92,17 +93,19 @@ func TestDecodeNumber_Golden_1_5(t *testing.T) { func TestNumber_EdgeCases_RoundTrip(t *testing.T) { cases := []float64{1.0, -1.0, math.Inf(1), math.Inf(-1)} for _, in := range cases { - var buf bytes.Buffer - if err := EncodeNumber(&buf, in); err != nil { - t.Fatalf("encode %v: %v", in, err) - } - out, err := DecodeNumber(&buf) - if err != nil { - t.Fatalf("decode %v: %v", in, err) - } - if in != out { - t.Fatalf("mismatch: in=%v out=%v", in, out) - } + t.Run(fmt.Sprintf("%v", in), func(t *testing.T) { + var buf bytes.Buffer + if err := EncodeNumber(&buf, in); err != nil { + t.Fatalf("encode %v: %v", in, err) + } + out, err := DecodeNumber(&buf) + if err != nil { + t.Fatalf("decode %v: %v", in, err) + } + if in != out { + t.Fatalf("mismatch: in=%v out=%v", in, out) + } + }) } } From 77671616f46dbb5cbc40c07c2065bf444dd3e6d3 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:10:29 +0200 Subject: [PATCH 19/24] test: add boundary and nil-argument edge case tests Add TestWriter_WriteMessage_BoundaryChunkSizes covering payloads at chunkSize-1, chunkSize, and chunkSize+1 bytes for both 128 and 4096 byte chunk sizes. Round-trips through Reader to verify correctness. Add TestHandlePublishNilArgs verifying that HandlePublish returns errors (not panics) for nil registry, nil connection, and nil message. --- internal/rtmp/chunk/writer_test.go | 39 ++++++++++++++++++++ internal/rtmp/server/publish_handler_test.go | 18 +++++++++ 2 files changed, 57 insertions(+) diff --git a/internal/rtmp/chunk/writer_test.go b/internal/rtmp/chunk/writer_test.go index e053af4..615c414 100644 --- a/internal/rtmp/chunk/writer_test.go +++ b/internal/rtmp/chunk/writer_test.go @@ -379,6 +379,45 @@ func TestWriter_ChunkReaderRoundTrip(t *testing.T) { } } +// TestWriter_WriteMessage_BoundaryChunkSizes verifies correct chunking at +// the exact chunk size boundary: payloads of chunkSize-1, chunkSize, and +// chunkSize+1 bytes. Off-by-one errors in the fragmentation loop would +// show up here. +func TestWriter_WriteMessage_BoundaryChunkSizes(t *testing.T) { + for _, cs := range []uint32{128, 4096} { + for _, delta := range []int{-1, 0, 1} { + size := int(cs) + delta + if size <= 0 { + continue + } + name := fmt.Sprintf("cs%d_payload%d", cs, size) + t.Run(name, func(t *testing.T) { + var sw simpleWriter + w := NewWriter(&sw, cs) + payload := bytes.Repeat([]byte{0xDD}, size) + msg := &Message{ + CSID: 6, Timestamp: 1000, TypeID: 9, + MessageStreamID: 1, + MessageLength: uint32(size), + Payload: payload, + } + if err := w.WriteMessage(msg); err != nil { + t.Fatalf("write: %v", err) + } + // Round-trip through Reader to verify correctness + r := NewReader(bytes.NewReader(sw.Bytes()), cs) + out, err := r.ReadMessage() + if err != nil { + t.Fatalf("read: %v", err) + } + if !bytes.Equal(out.Payload, payload) { + t.Fatalf("round-trip payload mismatch (len want=%d got=%d)", len(payload), len(out.Payload)) + } + }) + } + } +} + // --- Benchmarks --- // BenchmarkEncodeChunkHeader_FMT0 benchmarks header serialization for a full FMT0 header. diff --git a/internal/rtmp/server/publish_handler_test.go b/internal/rtmp/server/publish_handler_test.go index 08ddaf9..5452b9d 100644 --- a/internal/rtmp/server/publish_handler_test.go +++ b/internal/rtmp/server/publish_handler_test.go @@ -109,3 +109,21 @@ func TestHandlePublishWithQueryParams(t *testing.T) { t.Fatalf("stream should NOT be registered with query params in key") } } + +// TestHandlePublishNilArgs verifies that HandlePublish returns an error +// when called with nil arguments rather than panicking. +func TestHandlePublishNilArgs(t *testing.T) { + reg := NewRegistry() + sc := &stubConn{} + msg := buildPublishMessage("test") + + if _, err := HandlePublish(nil, sc, "app", msg); err == nil { + t.Fatal("expected error for nil registry") + } + if _, err := HandlePublish(reg, nil, "app", msg); err == nil { + t.Fatal("expected error for nil conn") + } + if _, err := HandlePublish(reg, sc, "app", nil); err == nil { + t.Fatal("expected error for nil message") + } +} From a2367fa4933f444f494309936d59232f743f54c3 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:33:14 +0200 Subject: [PATCH 20/24] perf(amf): eliminate MultiReader allocations in nested value decoding decodeValueWithMarker previously reconstructed an io.MultiReader for every nested AMF value (3 heap allocations per value: []byte, Reader, MultiReader). For deeply nested objects this produced significant GC pressure. Rewrite decodeValueWithMarker to decode each type's payload directly after the marker has been consumed. Extract decodeObjectPayload, decodeStrictArrayPayload, and decodeStringPayload internal helpers. DecodeObject and DecodeStrictArray now delegate to these helpers after reading their marker byte. --- internal/rtmp/amf/array.go | 22 +------- internal/rtmp/amf/object.go | 103 ++++++++++++++++++++++++++---------- 2 files changed, 76 insertions(+), 49 deletions(-) diff --git a/internal/rtmp/amf/array.go b/internal/rtmp/amf/array.go index a23ce0f..bbed297 100644 --- a/internal/rtmp/amf/array.go +++ b/internal/rtmp/amf/array.go @@ -39,7 +39,7 @@ func EncodeStrictArray(w io.Writer, arr []interface{}) error { // Error cases include: // - Marker mismatch (decode.array.marker) // - Short reads for header or elements (decode.array.header.read / decode.array.element.read) -// - Unsupported nested type markers (bubbled from decodeValueWithMarker) +// - Unsupported nested type markers (bubbled from DecodeValue) func DecodeStrictArray(r io.Reader) ([]interface{}, error) { var marker [1]byte if _, err := io.ReadFull(r, marker[:]); err != nil { @@ -48,25 +48,7 @@ func DecodeStrictArray(r io.Reader) ([]interface{}, error) { if marker[0] != markerStrictArray { return nil, amferrors.NewAMFError("decode.array.marker", fmt.Errorf("expected 0x%02x got 0x%02x", markerStrictArray, marker[0])) } - var countBuf [4]byte - if _, err := io.ReadFull(r, countBuf[:]); err != nil { - return nil, amferrors.NewAMFError("decode.array.count.read", err) - } - count := binary.BigEndian.Uint32(countBuf[:]) - out := make([]interface{}, 0, count) - for i := uint32(0); i < count; i++ { - // Read marker for element then dispatch. - var elemMarker [1]byte - if _, err := io.ReadFull(r, elemMarker[:]); err != nil { - return nil, amferrors.NewAMFError("decode.array.element.marker.read", err) - } - val, err := decodeValueWithMarker(elemMarker[0], r) - if err != nil { - return nil, amferrors.NewAMFError("decode.array.element", fmt.Errorf("index %d: %w", i, err)) - } - out = append(out, val) - } - return out, nil + return decodeStrictArrayPayload(r) } // roundTripStrictArray is a helper for tests: encode then decode an array for round-trip verification. diff --git a/internal/rtmp/amf/object.go b/internal/rtmp/amf/object.go index 7b455ea..e4b1bad 100644 --- a/internal/rtmp/amf/object.go +++ b/internal/rtmp/amf/object.go @@ -1,10 +1,10 @@ package amf import ( - "bytes" "encoding/binary" "fmt" "io" + "math" "sort" amferrors "github.com/alxayo/go-rtmp/internal/errors" @@ -108,6 +108,61 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { if mMarker[0] != markerObject { return nil, amferrors.NewAMFError("decode.object.marker", fmt.Errorf("expected 0x%02x got 0x%02x", markerObject, mMarker[0])) } + return decodeObjectPayload(r) +} + +// decodeValueWithMarker dispatches based on an already-consumed marker byte. +// It reads the remaining payload from r without re-reading the marker, avoiding +// the allocation overhead of io.MultiReader. +func decodeValueWithMarker(marker byte, r io.Reader) (interface{}, error) { + switch marker { + case markerNumber: + var num [8]byte + if _, err := io.ReadFull(r, num[:]); err != nil { + return nil, amferrors.NewAMFError("decode.number.read", err) + } + u := binary.BigEndian.Uint64(num[:]) + return math.Float64frombits(u), nil + case markerBoolean: + var b [1]byte + if _, err := io.ReadFull(r, b[:]); err != nil { + return nil, amferrors.NewAMFError("decode.boolean.read", err) + } + return b[0] != 0x00, nil + case markerString: + return decodeStringPayload(r) + case markerNull: + return nil, nil // null has no payload beyond the marker + case markerObject: + return decodeObjectPayload(r) + case markerStrictArray: + return decodeStrictArrayPayload(r) + default: + return nil, fmt.Errorf("unsupported marker 0x%02x", marker) + } +} + +// decodeStringPayload reads an AMF0 string payload (length + bytes) after the +// marker has already been consumed. +func decodeStringPayload(r io.Reader) (string, error) { + var ln [2]byte + if _, err := io.ReadFull(r, ln[:]); err != nil { + return "", amferrors.NewAMFError("decode.string.length.read", err) + } + l := binary.BigEndian.Uint16(ln[:]) + if l == 0 { + return "", nil + } + buf := make([]byte, l) + if _, err := io.ReadFull(r, buf); err != nil { + return "", amferrors.NewAMFError("decode.string.read", err) + } + return string(buf), nil +} + +// decodeObjectPayload reads an AMF0 object payload (key-value pairs + end marker) +// after the object marker has already been consumed. +func decodeObjectPayload(r io.Reader) (map[string]interface{}, error) { out := make(map[string]interface{}) for { var klenBuf [2]byte @@ -131,13 +186,8 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { } key := string(keyBytes) - // Peek marker for value to dispatch. We read one byte, then re-create a reader with it prefixed. - var valMarker [1]byte - if _, err := io.ReadFull(r, valMarker[:]); err != nil { - return nil, amferrors.NewAMFError("decode.object.value.marker.read", err) - } - - val, err := decodeValueWithMarker(valMarker[0], r) + // Decode the value (reads marker internally). + val, err := DecodeValue(r) if err != nil { return nil, amferrors.NewAMFError("decode.object.value", fmt.Errorf("key '%s': %w", key, err)) } @@ -146,26 +196,21 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { return out, nil } -// decodeValueWithMarker dispatches based on an already-consumed marker byte. It consumes the -// remaining payload from r appropriate to the marker. -func decodeValueWithMarker(marker byte, r io.Reader) (interface{}, error) { - switch marker { - case markerNumber: - // Reconstruct a reader including the marker to reuse existing decoder. - return DecodeNumber(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerBoolean: - return DecodeBoolean(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerString: - return DecodeString(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerNull: - v, err := DecodeNull(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - return v, err - case markerObject: - // Nested object: reuse DecodeObject by reconstructing the marker. - return DecodeObject(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerStrictArray: - return DecodeStrictArray(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - default: - return nil, fmt.Errorf("unsupported marker 0x%02x", marker) +// decodeStrictArrayPayload reads an AMF0 strict array payload (count + elements) +// after the array marker has already been consumed. +func decodeStrictArrayPayload(r io.Reader) ([]interface{}, error) { + var countBuf [4]byte + if _, err := io.ReadFull(r, countBuf[:]); err != nil { + return nil, amferrors.NewAMFError("decode.array.count.read", err) } + count := binary.BigEndian.Uint32(countBuf[:]) + out := make([]interface{}, 0, count) + for i := uint32(0); i < count; i++ { + val, err := DecodeValue(r) + if err != nil { + return nil, amferrors.NewAMFError("decode.array.element", fmt.Errorf("index %d: %w", i, err)) + } + out = append(out, val) + } + return out, nil } From cf898787a1d15641b6b622c76bb1fa9e597f2395 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:34:04 +0200 Subject: [PATCH 21/24] test(media): add t.Run subtests and consolidate test helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add t.Run with named subtests to TestParseAudioMessage_Errors and TestParseVideoMessage_ErrorCases for individually identifiable failures. Remove duplicate _tVidFatalf helper from video_test.go — reuse _tFatalf from audio_test.go since both are in the same package. --- internal/rtmp/media/audio_test.go | 8 ++++--- internal/rtmp/media/video_test.go | 37 +++++++++++++------------------ 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/internal/rtmp/media/audio_test.go b/internal/rtmp/media/audio_test.go index 3e981ee..9a71330 100644 --- a/internal/rtmp/media/audio_test.go +++ b/internal/rtmp/media/audio_test.go @@ -81,9 +81,11 @@ func TestParseAudioMessage_Errors(t *testing.T) { {"unsupported", []byte{15 << 4, 0x01}}, // 15 not supported } for _, tc := range cases { - if _, err := ParseAudioMessage(tc.in); err == nil { - _tFatalf(t, "expected error for case %s", tc.name) - } + t.Run(tc.name, func(t *testing.T) { + if _, err := ParseAudioMessage(tc.in); err == nil { + _tFatalf(t, "expected error for case %s", tc.name) + } + }) } } diff --git a/internal/rtmp/media/video_test.go b/internal/rtmp/media/video_test.go index 0f14a90..fd52907 100644 --- a/internal/rtmp/media/video_test.go +++ b/internal/rtmp/media/video_test.go @@ -14,13 +14,6 @@ package media import "testing" -// _tVidFatalf is a test helper (like _tFatalf in audio_test.go) that -// marks the caller as failure location. -func _tVidFatalf(t *testing.T, format string, args ...interface{}) { - t.Helper() - t.Fatalf(format, args...) -} - // TestParseVideoMessage_AVCSequenceHeader verifies parsing of an H.264 // sequence header (frameType=1 keyframe, codecID=7 AVC, avcPacketType=0). // The sequence header contains SPS/PPS data used to initialize the decoder. @@ -29,19 +22,19 @@ func TestParseVideoMessage_AVCSequenceHeader(t *testing.T) { data := []byte{(1 << 4) | 7, 0x00, 0x17, 0x34, 0x56} // 0x00 = sequence header; rest pretend SPS/PPS bytes m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.Codec != VideoCodecAVC { - _tVidFatalf(t, "codec mismatch want AVC got %s", m.Codec) + _tFatalf(t, "codec mismatch want AVC got %s", m.Codec) } if m.FrameType != VideoFrameTypeKey { - _tVidFatalf(t, "frameType mismatch want keyframe got %s", m.FrameType) + _tFatalf(t, "frameType mismatch want keyframe got %s", m.FrameType) } if m.PacketType != AVCPacketTypeSequenceHeader { - _tVidFatalf(t, "packetType mismatch want sequence_header got %s", m.PacketType) + _tFatalf(t, "packetType mismatch want sequence_header got %s", m.PacketType) } if len(m.Payload) != 3 || m.Payload[0] != 0x17 { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -52,13 +45,13 @@ func TestParseVideoMessage_AVCKeyframeNALU(t *testing.T) { data := []byte{(1 << 4) | 7, 0x01, 0xAA, 0xBB, 0xCC} m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.Codec != VideoCodecAVC || m.PacketType != AVCPacketTypeNALU || m.FrameType != VideoFrameTypeKey { - _tVidFatalf(t, "unexpected metadata: %+v", m) + _tFatalf(t, "unexpected metadata: %+v", m) } if len(m.Payload) != 3 || m.Payload[2] != 0xCC { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -69,13 +62,13 @@ func TestParseVideoMessage_AVCInterNALU(t *testing.T) { data := []byte{(2 << 4) | 7, 0x01, 0x01, 0x02} m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.FrameType != VideoFrameTypeInter || m.PacketType != AVCPacketTypeNALU { - _tVidFatalf(t, "unexpected metadata: %+v", m) + _tFatalf(t, "unexpected metadata: %+v", m) } if len(m.Payload) != 2 || m.Payload[0] != 0x01 { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -91,8 +84,10 @@ func TestParseVideoMessage_ErrorCases(t *testing.T) { {"unsupportedCodec", []byte{(1 << 4) | 5, 0x00}}, // codec 5 (On2 VP6) -> unsupported } for _, tc := range cases { - if _, err := ParseVideoMessage(tc.in); err == nil { - _tVidFatalf(t, "expected error for case %s", tc.name) - } + t.Run(tc.name, func(t *testing.T) { + if _, err := ParseVideoMessage(tc.in); err == nil { + _tFatalf(t, "expected error for case %s", tc.name) + } + }) } } From b808da253b67d23f1132c3ea9cce4c9c962200c9 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:34:38 +0200 Subject: [PATCH 22/24] perf(hooks): pre-allocate hook slice capacity for stdio hook TriggerEvent allocated a hooks slice then used append for the stdio hook, causing a reallocation when stdio was enabled. Pre-allocate with capacity for the extra element to avoid the wasted allocation. --- internal/rtmp/server/hooks/manager.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/rtmp/server/hooks/manager.go b/internal/rtmp/server/hooks/manager.go index bc75ebf..5eb6cab 100644 --- a/internal/rtmp/server/hooks/manager.go +++ b/internal/rtmp/server/hooks/manager.go @@ -97,8 +97,13 @@ func (hm *HookManager) TriggerEvent(ctx context.Context, event Event) { // Get hooks for this event type hm.mu.RLock() - hooks := make([]Hook, len(hm.hooks[event.Type])) - copy(hooks, hm.hooks[event.Type]) + registered := hm.hooks[event.Type] + extra := 0 + if hm.stdioHook != nil { + extra = 1 + } + hooks := make([]Hook, len(registered), len(registered)+extra) + copy(hooks, registered) hm.mu.RUnlock() // Add stdio hook if enabled From cde99b6376eab58f4bd3a42bb16559d8497c6a7d Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 13:48:41 +0200 Subject: [PATCH 23/24] docs: fix stale comments and remove bufpool references - amf.go: update DecodeValue comment to reflect that decodeValueWithMarker now decodes payloads directly (no longer uses io.MultiReader) - amf.go: clarify unsupported AMF0 marker comment with specific marker names - docs/implementation.md: remove bufpool from package structure tree - docs/architecture.md: remove bufpool from package map table - docs/testing-guide.md: remove bufpool test command and test matrix entry - CHANGELOG.md: remove bufpool from server infrastructure list --- CHANGELOG.md | 1 - docs/architecture.md | 1 - docs/implementation.md | 1 - docs/testing-guide.md | 2 -- internal/rtmp/amf/amf.go | 7 ++++--- 5 files changed, 4 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e207ad..9dbdffb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,7 +93,6 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming - **Connection lifecycle**: Handshake → control burst → command exchange → media streaming - **Stream registry**: Thread-safe map of stream keys to publisher/subscriber lists - **Structured logging**: `log/slog` with configurable levels (debug/info/warn/error) -- **Buffer pool**: Memory-efficient buffer reuse (`internal/bufpool`) - **Domain errors**: Typed error wrappers (`HandshakeError`, `ChunkError`, `AMFError`, `ProtocolError`, `TimeoutError`) #### Testing & Tooling diff --git a/docs/architecture.md b/docs/architecture.md index 1ee1882..28c0831 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -128,7 +128,6 @@ Each incoming media message is routed through three paths: | `internal/rtmp/relay` | Multi-destination relay to external servers | `DestinationManager`, `Destination` | | `internal/rtmp/metrics` | Expvar counters for live monitoring | `ConnectionsActive`, `ConnectionsTotal`, `BytesIngested` | | `internal/rtmp/client` | Minimal RTMP client for testing | `Client` | -| `internal/bufpool` | Memory pool for chunk buffers | `Pool` | | `internal/errors` | Domain-specific error types | `ProtocolError`, `ChunkError`, `AMFError` | | `internal/logger` | Structured logging with dynamic level | `Init()`, `Logger()`, `WithConn()` | diff --git a/docs/implementation.md b/docs/implementation.md index fa2fdba..7d13742 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -6,7 +6,6 @@ A code-level walkthrough of the go-rtmp server. Read [Architecture](architecture ``` internal/ -├── bufpool/ Buffer pool to reduce garbage collection pressure ├── errors/ Typed error wrappers (HandshakeError, ChunkError, etc.) ├── logger/ Structured JSON logging with runtime level changes └── rtmp/ diff --git a/docs/testing-guide.md b/docs/testing-guide.md index fb78219..c6c3d9c 100644 --- a/docs/testing-guide.md +++ b/docs/testing-guide.md @@ -15,7 +15,6 @@ This runs tests across all packages. Most complete in under 15 seconds. Some pac ### Core Utilities ```bash -go test ./internal/bufpool/ # Buffer pool allocation and concurrency go test ./internal/errors/ # Error type classification and unwrapping go test ./internal/logger/ # Log level parsing and field extraction ``` @@ -161,6 +160,5 @@ See [wireshark_rtmp_capture_guide.md](wireshark_rtmp_capture_guide.md) for detai | Play handler | `server/play_handler_test.go` | Subscriber addition, sequence header delivery | | Media logging | `server/media_logger_test.go` | Packet counting, codec detection, stats | | Audio/video parsing | `media/*_test.go` | Codec detection, frame type classification | -| Buffer pool | `bufpool/pool_test.go` | Allocation, reuse, concurrent access safety | | Event hooks | `server/hooks/hooks_test.go` | Hook registration, execution, concurrency pool, cleanup | | Integration | `tests/integration/*_test.go` | Full publish→play flow, multi-subscriber relay | diff --git a/internal/rtmp/amf/amf.go b/internal/rtmp/amf/amf.go index c26f202..37a037d 100644 --- a/internal/rtmp/amf/amf.go +++ b/internal/rtmp/amf/amf.go @@ -60,8 +60,8 @@ func DecodeValue(r io.Reader) (interface{}, error) { if _, err := io.ReadFull(r, marker[:]); err != nil { return nil, amferrors.NewAMFError("decode.value.marker.read", err) } - // Fast path for supported markers via existing helper (object.go) which - // expects the marker already consumed and reconstructs a reader with it. + // Dispatch to helper which decodes the payload directly after the + // marker has been consumed (no intermediate reader allocation). switch marker[0] { case markerNumber, markerBoolean, markerString, markerNull, markerObject, markerStrictArray: v, err := decodeValueWithMarker(marker[0], r) @@ -73,7 +73,8 @@ func DecodeValue(r io.Reader) (interface{}, error) { if unsupportedMarker(marker[0]) { return nil, amferrors.NewAMFError("decode.value.unsupported", fmt.Errorf("unsupported marker 0x%02x", marker[0])) } - // Any other marker (including 0x04, 0x08, 0x09 as standalone) we treat as unsupported per task scope. + // Any other AMF0 marker (0x04 MovieClip, 0x08 ECMA Array, 0x09 Object End) + // is unsupported per project scope. return nil, amferrors.NewAMFError("decode.value.unsupported", fmt.Errorf("unsupported marker 0x%02x", marker[0])) } From b97a0a41c7d54729b1f3243f312154c6501aa316 Mon Sep 17 00:00:00 2001 From: alxayo Date: Wed, 4 Mar 2026 14:28:35 +0200 Subject: [PATCH 24/24] docs: update CHANGELOG with feature/006-expvar-metrics changes Add complete changelog section covering all 21 commits on the branch: - Expvar metrics feature (counters, CLI flag, HTTP endpoint) - Performance optimizations (AMF MultiReader, chunk writer buffer, RPC lazy-init, hook manager pre-alloc) - Dead code removal (bufpool package, ErrForbidden sentinel) - Test improvements (golden helpers, t.Run subtests, edge cases, consolidated test doubles) - Update feature branch table and comparison links --- CHANGELOG.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9dbdffb..e4c32dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to go-rtmp are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] — feature/006-expvar-metrics + +### Added +- **Expvar metrics**: Live server counters via `expvar` (HTTP `/debug/vars` endpoint) tracking connections, publishers, subscribers, streams, audio/video messages, bytes ingested, relay stats, uptime, and Go version ([`671f2a6`](https://github.com/alxayo/rtmp-go/commit/671f2a6)) +- **`-metrics-addr` CLI flag**: Optional HTTP address (e.g. `:8080`) to expose the metrics endpoint; disabled by default ([`7f446c5`](https://github.com/alxayo/rtmp-go/commit/7f446c5)) +- **Metrics integration test**: End-to-end test validating `/debug/vars` HTTP endpoint serves all `rtmp_*` keys ([`a22a35d`](https://github.com/alxayo/rtmp-go/commit/a22a35d)) +- **Edge case tests**: Chunk writer boundary tests (chunkSize ±1) and publish handler nil-argument tests ([`7767161`](https://github.com/alxayo/rtmp-go/commit/7767161)) +- **Spec 006**: Expvar metrics feature specification and implementation plan ([`fa23693`](https://github.com/alxayo/rtmp-go/commit/fa23693)) + +### Changed +- **AMF0 decoding optimization**: Eliminated `io.MultiReader` allocations in nested value decoding by inlining payload reads in `decodeValueWithMarker`; new internal helpers `decodeObjectPayload`, `decodeStrictArrayPayload`, `decodeStringPayload` ([`a2367fa`](https://github.com/alxayo/rtmp-go/commit/a2367fa)) +- **Chunk writer optimization**: Added reusable scratch buffer to `Writer` struct, eliminating per-chunk `make()` allocation in `writeChunk` ([`215aa96`](https://github.com/alxayo/rtmp-go/commit/215aa96)) +- **RPC lazy-init**: `ConnectCommand.Extra` map only allocated when extra fields are present ([`b72b83a`](https://github.com/alxayo/rtmp-go/commit/b72b83a)) +- **Hook manager optimization**: `TriggerEvent` pre-allocates hook slice capacity for stdio hook ([`b808da2`](https://github.com/alxayo/rtmp-go/commit/b808da2)) +- **AMF golden helpers standardized**: All golden file test helpers use `t.Helper()` + `t.Fatalf()` consistently; removed duplicate `goldenDir` constants ([`29e31f8`](https://github.com/alxayo/rtmp-go/commit/29e31f8)) +- **Server test doubles consolidated**: Moved shared stubs (`stubConn`, `capturingConn`, `stubPublisher`) into `helpers_test.go` ([`d3f722f`](https://github.com/alxayo/rtmp-go/commit/d3f722f)) +- **Media test helper consolidated**: Removed duplicate `_tVidFatalf` — reuses `_tFatalf` from audio_test.go; added `t.Run` subtests to error case loops ([`cf89878`](https://github.com/alxayo/rtmp-go/commit/cf89878)) +- **AMF subtests**: Added `t.Run` with named subtests to `TestNumber_EdgeCases_RoundTrip` and `TestDecodeValue_UnsupportedMarkers` ([`e791988`](https://github.com/alxayo/rtmp-go/commit/e791988)) + +### Removed +- **Dead `bufpool` package**: `internal/bufpool/` was implemented but never imported; removed 263 lines of dead code ([`e4b37aa`](https://github.com/alxayo/rtmp-go/commit/e4b37aa)) +- **Dead `ErrForbidden` sentinel**: Auth sentinel error declared but never returned by any code path ([`8eaa72e`](https://github.com/alxayo/rtmp-go/commit/8eaa72e)) + +--- + ## [Unreleased] — feature/005-error-handling-benchmarks ### Added @@ -133,11 +158,12 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming | Branch | Spec | Description | |--------|------|-------------| +| `feature/006-expvar-metrics` | [specs/006](specs/006-expvar-metrics/spec.md) | Expvar metrics, performance optimizations, dead code removal | | `feature/005-error-handling-benchmarks` | [specs/005](specs/005-error-handling-benchmarks/spec.md) | Error handling, connection cleanup, TCP deadlines, performance benchmarks | | `feature/004-token-auth` | [specs/004](specs/004-token-auth/spec.md) | Token-based stream key authentication with 4 validator backends | | `003-multi-destination-relay` | [specs/003](specs/003-multi-destination-relay/) | Multi-destination relay to external RTMP servers | | `T001-init-go-module` | [specs/001](specs/001-rtmp-server-implementation/spec.md) | Core RTMP server implementation (handshake through media streaming) | -[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/005-error-handling-benchmarks +[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/006-expvar-metrics [v0.1.1]: https://github.com/alxayo/rtmp-go/compare/v0.1.0...v0.1.1 [v0.1.0]: https://github.com/alxayo/rtmp-go/releases/tag/v0.1.0