diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index f0ebf4f..8411cbd 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -16,6 +16,7 @@ internal/rtmp/ │ ├── auth/ # Token-based authentication (Validator interface + backends) │ └── hooks/ # Event hooks (webhooks, shell scripts, stdio) ├── relay/ # Multi-destination relay with late-join support +├── metrics/ # Expvar counters (connections, publishers, subscribers, media) └── media/ # Audio/video message handling + FLV recording ``` diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e207ad..e4c32dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to go-rtmp are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] — feature/006-expvar-metrics + +### Added +- **Expvar metrics**: Live server counters via `expvar` (HTTP `/debug/vars` endpoint) tracking connections, publishers, subscribers, streams, audio/video messages, bytes ingested, relay stats, uptime, and Go version ([`671f2a6`](https://github.com/alxayo/rtmp-go/commit/671f2a6)) +- **`-metrics-addr` CLI flag**: Optional HTTP address (e.g. `:8080`) to expose the metrics endpoint; disabled by default ([`7f446c5`](https://github.com/alxayo/rtmp-go/commit/7f446c5)) +- **Metrics integration test**: End-to-end test validating `/debug/vars` HTTP endpoint serves all `rtmp_*` keys ([`a22a35d`](https://github.com/alxayo/rtmp-go/commit/a22a35d)) +- **Edge case tests**: Chunk writer boundary tests (chunkSize ±1) and publish handler nil-argument tests ([`7767161`](https://github.com/alxayo/rtmp-go/commit/7767161)) +- **Spec 006**: Expvar metrics feature specification and implementation plan ([`fa23693`](https://github.com/alxayo/rtmp-go/commit/fa23693)) + +### Changed +- **AMF0 decoding optimization**: Eliminated `io.MultiReader` allocations in nested value decoding by inlining payload reads in `decodeValueWithMarker`; new internal helpers `decodeObjectPayload`, `decodeStrictArrayPayload`, `decodeStringPayload` ([`a2367fa`](https://github.com/alxayo/rtmp-go/commit/a2367fa)) +- **Chunk writer optimization**: Added reusable scratch buffer to `Writer` struct, eliminating per-chunk `make()` allocation in `writeChunk` ([`215aa96`](https://github.com/alxayo/rtmp-go/commit/215aa96)) +- **RPC lazy-init**: `ConnectCommand.Extra` map only allocated when extra fields are present ([`b72b83a`](https://github.com/alxayo/rtmp-go/commit/b72b83a)) +- **Hook manager optimization**: `TriggerEvent` pre-allocates hook slice capacity for stdio hook ([`b808da2`](https://github.com/alxayo/rtmp-go/commit/b808da2)) +- **AMF golden helpers standardized**: All golden file test helpers use `t.Helper()` + `t.Fatalf()` consistently; removed duplicate `goldenDir` constants ([`29e31f8`](https://github.com/alxayo/rtmp-go/commit/29e31f8)) +- **Server test doubles consolidated**: Moved shared stubs (`stubConn`, `capturingConn`, `stubPublisher`) into `helpers_test.go` ([`d3f722f`](https://github.com/alxayo/rtmp-go/commit/d3f722f)) +- **Media test helper consolidated**: Removed duplicate `_tVidFatalf` — reuses `_tFatalf` from audio_test.go; added `t.Run` subtests to error case loops ([`cf89878`](https://github.com/alxayo/rtmp-go/commit/cf89878)) +- **AMF subtests**: Added `t.Run` with named subtests to `TestNumber_EdgeCases_RoundTrip` and `TestDecodeValue_UnsupportedMarkers` ([`e791988`](https://github.com/alxayo/rtmp-go/commit/e791988)) + +### Removed +- **Dead `bufpool` package**: `internal/bufpool/` was implemented but never imported; removed 263 lines of dead code ([`e4b37aa`](https://github.com/alxayo/rtmp-go/commit/e4b37aa)) +- **Dead `ErrForbidden` sentinel**: Auth sentinel error declared but never returned by any code path ([`8eaa72e`](https://github.com/alxayo/rtmp-go/commit/8eaa72e)) + +--- + ## [Unreleased] — feature/005-error-handling-benchmarks ### Added @@ -93,7 +118,6 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming - **Connection lifecycle**: Handshake → control burst → command exchange → media streaming - **Stream registry**: Thread-safe map of stream keys to publisher/subscriber lists - **Structured logging**: `log/slog` with configurable levels (debug/info/warn/error) -- **Buffer pool**: Memory-efficient buffer reuse (`internal/bufpool`) - **Domain errors**: Typed error wrappers (`HandshakeError`, `ChunkError`, `AMFError`, `ProtocolError`, `TimeoutError`) #### Testing & Tooling @@ -134,11 +158,12 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming | Branch | Spec | Description | |--------|------|-------------| +| `feature/006-expvar-metrics` | [specs/006](specs/006-expvar-metrics/spec.md) | Expvar metrics, performance optimizations, dead code removal | | `feature/005-error-handling-benchmarks` | [specs/005](specs/005-error-handling-benchmarks/spec.md) | Error handling, connection cleanup, TCP deadlines, performance benchmarks | | `feature/004-token-auth` | [specs/004](specs/004-token-auth/spec.md) | Token-based stream key authentication with 4 validator backends | | `003-multi-destination-relay` | [specs/003](specs/003-multi-destination-relay/) | Multi-destination relay to external RTMP servers | | `T001-init-go-module` | [specs/001](specs/001-rtmp-server-implementation/spec.md) | Core RTMP server implementation (handshake through media streaming) | -[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/005-error-handling-benchmarks +[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/006-expvar-metrics [v0.1.1]: https://github.com/alxayo/rtmp-go/compare/v0.1.0...v0.1.1 [v0.1.0]: https://github.com/alxayo/rtmp-go/releases/tag/v0.1.0 diff --git a/README.md b/README.md index 284c6ad..343afcd 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ See [docs/getting-started.md](docs/getting-started.md) for the full guide with C | **Media Logging** | Per-connection codec detection and bitrate stats | | **Event Hooks** | Webhooks, shell scripts, and stdio notifications on RTMP events | | **Authentication** | Pluggable token-based validation for publish/play (static tokens, file, webhook) | +| **Metrics** | Expvar counters for connections, publishers, subscribers, media (HTTP `/debug/vars`) | | **Connection Cleanup** | TCP deadline enforcement (read 90s, write 30s), disconnect handlers, zombie detection | ## Architecture @@ -64,6 +65,7 @@ internal/rtmp/ │ └── hooks/ Event hook system (webhooks, shell, stdio) ├── media/ Audio/video parsing, codec detection, FLV recording ├── relay/ Multi-destination forwarding +├── metrics/ Expvar counters for live monitoring └── client/ Minimal test client ``` @@ -116,6 +118,7 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib -hook-stdio-format Stdio hook output: json | env (default disabled) -hook-timeout Hook execution timeout (default 30s) -hook-concurrency Max concurrent hook executions (default 10) +-metrics-addr HTTP address for metrics endpoint (e.g. :8080). Empty = disabled -version Print version and exit ``` @@ -130,13 +133,13 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib ### Recently Completed - Enhanced error handling: disconnect handlers, TCP deadline enforcement (read 90s, write 30s), relay client cleanup - Performance benchmarks for chunk parsing, AMF0 encoding, and array operations +- Expvar metrics: live counters for connections, publishers, subscribers, media bytes (HTTP `/debug/vars`) ### In Progress - Fuzz testing for AMF0 and chunk parsing (bounds safety) ### Planned - **RTMPS** — TLS/SSL encrypted connections -- **Expvar metrics** — live counters for connections, publishers, subscribers - **Configurable backpressure** — drop or disconnect policy for slow subscribers - **DVR / time-shift** — seek into live stream history - **Transcoding** — server-side codec conversion (e.g. H.265 → H.264) diff --git a/cmd/rtmp-server/flags.go b/cmd/rtmp-server/flags.go index 4c28e8f..de02af4 100644 --- a/cmd/rtmp-server/flags.go +++ b/cmd/rtmp-server/flags.go @@ -31,6 +31,9 @@ type cliConfig struct { hookTimeout string // hook execution timeout (e.g. "30s") hookConcurrency int // max concurrent hook executions + // Metrics + metricsAddr string // HTTP address for expvar metrics (e.g. ":8080"); empty = disabled + // Authentication authMode string // "none", "token", "file", "callback" authTokens []string // "streamKey=token" pairs (for mode=token) @@ -62,6 +65,9 @@ func parseFlags(args []string) (*cliConfig, error) { fs.StringVar(&cfg.hookTimeout, "hook-timeout", "30s", "Hook execution timeout") fs.IntVar(&cfg.hookConcurrency, "hook-concurrency", 10, "Max concurrent hook executions") + // Metrics + fs.StringVar(&cfg.metricsAddr, "metrics-addr", "", "HTTP address for metrics endpoint (e.g. :8080 or 127.0.0.1:8080). Empty = disabled") + // Authentication flags fs.StringVar(&cfg.authMode, "auth-mode", "none", "Authentication mode: none|token|file|callback") fs.Var(&authTokens, "auth-token", `Stream token: "streamKey=token" (repeatable, for -auth-mode=token)`) diff --git a/cmd/rtmp-server/main.go b/cmd/rtmp-server/main.go index 7e25570..a64a44b 100644 --- a/cmd/rtmp-server/main.go +++ b/cmd/rtmp-server/main.go @@ -2,7 +2,9 @@ package main import ( "context" + _ "expvar" // Register /debug/vars handler on DefaultServeMux "fmt" + "net/http" "os" "os/signal" "strings" @@ -10,6 +12,7 @@ import ( "time" "github.com/alxayo/go-rtmp/internal/logger" + _ "github.com/alxayo/go-rtmp/internal/rtmp/metrics" // Register expvar RTMP counters srv "github.com/alxayo/go-rtmp/internal/rtmp/server" "github.com/alxayo/go-rtmp/internal/rtmp/server/auth" ) @@ -62,6 +65,16 @@ func main() { log.Info("server started", "addr", server.Addr().String(), "version", version, "auth_mode", cfg.authMode) + // Start HTTP metrics server if configured + if cfg.metricsAddr != "" { + go func() { + log.Info("metrics HTTP server listening", "addr", cfg.metricsAddr) + if err := http.ListenAndServe(cfg.metricsAddr, nil); err != nil && err != http.ErrServerClosed { + log.Error("metrics HTTP server error", "error", err) + } + }() + } + // If using file-based auth, listen for SIGHUP to reload the token file if cfg.authMode == "file" { if fv, ok := authValidator.(*auth.FileValidator); ok { diff --git a/docs/architecture.md b/docs/architecture.md index eb32d2b..28c0831 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -126,8 +126,8 @@ Each incoming media message is routed through three paths: | `internal/rtmp/server/hooks` | Event notification (webhooks, shell, stdio) | `HookManager`, `Event`, `Hook` | | `internal/rtmp/media` | Audio/video parsing, codec detection, FLV recording | `Recorder`, `CodecDetector`, `Stream` | | `internal/rtmp/relay` | Multi-destination relay to external servers | `DestinationManager`, `Destination` | +| `internal/rtmp/metrics` | Expvar counters for live monitoring | `ConnectionsActive`, `ConnectionsTotal`, `BytesIngested` | | `internal/rtmp/client` | Minimal RTMP client for testing | `Client` | -| `internal/bufpool` | Memory pool for chunk buffers | `Pool` | | `internal/errors` | Domain-specific error types | `ProtocolError`, `ChunkError`, `AMFError` | | `internal/logger` | Structured logging with dynamic level | `Init()`, `Logger()`, `WithConn()` | diff --git a/docs/design.md b/docs/design.md index f8d4f3b..2af14b1 100644 --- a/docs/design.md +++ b/docs/design.md @@ -83,6 +83,17 @@ Tokens are passed by clients via URL query parameters in the stream name field ( The default mode is `none` (accept all requests), preserving backward compatibility. +### Expvar Metrics + +The server uses Go's `expvar` package for live monitoring. Expvar was chosen because: + +- **Zero dependencies**: part of the standard library +- **Thread-safe**: `expvar.Int` uses atomic int64 internally +- **HTTP-ready**: registers a handler on `DefaultServeMux` at `/debug/vars` +- **JSON output**: human- and machine-readable + +Metrics are organized as gauges (go up and down: active connections, publishers, subscribers, streams) and counters (monotonically increasing: total connections, media messages, bytes ingested, relay stats). The HTTP endpoint is opt-in via `-metrics-addr` so it has zero overhead when disabled. + ## Concurrency Model | Resource | Protection | Why | diff --git a/docs/getting-started.md b/docs/getting-started.md index 83e1125..63c998d 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -65,6 +65,15 @@ The server will forward all incoming media to the specified destinations. Available event types: `connection_accept`, `connection_close`, `publish_start`, `play_start`, `codec_detected`, `auth_failed`. +### With Metrics + +```bash +# Enable metrics endpoint on port 8080 +./rtmp-server -listen :1935 -metrics-addr :8080 +``` + +Query metrics at `http://localhost:8080/debug/vars` — returns JSON with all RTMP counters (connections, publishers, subscribers, media bytes, relay stats, uptime). + ### With Authentication ```bash @@ -112,6 +121,7 @@ OBS Studio: set **Server** to `rtmp://localhost:1935/live` and **Stream Key** to | `-hook-stdio-format` | (disabled) | Stdio output format: `json` or `env` | | `-hook-timeout` | `30s` | Hook execution timeout | | `-hook-concurrency` | `10` | Max concurrent hook executions | +| `-metrics-addr` | (disabled) | HTTP address for metrics endpoint (e.g. `:8080`). Empty = disabled | | `-version` | | Print version and exit | ## Test with FFmpeg diff --git a/docs/implementation.md b/docs/implementation.md index f718acb..7d13742 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -6,7 +6,6 @@ A code-level walkthrough of the go-rtmp server. Read [Architecture](architecture ``` internal/ -├── bufpool/ Buffer pool to reduce garbage collection pressure ├── errors/ Typed error wrappers (HandshakeError, ChunkError, etc.) ├── logger/ Structured JSON logging with runtime level changes └── rtmp/ @@ -21,6 +20,7 @@ internal/ │ └── hooks/ Event hooks (webhooks, shell scripts, stdio output) ├── media/ Audio/video parsing, codec detection, FLV recording ├── relay/ Multi-destination forwarding to external RTMP servers + ├── metrics/ Expvar counters for connections, publishers, subscribers └── client/ Minimal RTMP client for testing ``` diff --git a/docs/testing-guide.md b/docs/testing-guide.md index fb78219..c6c3d9c 100644 --- a/docs/testing-guide.md +++ b/docs/testing-guide.md @@ -15,7 +15,6 @@ This runs tests across all packages. Most complete in under 15 seconds. Some pac ### Core Utilities ```bash -go test ./internal/bufpool/ # Buffer pool allocation and concurrency go test ./internal/errors/ # Error type classification and unwrapping go test ./internal/logger/ # Log level parsing and field extraction ``` @@ -161,6 +160,5 @@ See [wireshark_rtmp_capture_guide.md](wireshark_rtmp_capture_guide.md) for detai | Play handler | `server/play_handler_test.go` | Subscriber addition, sequence header delivery | | Media logging | `server/media_logger_test.go` | Packet counting, codec detection, stats | | Audio/video parsing | `media/*_test.go` | Codec detection, frame type classification | -| Buffer pool | `bufpool/pool_test.go` | Allocation, reuse, concurrent access safety | | Event hooks | `server/hooks/hooks_test.go` | Hook registration, execution, concurrency pool, cleanup | | Integration | `tests/integration/*_test.go` | Full publish→play flow, multi-subscriber relay | diff --git a/internal/bufpool/pool.go b/internal/bufpool/pool.go deleted file mode 100644 index b7f9ceb..0000000 --- a/internal/bufpool/pool.go +++ /dev/null @@ -1,93 +0,0 @@ -package bufpool - -import "sync" - -// sizeClasses defines the predefined buffer sizes available in the pool. -// These are tuned for RTMP workloads: -// - 128 bytes: default RTMP chunk size (spec default) -// - 4096 bytes: recommended chunk size after server negotiation -// - 65536 bytes: maximum chunk size allowed by the protocol -var sizeClasses = []int{128, 4096, 65536} - -// classPool pairs a size class with its Go sync.Pool. -// sync.Pool is a built-in Go structure that caches allocated objects -// for reuse, reducing garbage collector pressure. -type classPool struct { - size int - pool *sync.Pool -} - -// Pool provides sized byte slices backed by reusable buffers to reduce GC churn. -type Pool struct { - pools []classPool -} - -// defaultPool is the package-level pool used by the convenience Get/Put functions. -// Most callers should use the top-level Get() and Put() instead of creating their own pool. -var defaultPool = New() - -// Get acquires a buffer from the package-level default pool. -func Get(size int) []byte { - return defaultPool.Get(size) -} - -// Put releases a buffer back to the package-level default pool. -func Put(buf []byte) { - defaultPool.Put(buf) -} - -// New creates a buffer pool with predefined size classes tailored for RTMP chunking workloads. -func New() *Pool { - pools := make([]classPool, len(sizeClasses)) - for i, classSize := range sizeClasses { - size := classSize - pools[i] = classPool{ - size: size, - pool: &sync.Pool{ - New: func() any { - return make([]byte, size) - }, - }, - } - } - return &Pool{pools: pools} -} - -// Get returns a byte slice whose length matches the requested size and whose capacity is the -// nearest predefined size class that can accommodate the request. Requests larger than the -// maximum size class allocate a fresh slice without pooling. -func (p *Pool) Get(size int) []byte { - if p == nil || size <= 0 { - return nil - } - - for i := range p.pools { - class := &p.pools[i] - if size <= class.size { - buf := class.pool.Get().([]byte) - return buf[:size] - } - } - - return make([]byte, size) -} - -// Put returns the provided buffer to the pool if its capacity matches a predefined size class. -// Buffers that do not match any class are discarded. The buffer is zeroed before reuse to avoid -// leaking data across callers. -func (p *Pool) Put(buf []byte) { - if p == nil || buf == nil { - return - } - - capBuf := cap(buf) - for i := range p.pools { - class := &p.pools[i] - if capBuf == class.size { - full := buf[:class.size] - clear(full) - class.pool.Put(full) - return - } - } -} diff --git a/internal/bufpool/pool_test.go b/internal/bufpool/pool_test.go deleted file mode 100644 index 4ac1b2d..0000000 --- a/internal/bufpool/pool_test.go +++ /dev/null @@ -1,170 +0,0 @@ -// Package bufpool tests verify the byte-slice pool used to reduce garbage-collection -// pressure when allocating temporary buffers for RTMP chunk reading/writing. -// -// Key concepts for beginners: -// - A "pool" recycles byte slices so the Go garbage collector doesn't have to -// allocate and free them on every RTMP message. -// - Buffers are bucketed into size classes (128, 4096, 65536, …). A request for -// 64 bytes returns a buffer whose *capacity* is rounded up to the next bucket (128). -// - sync.Pool (used internally) is safe for concurrent access. -package bufpool - -import ( - "fmt" - "sync" - "testing" -) - -// TestPoolGetReturnsSizedBuffer uses a table-driven pattern to verify that -// Pool.Get returns a byte slice whose length matches the requested size and -// whose capacity is rounded up to the nearest size-class bucket. -// -// Table-driven tests are a Go convention: each row in the "tests" slice -// describes one scenario (input + expected output). The loop runs each row -// as a named sub-test via t.Run so failures show exactly which case broke. -func TestPoolGetReturnsSizedBuffer(t *testing.T) { - // t.Parallel() allows this test to run concurrently with other tests, - // speeding up the overall test suite. - t.Parallel() - - p := New() - - // Each entry defines the requested size and the expected capacity after - // rounding up to the pool's size-class bucket. - tests := []struct { - name string - requestSize int - expectCap int - }{ - {name: "small", requestSize: 64, expectCap: 128}, - {name: "exact small", requestSize: 128, expectCap: 128}, - {name: "medium", requestSize: 1024, expectCap: 4096}, - {name: "large", requestSize: 5000, expectCap: 65536}, - {name: "oversized", requestSize: 131072, expectCap: 131072}, - {name: "zero", requestSize: 0, expectCap: 0}, - } - - for _, tc := range tests { - tc := tc // capture loop variable for parallel sub-tests (required in Go < 1.22) - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - buf := p.Get(tc.requestSize) - - // Zero-size is a special case: the pool returns an empty (nil) slice. - if tc.requestSize == 0 { - if len(buf) != 0 || cap(buf) != 0 { - t.Fatalf("expected zero-length buffer, got len=%d cap=%d", len(buf), cap(buf)) - } - return - } - - // len must equal the requested size (usable bytes). - if len(buf) != tc.requestSize { - t.Fatalf("expected len=%d, got %d", tc.requestSize, len(buf)) - } - - // cap must match the size-class bucket. - if cap(buf) != tc.expectCap { - t.Fatalf("expected cap=%d, got %d", tc.expectCap, cap(buf)) - } - }) - } -} - -// TestPoolPutReusesBuffer verifies that returning a buffer to the pool and -// requesting the same size again yields the *same* underlying memory (pointer -// equality). It also checks that the buffer is zeroed on return so stale data -// from one RTMP message doesn't leak into another. -func TestPoolPutReusesBuffer(t *testing.T) { - t.Parallel() - - p := New() - - // Get a 200-byte buffer (capacity will be 4096 – the next size class). - buf := p.Get(200) - if len(buf) != 200 { - t.Fatalf("expected len=200, got %d", len(buf)) - } - buf[0] = 42 // write sentinel value to prove zeroing later - - // Save the pointer to the first byte so we can compare it after reuse. - ptr := &buf[:1][0] - p.Put(buf) // return to pool - - // Request the same size again – we expect the pool to hand back the same slice. - reused := p.Get(200) - if len(reused) != 200 { - t.Fatalf("expected len=200, got %d", len(reused)) - } - - if cap(reused) != 4096 { - t.Fatalf("expected cap=4096, got %d", cap(reused)) - } - - // Pointer comparison: both should reference the same backing array. - if &reused[:1][0] != ptr { - t.Fatalf("expected to get the same buffer pointer back from pool") - } - - // Ensure every byte is zero (pool must clear before recycling). - for i, v := range reused { - if v != 0 { - t.Fatalf("expected buffer to be zeroed, found value %d at index %d", v, i) - } - } -} - -// TestPoolConcurrentAccess is a stress/race-detector test. It spawns 5 -// goroutines that each Get/Put 1000 times with different buffer sizes. -// Running with `go test -race` ensures no data races exist in the pool. -// -// Pattern explanation: -// - sync.WaitGroup tracks when all goroutines have finished. -// - A buffered error channel collects per-goroutine failures without -// blocking, so the main goroutine can report them after wg.Wait(). -func TestPoolConcurrentAccess(t *testing.T) { - t.Parallel() - - p := New() - var wg sync.WaitGroup - errCh := make(chan error, 5) // one slot per worker goroutine - - // worker performs 1000 Get→fill→Put cycles for a given buffer size. - worker := func(size int) { - defer wg.Done() - for i := 0; i < 1000; i++ { - buf := p.Get(size) - if len(buf) != size { - errCh <- fmt.Errorf("expected len=%d, got %d", size, len(buf)) - return - } - if cap(buf) < size { - errCh <- fmt.Errorf("expected cap >= %d, got %d", size, cap(buf)) - return - } - // Fill buffer to exercise memory – helps the race detector - // catch unsynchronized access. - for j := range buf { - buf[j] = byte(i) - } - p.Put(buf) - } - } - - // Spawn one goroutine per size class. - sizes := []int{64, 512, 2048, 8192, 40000} - for _, size := range sizes { - size := size // capture for goroutine closure - wg.Add(1) - go worker(size) - } - - wg.Wait() - close(errCh) - - // Report any errors collected from workers. - for err := range errCh { - t.Fatal(err) - } -} diff --git a/internal/rtmp/amf/amf.go b/internal/rtmp/amf/amf.go index c26f202..37a037d 100644 --- a/internal/rtmp/amf/amf.go +++ b/internal/rtmp/amf/amf.go @@ -60,8 +60,8 @@ func DecodeValue(r io.Reader) (interface{}, error) { if _, err := io.ReadFull(r, marker[:]); err != nil { return nil, amferrors.NewAMFError("decode.value.marker.read", err) } - // Fast path for supported markers via existing helper (object.go) which - // expects the marker already consumed and reconstructs a reader with it. + // Dispatch to helper which decodes the payload directly after the + // marker has been consumed (no intermediate reader allocation). switch marker[0] { case markerNumber, markerBoolean, markerString, markerNull, markerObject, markerStrictArray: v, err := decodeValueWithMarker(marker[0], r) @@ -73,7 +73,8 @@ func DecodeValue(r io.Reader) (interface{}, error) { if unsupportedMarker(marker[0]) { return nil, amferrors.NewAMFError("decode.value.unsupported", fmt.Errorf("unsupported marker 0x%02x", marker[0])) } - // Any other marker (including 0x04, 0x08, 0x09 as standalone) we treat as unsupported per task scope. + // Any other AMF0 marker (0x04 MovieClip, 0x08 ECMA Array, 0x09 Object End) + // is unsupported per project scope. return nil, amferrors.NewAMFError("decode.value.unsupported", fmt.Errorf("unsupported marker 0x%02x", marker[0])) } diff --git a/internal/rtmp/amf/amf_test.go b/internal/rtmp/amf/amf_test.go index 4297264..7208655 100644 --- a/internal/rtmp/amf/amf_test.go +++ b/internal/rtmp/amf/amf_test.go @@ -104,10 +104,12 @@ func TestDecodeValue_UnsupportedMarkers(t *testing.T) { // Markers explicitly rejected: 0x06 (Undefined), 0x07 (Reference), 0x0B (Date), 0x11 (AMF3 switch) markers := []byte{0x06, 0x07, 0x0B, 0x11} for _, m := range markers { - _, err := DecodeValue(bytes.NewReader([]byte{m})) - if err == nil { - t.Fatalf("marker 0x%02x expected error", m) - } + t.Run(fmt.Sprintf("marker_0x%02x", m), func(t *testing.T) { + _, err := DecodeValue(bytes.NewReader([]byte{m})) + if err == nil { + t.Fatalf("marker 0x%02x expected error", m) + } + }) } } diff --git a/internal/rtmp/amf/array.go b/internal/rtmp/amf/array.go index a23ce0f..bbed297 100644 --- a/internal/rtmp/amf/array.go +++ b/internal/rtmp/amf/array.go @@ -39,7 +39,7 @@ func EncodeStrictArray(w io.Writer, arr []interface{}) error { // Error cases include: // - Marker mismatch (decode.array.marker) // - Short reads for header or elements (decode.array.header.read / decode.array.element.read) -// - Unsupported nested type markers (bubbled from decodeValueWithMarker) +// - Unsupported nested type markers (bubbled from DecodeValue) func DecodeStrictArray(r io.Reader) ([]interface{}, error) { var marker [1]byte if _, err := io.ReadFull(r, marker[:]); err != nil { @@ -48,25 +48,7 @@ func DecodeStrictArray(r io.Reader) ([]interface{}, error) { if marker[0] != markerStrictArray { return nil, amferrors.NewAMFError("decode.array.marker", fmt.Errorf("expected 0x%02x got 0x%02x", markerStrictArray, marker[0])) } - var countBuf [4]byte - if _, err := io.ReadFull(r, countBuf[:]); err != nil { - return nil, amferrors.NewAMFError("decode.array.count.read", err) - } - count := binary.BigEndian.Uint32(countBuf[:]) - out := make([]interface{}, 0, count) - for i := uint32(0); i < count; i++ { - // Read marker for element then dispatch. - var elemMarker [1]byte - if _, err := io.ReadFull(r, elemMarker[:]); err != nil { - return nil, amferrors.NewAMFError("decode.array.element.marker.read", err) - } - val, err := decodeValueWithMarker(elemMarker[0], r) - if err != nil { - return nil, amferrors.NewAMFError("decode.array.element", fmt.Errorf("index %d: %w", i, err)) - } - out = append(out, val) - } - return out, nil + return decodeStrictArrayPayload(r) } // roundTripStrictArray is a helper for tests: encode then decode an array for round-trip verification. diff --git a/internal/rtmp/amf/array_test.go b/internal/rtmp/amf/array_test.go index c04e7a4..01d1929 100644 --- a/internal/rtmp/amf/array_test.go +++ b/internal/rtmp/amf/array_test.go @@ -19,8 +19,8 @@ import ( ) // readGoldenArray loads a golden binary vector for array tests. -// Reuses the goldenDir constant from number_test.go. func readGoldenArray(t *testing.T, name string) []byte { + t.Helper() p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { diff --git a/internal/rtmp/amf/boolean_test.go b/internal/rtmp/amf/boolean_test.go index 8cf0115..64a72b1 100644 --- a/internal/rtmp/amf/boolean_test.go +++ b/internal/rtmp/amf/boolean_test.go @@ -12,13 +12,10 @@ import ( "testing" ) -// Reuse goldenDir constant pattern from number_test.go (keep consistency even if duplicated). -const goldenDirBoolean = "../../../tests/golden" - // readGoldenBoolean loads a golden binary vector. func readGoldenBoolean(t *testing.T, name string) []byte { t.Helper() - p := filepath.Join(goldenDirBoolean, name) + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { t.Fatalf("read golden %s: %v", name, err) diff --git a/internal/rtmp/amf/null_test.go b/internal/rtmp/amf/null_test.go index 55a3c93..3617aac 100644 --- a/internal/rtmp/amf/null_test.go +++ b/internal/rtmp/amf/null_test.go @@ -11,12 +11,10 @@ import ( "testing" ) -const goldenDirNull = "../../../tests/golden" - // readGoldenNull loads the golden file for null tests. func readGoldenNull(t *testing.T, name string) []byte { t.Helper() - p := filepath.Join(goldenDirNull, name) + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { t.Fatalf("read golden %s: %v", name, err) diff --git a/internal/rtmp/amf/number_test.go b/internal/rtmp/amf/number_test.go index 8247dc3..a240573 100644 --- a/internal/rtmp/amf/number_test.go +++ b/internal/rtmp/amf/number_test.go @@ -14,6 +14,7 @@ package amf import ( "bytes" + "fmt" "math" "os" "path/filepath" @@ -23,15 +24,13 @@ import ( const goldenDir = "../../../tests/golden" // relative to this test file directory // readGolden loads a golden binary vector from tests/golden/. -// It panics on failure because missing golden files indicate a broken -// test environment, not a test failure. +// Missing golden files indicate a broken test environment. func readGolden(t *testing.T, name string) []byte { - // Using filepath.Join for Windows compatibility. + t.Helper() p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { - // Provide context but fail fast; golden vectors are required. - panic(err) + t.Fatalf("read golden %s: %v", name, err) } return b } @@ -94,17 +93,19 @@ func TestDecodeNumber_Golden_1_5(t *testing.T) { func TestNumber_EdgeCases_RoundTrip(t *testing.T) { cases := []float64{1.0, -1.0, math.Inf(1), math.Inf(-1)} for _, in := range cases { - var buf bytes.Buffer - if err := EncodeNumber(&buf, in); err != nil { - t.Fatalf("encode %v: %v", in, err) - } - out, err := DecodeNumber(&buf) - if err != nil { - t.Fatalf("decode %v: %v", in, err) - } - if in != out { - t.Fatalf("mismatch: in=%v out=%v", in, out) - } + t.Run(fmt.Sprintf("%v", in), func(t *testing.T) { + var buf bytes.Buffer + if err := EncodeNumber(&buf, in); err != nil { + t.Fatalf("encode %v: %v", in, err) + } + out, err := DecodeNumber(&buf) + if err != nil { + t.Fatalf("decode %v: %v", in, err) + } + if in != out { + t.Fatalf("mismatch: in=%v out=%v", in, out) + } + }) } } diff --git a/internal/rtmp/amf/object.go b/internal/rtmp/amf/object.go index 7b455ea..e4b1bad 100644 --- a/internal/rtmp/amf/object.go +++ b/internal/rtmp/amf/object.go @@ -1,10 +1,10 @@ package amf import ( - "bytes" "encoding/binary" "fmt" "io" + "math" "sort" amferrors "github.com/alxayo/go-rtmp/internal/errors" @@ -108,6 +108,61 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { if mMarker[0] != markerObject { return nil, amferrors.NewAMFError("decode.object.marker", fmt.Errorf("expected 0x%02x got 0x%02x", markerObject, mMarker[0])) } + return decodeObjectPayload(r) +} + +// decodeValueWithMarker dispatches based on an already-consumed marker byte. +// It reads the remaining payload from r without re-reading the marker, avoiding +// the allocation overhead of io.MultiReader. +func decodeValueWithMarker(marker byte, r io.Reader) (interface{}, error) { + switch marker { + case markerNumber: + var num [8]byte + if _, err := io.ReadFull(r, num[:]); err != nil { + return nil, amferrors.NewAMFError("decode.number.read", err) + } + u := binary.BigEndian.Uint64(num[:]) + return math.Float64frombits(u), nil + case markerBoolean: + var b [1]byte + if _, err := io.ReadFull(r, b[:]); err != nil { + return nil, amferrors.NewAMFError("decode.boolean.read", err) + } + return b[0] != 0x00, nil + case markerString: + return decodeStringPayload(r) + case markerNull: + return nil, nil // null has no payload beyond the marker + case markerObject: + return decodeObjectPayload(r) + case markerStrictArray: + return decodeStrictArrayPayload(r) + default: + return nil, fmt.Errorf("unsupported marker 0x%02x", marker) + } +} + +// decodeStringPayload reads an AMF0 string payload (length + bytes) after the +// marker has already been consumed. +func decodeStringPayload(r io.Reader) (string, error) { + var ln [2]byte + if _, err := io.ReadFull(r, ln[:]); err != nil { + return "", amferrors.NewAMFError("decode.string.length.read", err) + } + l := binary.BigEndian.Uint16(ln[:]) + if l == 0 { + return "", nil + } + buf := make([]byte, l) + if _, err := io.ReadFull(r, buf); err != nil { + return "", amferrors.NewAMFError("decode.string.read", err) + } + return string(buf), nil +} + +// decodeObjectPayload reads an AMF0 object payload (key-value pairs + end marker) +// after the object marker has already been consumed. +func decodeObjectPayload(r io.Reader) (map[string]interface{}, error) { out := make(map[string]interface{}) for { var klenBuf [2]byte @@ -131,13 +186,8 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { } key := string(keyBytes) - // Peek marker for value to dispatch. We read one byte, then re-create a reader with it prefixed. - var valMarker [1]byte - if _, err := io.ReadFull(r, valMarker[:]); err != nil { - return nil, amferrors.NewAMFError("decode.object.value.marker.read", err) - } - - val, err := decodeValueWithMarker(valMarker[0], r) + // Decode the value (reads marker internally). + val, err := DecodeValue(r) if err != nil { return nil, amferrors.NewAMFError("decode.object.value", fmt.Errorf("key '%s': %w", key, err)) } @@ -146,26 +196,21 @@ func DecodeObject(r io.Reader) (map[string]interface{}, error) { return out, nil } -// decodeValueWithMarker dispatches based on an already-consumed marker byte. It consumes the -// remaining payload from r appropriate to the marker. -func decodeValueWithMarker(marker byte, r io.Reader) (interface{}, error) { - switch marker { - case markerNumber: - // Reconstruct a reader including the marker to reuse existing decoder. - return DecodeNumber(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerBoolean: - return DecodeBoolean(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerString: - return DecodeString(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerNull: - v, err := DecodeNull(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - return v, err - case markerObject: - // Nested object: reuse DecodeObject by reconstructing the marker. - return DecodeObject(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - case markerStrictArray: - return DecodeStrictArray(io.MultiReader(bytes.NewReader([]byte{marker}), r)) - default: - return nil, fmt.Errorf("unsupported marker 0x%02x", marker) +// decodeStrictArrayPayload reads an AMF0 strict array payload (count + elements) +// after the array marker has already been consumed. +func decodeStrictArrayPayload(r io.Reader) ([]interface{}, error) { + var countBuf [4]byte + if _, err := io.ReadFull(r, countBuf[:]); err != nil { + return nil, amferrors.NewAMFError("decode.array.count.read", err) } + count := binary.BigEndian.Uint32(countBuf[:]) + out := make([]interface{}, 0, count) + for i := uint32(0); i < count; i++ { + val, err := DecodeValue(r) + if err != nil { + return nil, amferrors.NewAMFError("decode.array.element", fmt.Errorf("index %d: %w", i, err)) + } + out = append(out, val) + } + return out, nil } diff --git a/internal/rtmp/amf/string_test.go b/internal/rtmp/amf/string_test.go index 612ac0c..be0237b 100644 --- a/internal/rtmp/amf/string_test.go +++ b/internal/rtmp/amf/string_test.go @@ -13,15 +13,13 @@ import ( "testing" ) -const goldenDirString = "../../../tests/golden" - -// readGoldenString loads a golden vector; mirrors the helper in number_test.go. +// readGoldenString loads a golden vector; reuses goldenDir from number_test.go. func readGoldenString(t *testing.T, name string) []byte { - // mirror helper pattern used in other tests - p := filepath.Join(goldenDirString, name) + t.Helper() + p := filepath.Join(goldenDir, name) b, err := os.ReadFile(p) if err != nil { - panic(err) + t.Fatalf("read golden %s: %v", name, err) } return b } diff --git a/internal/rtmp/chunk/writer.go b/internal/rtmp/chunk/writer.go index ccb0fa4..e363bf3 100644 --- a/internal/rtmp/chunk/writer.go +++ b/internal/rtmp/chunk/writer.go @@ -145,6 +145,7 @@ type Writer struct { w io.Writer chunkSize uint32 // outbound chunk size (default 128 if zero) lastHeaders map[uint32]*ChunkHeader // per-CSID state for FMT compression + scratch []byte // reusable buffer for writeChunk to avoid per-call allocations } // NewWriter creates a new chunk Writer. @@ -247,7 +248,7 @@ func (w *Writer) WriteMessage(msg *Message) error { if uint32(len(toSend)) > cs { toSend = toSend[:cs] } - if err := writeChunk(w.w, hdr, toSend); err != nil { + if err := w.writeChunk(hdr, toSend); err != nil { return err } written := uint32(len(toSend)) @@ -282,7 +283,7 @@ func (w *Writer) WriteMessage(msg *Message) error { if end > uint32(len(msg.Payload)) { return fmt.Errorf("writer: bounds (end=%d > len=%d)", end, len(msg.Payload)) } - if err := writeChunk(w.w, hdr3, msg.Payload[start:end]); err != nil { + if err := w.writeChunk(hdr3, msg.Payload[start:end]); err != nil { return err } written = end @@ -290,13 +291,19 @@ func (w *Writer) WriteMessage(msg *Message) error { return nil } -// writeChunk concatenates the header and payload into a single buffer and writes -// it in one call. This ensures the chunk is sent atomically — the header and -// payload bytes won't be split across separate TCP packets. -func writeChunk(w io.Writer, header []byte, payload []byte) error { - buf := make([]byte, 0, len(header)+len(payload)) - buf = append(buf, header...) - buf = append(buf, payload...) - _, err := w.Write(buf) +// writeChunk concatenates the header and payload into the Writer's scratch +// buffer and writes it in one call. This ensures the chunk is sent atomically +// (header and payload bytes won't be split across separate TCP packets) while +// reusing the buffer across calls to avoid per-chunk allocations. +func (w *Writer) writeChunk(header []byte, payload []byte) error { + need := len(header) + len(payload) + if cap(w.scratch) < need { + w.scratch = make([]byte, need) + } else { + w.scratch = w.scratch[:need] + } + copy(w.scratch, header) + copy(w.scratch[len(header):], payload) + _, err := w.w.Write(w.scratch) return err } diff --git a/internal/rtmp/chunk/writer_test.go b/internal/rtmp/chunk/writer_test.go index e053af4..615c414 100644 --- a/internal/rtmp/chunk/writer_test.go +++ b/internal/rtmp/chunk/writer_test.go @@ -379,6 +379,45 @@ func TestWriter_ChunkReaderRoundTrip(t *testing.T) { } } +// TestWriter_WriteMessage_BoundaryChunkSizes verifies correct chunking at +// the exact chunk size boundary: payloads of chunkSize-1, chunkSize, and +// chunkSize+1 bytes. Off-by-one errors in the fragmentation loop would +// show up here. +func TestWriter_WriteMessage_BoundaryChunkSizes(t *testing.T) { + for _, cs := range []uint32{128, 4096} { + for _, delta := range []int{-1, 0, 1} { + size := int(cs) + delta + if size <= 0 { + continue + } + name := fmt.Sprintf("cs%d_payload%d", cs, size) + t.Run(name, func(t *testing.T) { + var sw simpleWriter + w := NewWriter(&sw, cs) + payload := bytes.Repeat([]byte{0xDD}, size) + msg := &Message{ + CSID: 6, Timestamp: 1000, TypeID: 9, + MessageStreamID: 1, + MessageLength: uint32(size), + Payload: payload, + } + if err := w.WriteMessage(msg); err != nil { + t.Fatalf("write: %v", err) + } + // Round-trip through Reader to verify correctness + r := NewReader(bytes.NewReader(sw.Bytes()), cs) + out, err := r.ReadMessage() + if err != nil { + t.Fatalf("read: %v", err) + } + if !bytes.Equal(out.Payload, payload) { + t.Fatalf("round-trip payload mismatch (len want=%d got=%d)", len(payload), len(out.Payload)) + } + }) + } + } +} + // --- Benchmarks --- // BenchmarkEncodeChunkHeader_FMT0 benchmarks header serialization for a full FMT0 header. diff --git a/internal/rtmp/media/audio_test.go b/internal/rtmp/media/audio_test.go index 3e981ee..9a71330 100644 --- a/internal/rtmp/media/audio_test.go +++ b/internal/rtmp/media/audio_test.go @@ -81,9 +81,11 @@ func TestParseAudioMessage_Errors(t *testing.T) { {"unsupported", []byte{15 << 4, 0x01}}, // 15 not supported } for _, tc := range cases { - if _, err := ParseAudioMessage(tc.in); err == nil { - _tFatalf(t, "expected error for case %s", tc.name) - } + t.Run(tc.name, func(t *testing.T) { + if _, err := ParseAudioMessage(tc.in); err == nil { + _tFatalf(t, "expected error for case %s", tc.name) + } + }) } } diff --git a/internal/rtmp/media/video_test.go b/internal/rtmp/media/video_test.go index 0f14a90..fd52907 100644 --- a/internal/rtmp/media/video_test.go +++ b/internal/rtmp/media/video_test.go @@ -14,13 +14,6 @@ package media import "testing" -// _tVidFatalf is a test helper (like _tFatalf in audio_test.go) that -// marks the caller as failure location. -func _tVidFatalf(t *testing.T, format string, args ...interface{}) { - t.Helper() - t.Fatalf(format, args...) -} - // TestParseVideoMessage_AVCSequenceHeader verifies parsing of an H.264 // sequence header (frameType=1 keyframe, codecID=7 AVC, avcPacketType=0). // The sequence header contains SPS/PPS data used to initialize the decoder. @@ -29,19 +22,19 @@ func TestParseVideoMessage_AVCSequenceHeader(t *testing.T) { data := []byte{(1 << 4) | 7, 0x00, 0x17, 0x34, 0x56} // 0x00 = sequence header; rest pretend SPS/PPS bytes m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.Codec != VideoCodecAVC { - _tVidFatalf(t, "codec mismatch want AVC got %s", m.Codec) + _tFatalf(t, "codec mismatch want AVC got %s", m.Codec) } if m.FrameType != VideoFrameTypeKey { - _tVidFatalf(t, "frameType mismatch want keyframe got %s", m.FrameType) + _tFatalf(t, "frameType mismatch want keyframe got %s", m.FrameType) } if m.PacketType != AVCPacketTypeSequenceHeader { - _tVidFatalf(t, "packetType mismatch want sequence_header got %s", m.PacketType) + _tFatalf(t, "packetType mismatch want sequence_header got %s", m.PacketType) } if len(m.Payload) != 3 || m.Payload[0] != 0x17 { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -52,13 +45,13 @@ func TestParseVideoMessage_AVCKeyframeNALU(t *testing.T) { data := []byte{(1 << 4) | 7, 0x01, 0xAA, 0xBB, 0xCC} m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.Codec != VideoCodecAVC || m.PacketType != AVCPacketTypeNALU || m.FrameType != VideoFrameTypeKey { - _tVidFatalf(t, "unexpected metadata: %+v", m) + _tFatalf(t, "unexpected metadata: %+v", m) } if len(m.Payload) != 3 || m.Payload[2] != 0xCC { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -69,13 +62,13 @@ func TestParseVideoMessage_AVCInterNALU(t *testing.T) { data := []byte{(2 << 4) | 7, 0x01, 0x01, 0x02} m, err := ParseVideoMessage(data) if err != nil { - _tVidFatalf(t, "unexpected error: %v", err) + _tFatalf(t, "unexpected error: %v", err) } if m.FrameType != VideoFrameTypeInter || m.PacketType != AVCPacketTypeNALU { - _tVidFatalf(t, "unexpected metadata: %+v", m) + _tFatalf(t, "unexpected metadata: %+v", m) } if len(m.Payload) != 2 || m.Payload[0] != 0x01 { - _tVidFatalf(t, "payload mismatch: %+v", m.Payload) + _tFatalf(t, "payload mismatch: %+v", m.Payload) } } @@ -91,8 +84,10 @@ func TestParseVideoMessage_ErrorCases(t *testing.T) { {"unsupportedCodec", []byte{(1 << 4) | 5, 0x00}}, // codec 5 (On2 VP6) -> unsupported } for _, tc := range cases { - if _, err := ParseVideoMessage(tc.in); err == nil { - _tVidFatalf(t, "expected error for case %s", tc.name) - } + t.Run(tc.name, func(t *testing.T) { + if _, err := ParseVideoMessage(tc.in); err == nil { + _tFatalf(t, "expected error for case %s", tc.name) + } + }) } } diff --git a/internal/rtmp/metrics/metrics.go b/internal/rtmp/metrics/metrics.go new file mode 100644 index 0000000..4de169f --- /dev/null +++ b/internal/rtmp/metrics/metrics.go @@ -0,0 +1,79 @@ +package metrics + +// Expvar Metrics +// ============== +// Package-level expvar counters for live server observability. +// All variables use atomic int64 internally (via expvar.Int) and are +// safe for concurrent access from any goroutine. +// +// Gauges (values go up and down): +// - ConnectionsActive, StreamsActive, PublishersActive, SubscribersActive +// +// Counters (monotonically increasing): +// - ConnectionsTotal, PublishersTotal, SubscribersTotal +// - MessagesAudio, MessagesVideo, BytesIngested +// - RelayMessagesSent, RelayMessagesDropped, RelayBytesSent + +import ( + "expvar" + "runtime" + "time" +) + +// startTime records when the package was initialized (≈ process start). +var startTime = time.Now() + +// ── Connection metrics ────────────────────────────────────────────── + +var ( + ConnectionsActive = expvar.NewInt("rtmp_connections_active") + ConnectionsTotal = expvar.NewInt("rtmp_connections_total") +) + +// ── Stream metrics ────────────────────────────────────────────────── + +var ( + StreamsActive = expvar.NewInt("rtmp_streams_active") +) + +// ── Publisher metrics ─────────────────────────────────────────────── + +var ( + PublishersActive = expvar.NewInt("rtmp_publishers_active") + PublishersTotal = expvar.NewInt("rtmp_publishers_total") +) + +// ── Subscriber metrics ────────────────────────────────────────────── + +var ( + SubscribersActive = expvar.NewInt("rtmp_subscribers_active") + SubscribersTotal = expvar.NewInt("rtmp_subscribers_total") +) + +// ── Media metrics ─────────────────────────────────────────────────── + +var ( + MessagesAudio = expvar.NewInt("rtmp_messages_audio") + MessagesVideo = expvar.NewInt("rtmp_messages_video") + BytesIngested = expvar.NewInt("rtmp_bytes_ingested") +) + +// ── Relay metrics ─────────────────────────────────────────────────── + +var ( + RelayMessagesSent = expvar.NewInt("rtmp_relay_messages_sent") + RelayMessagesDropped = expvar.NewInt("rtmp_relay_messages_dropped") + RelayBytesSent = expvar.NewInt("rtmp_relay_bytes_sent") +) + +func init() { + expvar.Publish("rtmp_uptime_seconds", expvar.Func(func() interface{} { + return int64(time.Since(startTime).Seconds()) + })) + + expvar.Publish("rtmp_server_info", expvar.Func(func() interface{} { + return map[string]string{ + "go_version": runtime.Version(), + } + })) +} diff --git a/internal/rtmp/metrics/metrics_test.go b/internal/rtmp/metrics/metrics_test.go new file mode 100644 index 0000000..8077296 --- /dev/null +++ b/internal/rtmp/metrics/metrics_test.go @@ -0,0 +1,128 @@ +package metrics + +import ( + "encoding/json" + "expvar" + "net/http" + "net/http/httptest" + "runtime" + "strings" + "testing" +) + +func TestCountersInitializedToZero(t *testing.T) { + counters := []*expvar.Int{ + ConnectionsActive, ConnectionsTotal, + StreamsActive, + PublishersActive, PublishersTotal, + SubscribersActive, SubscribersTotal, + MessagesAudio, MessagesVideo, BytesIngested, + RelayMessagesSent, RelayMessagesDropped, RelayBytesSent, + } + for _, c := range counters { + if v := c.Value(); v != 0 { + t.Errorf("counter should be 0, got %d", v) + } + } +} + +func TestGaugeAddAndSubtract(t *testing.T) { + // Use ConnectionsActive as a representative gauge. + defer ConnectionsActive.Set(0) + + ConnectionsActive.Add(1) + ConnectionsActive.Add(1) + ConnectionsActive.Add(1) + if v := ConnectionsActive.Value(); v != 3 { + t.Fatalf("expected 3, got %d", v) + } + + ConnectionsActive.Add(-1) + if v := ConnectionsActive.Value(); v != 2 { + t.Fatalf("expected 2, got %d", v) + } +} + +func TestCounterMonotonic(t *testing.T) { + defer ConnectionsTotal.Set(0) + + for i := 0; i < 100; i++ { + ConnectionsTotal.Add(1) + } + if v := ConnectionsTotal.Value(); v != 100 { + t.Fatalf("expected 100, got %d", v) + } +} + +func TestUptimePositive(t *testing.T) { + v := expvar.Get("rtmp_uptime_seconds") + if v == nil { + t.Fatal("rtmp_uptime_seconds not registered") + } + // The func returns int64; expvar serializes it as a number. + raw := v.String() + if raw == "" { + t.Fatal("empty uptime string") + } + // Uptime should be ≥ 0 (could be 0 if test runs within the first second) + var uptime int64 + if err := json.Unmarshal([]byte(raw), &uptime); err != nil { + t.Fatalf("failed to parse uptime: %v", err) + } + if uptime < 0 { + t.Fatalf("uptime should be >= 0, got %d", uptime) + } +} + +func TestServerInfoContainsGoVersion(t *testing.T) { + v := expvar.Get("rtmp_server_info") + if v == nil { + t.Fatal("rtmp_server_info not registered") + } + raw := v.String() + var info map[string]string + if err := json.Unmarshal([]byte(raw), &info); err != nil { + t.Fatalf("failed to parse server_info: %v", err) + } + goVer, ok := info["go_version"] + if !ok { + t.Fatal("go_version key missing from server_info") + } + if goVer != runtime.Version() { + t.Fatalf("expected %s, got %s", runtime.Version(), goVer) + } +} + +func TestExpvarHandlerContainsRTMPKeys(t *testing.T) { + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/debug/vars", nil) + expvar.Handler().ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rec.Code) + } + + body := rec.Body.String() + expectedKeys := []string{ + "rtmp_connections_active", + "rtmp_connections_total", + "rtmp_streams_active", + "rtmp_publishers_active", + "rtmp_publishers_total", + "rtmp_subscribers_active", + "rtmp_subscribers_total", + "rtmp_messages_audio", + "rtmp_messages_video", + "rtmp_bytes_ingested", + "rtmp_relay_messages_sent", + "rtmp_relay_messages_dropped", + "rtmp_relay_bytes_sent", + "rtmp_uptime_seconds", + "rtmp_server_info", + } + for _, key := range expectedKeys { + if !strings.Contains(body, key) { + t.Errorf("expvar output missing key %q", key) + } + } +} diff --git a/internal/rtmp/relay/destination.go b/internal/rtmp/relay/destination.go index 664c397..367a708 100644 --- a/internal/rtmp/relay/destination.go +++ b/internal/rtmp/relay/destination.go @@ -9,6 +9,7 @@ import ( "time" "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // RTMPClient defines the interface for connecting to a remote RTMP server @@ -160,6 +161,7 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.mu.Lock() d.Metrics.MessagesDropped++ d.mu.Unlock() + metrics.RelayMessagesDropped.Add(1) return fmt.Errorf("destination not connected (status: %v)", status) } @@ -179,6 +181,7 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.LastError = err d.Metrics.MessagesDropped++ d.mu.Unlock() + metrics.RelayMessagesDropped.Add(1) d.logger.Error("relay send failed", "type_id", msg.TypeID, "error", err) return fmt.Errorf("send message: %w", err) } @@ -188,6 +191,8 @@ func (d *Destination) SendMessage(msg *chunk.Message) error { d.Metrics.BytesSent += uint64(len(msg.Payload)) d.Metrics.LastSentTime = time.Now() d.mu.Unlock() + metrics.RelayMessagesSent.Add(1) + metrics.RelayBytesSent.Add(int64(len(msg.Payload))) return nil } diff --git a/internal/rtmp/rpc/connect.go b/internal/rtmp/rpc/connect.go index ce691ef..dbc121b 100644 --- a/internal/rtmp/rpc/connect.go +++ b/internal/rtmp/rpc/connect.go @@ -91,12 +91,15 @@ func ParseConnectCommand(msg *chunk.Message) (*ConnectCommand, error) { // Capture any extra fields from the connect object (useful for auth tokens, // custom parameters, etc.) that we don't explicitly parse above. - extra := make(map[string]interface{}) + var extra map[string]interface{} for k, v := range obj { switch k { case "app", "flashVer", "tcUrl", "objectEncoding": continue // already extracted default: + if extra == nil { + extra = make(map[string]interface{}) + } extra[k] = v } } diff --git a/internal/rtmp/server/auth/auth.go b/internal/rtmp/server/auth/auth.go index cb324bc..feecfcd 100644 --- a/internal/rtmp/server/auth/auth.go +++ b/internal/rtmp/server/auth/auth.go @@ -60,5 +60,4 @@ type Request struct { var ( ErrUnauthorized = errors.New("authentication failed: invalid credentials") ErrTokenMissing = errors.New("authentication failed: token missing") - ErrForbidden = errors.New("authentication failed: access denied") ) diff --git a/internal/rtmp/server/auth/auth_test.go b/internal/rtmp/server/auth/auth_test.go index d60bc88..7d2c315 100644 --- a/internal/rtmp/server/auth/auth_test.go +++ b/internal/rtmp/server/auth/auth_test.go @@ -16,7 +16,6 @@ func TestSentinelErrors(t *testing.T) { }{ {"ErrUnauthorized", ErrUnauthorized}, {"ErrTokenMissing", ErrTokenMissing}, - {"ErrForbidden", ErrForbidden}, } for _, s := range sentinels { t.Run(s.name, func(t *testing.T) { diff --git a/internal/rtmp/server/helpers_test.go b/internal/rtmp/server/helpers_test.go new file mode 100644 index 0000000..a2ddfd0 --- /dev/null +++ b/internal/rtmp/server/helpers_test.go @@ -0,0 +1,37 @@ +// helpers_test.go – shared test doubles and builders for the server package tests. +// +// These helpers are used across publish_handler_test.go, play_handler_test.go, +// and other server tests to avoid duplicating stub types and message builders. +package server + +import ( + "github.com/alxayo/go-rtmp/internal/rtmp/amf" + "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/rpc" +) + +// stubConn captures the last message sent via SendMessage for assertions. +type stubConn struct{ last *chunk.Message } + +func (s *stubConn) SendMessage(m *chunk.Message) error { s.last = m; return nil } + +// capturingConn records every message sent via SendMessage. +type capturingConn struct{ sent []*chunk.Message } + +func (c *capturingConn) SendMessage(m *chunk.Message) error { c.sent = append(c.sent, m); return nil } + +// stubPublisher is a minimal placeholder to mark a stream as published. +type stubPublisher struct{} + +// buildPublishMessage builds a minimal AMF0 "publish" command message +// for the given stream name. +func buildPublishMessage(streamName string) *chunk.Message { + payload, _ := amf.EncodeAll("publish", float64(0), nil, streamName, "live") + return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} +} + +// buildPlayMessage constructs a minimal AMF0 "play" command message. +func buildPlayMessage(streamName string) *chunk.Message { + payload, _ := amf.EncodeAll("play", float64(0), nil, streamName) + return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} +} diff --git a/internal/rtmp/server/hooks/manager.go b/internal/rtmp/server/hooks/manager.go index bc75ebf..5eb6cab 100644 --- a/internal/rtmp/server/hooks/manager.go +++ b/internal/rtmp/server/hooks/manager.go @@ -97,8 +97,13 @@ func (hm *HookManager) TriggerEvent(ctx context.Context, event Event) { // Get hooks for this event type hm.mu.RLock() - hooks := make([]Hook, len(hm.hooks[event.Type])) - copy(hooks, hm.hooks[event.Type]) + registered := hm.hooks[event.Type] + extra := 0 + if hm.stdioHook != nil { + extra = 1 + } + hooks := make([]Hook, len(registered), len(registered)+extra) + copy(hooks, registered) hm.mu.RUnlock() // Add stdio hook if enabled diff --git a/internal/rtmp/server/media_logger.go b/internal/rtmp/server/media_logger.go index 7bcb8e9..d7bf542 100644 --- a/internal/rtmp/server/media_logger.go +++ b/internal/rtmp/server/media_logger.go @@ -14,6 +14,7 @@ import ( "github.com/alxayo/go-rtmp/internal/rtmp/chunk" "github.com/alxayo/go-rtmp/internal/rtmp/media" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // MediaLogger tracks and logs media packet statistics for a connection. @@ -87,9 +88,11 @@ func (ml *MediaLogger) ProcessMessage(msg *chunk.Message) { // Update counters ml.totalBytes += uint64(len(msg.Payload)) + metrics.BytesIngested.Add(int64(len(msg.Payload))) if msg.TypeID == 8 { ml.audioCount++ + metrics.MessagesAudio.Add(1) // Detect audio codec on first packet if ml.audioCodec == "" && len(msg.Payload) > 0 { if am, err := media.ParseAudioMessage(msg.Payload); err == nil { @@ -101,6 +104,7 @@ func (ml *MediaLogger) ProcessMessage(msg *chunk.Message) { } } else if msg.TypeID == 9 { ml.videoCount++ + metrics.MessagesVideo.Add(1) // Detect video codec on first packet if ml.videoCodec == "" && len(msg.Payload) > 0 { if vm, err := media.ParseVideoMessage(msg.Payload); err == nil { diff --git a/internal/rtmp/server/play_handler_test.go b/internal/rtmp/server/play_handler_test.go index 3e098be..58cac82 100644 --- a/internal/rtmp/server/play_handler_test.go +++ b/internal/rtmp/server/play_handler_test.go @@ -7,32 +7,16 @@ // 4. If not found: sends onStatus Play.StreamNotFound. // // Key Go concepts: -// - capturingConn: records all sent messages in a slice for ordering assertions. -// - stubPublisher: minimal type to mark a stream as having a publisher. +// - capturingConn (helpers_test.go): records all sent messages. +// - stubPublisher (helpers_test.go): marks a stream as published. package server import ( "testing" "github.com/alxayo/go-rtmp/internal/rtmp/amf" - "github.com/alxayo/go-rtmp/internal/rtmp/chunk" - "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) -// capturingConn records every message sent via SendMessage. -type capturingConn struct{ sent []*chunk.Message } - -func (c *capturingConn) SendMessage(m *chunk.Message) error { c.sent = append(c.sent, m); return nil } - -// buildPlayMessage constructs a minimal AMF0 "play" command message. -func buildPlayMessage(streamName string) *chunk.Message { - payload, _ := amf.EncodeAll("play", float64(0), nil, streamName) - return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} -} - -// stubPublisher is a minimal placeholder to mark a stream as published. -type stubPublisher struct{} - // TestHandlePlaySuccess creates a stream with a publisher, then plays it. // Expects 2 messages sent (StreamBegin + onStatus Play.Start) and 1 subscriber. func TestHandlePlaySuccess(t *testing.T) { diff --git a/internal/rtmp/server/publish_handler.go b/internal/rtmp/server/publish_handler.go index e8df508..e172093 100644 --- a/internal/rtmp/server/publish_handler.go +++ b/internal/rtmp/server/publish_handler.go @@ -14,11 +14,13 @@ import ( rtmperrors "github.com/alxayo/go-rtmp/internal/errors" "github.com/alxayo/go-rtmp/internal/rtmp/chunk" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) // sender is the minimal interface required from a connection for this task. -// *conn.Connection satisfies it. We keep it tiny so tests can use a stub. +// The connection type from internal/rtmp/conn satisfies it. We keep it tiny +// so tests can use a stub. type sender interface { SendMessage(*chunk.Message) error } @@ -76,6 +78,7 @@ func PublisherDisconnected(reg *Registry, streamKey string, pub sender) { s.mu.Lock() if s.Publisher == pub { s.Publisher = nil + metrics.PublishersActive.Add(-1) } s.mu.Unlock() } diff --git a/internal/rtmp/server/publish_handler_test.go b/internal/rtmp/server/publish_handler_test.go index 6386b06..5452b9d 100644 --- a/internal/rtmp/server/publish_handler_test.go +++ b/internal/rtmp/server/publish_handler_test.go @@ -7,7 +7,7 @@ // 4. Sends an "onStatus" NetStream.Publish.Start response. // // Key Go concepts: -// - stubConn: captures the last message sent, simulating a real connection. +// - stubConn (helpers_test.go): captures the last message sent. // - AMF decode of the onStatus payload to verify the response code. package server @@ -15,22 +15,8 @@ import ( "testing" "github.com/alxayo/go-rtmp/internal/rtmp/amf" - "github.com/alxayo/go-rtmp/internal/rtmp/chunk" - "github.com/alxayo/go-rtmp/internal/rtmp/rpc" ) -// stubConn captures the last message sent via SendMessage for assertions. -type stubConn struct{ last *chunk.Message } - -func (s *stubConn) SendMessage(m *chunk.Message) error { s.last = m; return nil } - -// buildPublishMessage builds a minimal AMF0 "publish" command message -// for the given stream name. -func buildPublishMessage(streamName string) *chunk.Message { - payload, _ := amf.EncodeAll("publish", float64(0), nil, streamName, "live") - return &chunk.Message{TypeID: rpc.CommandMessageAMF0TypeIDForTest(), Payload: payload, MessageLength: uint32(len(payload)), MessageStreamID: 1} -} - // TestHandlePublishSuccess publishes a stream and verifies: // the stream is registered, the publisher is set, and the onStatus // response contains NetStream.Publish.Start. @@ -123,3 +109,21 @@ func TestHandlePublishWithQueryParams(t *testing.T) { t.Fatalf("stream should NOT be registered with query params in key") } } + +// TestHandlePublishNilArgs verifies that HandlePublish returns an error +// when called with nil arguments rather than panicking. +func TestHandlePublishNilArgs(t *testing.T) { + reg := NewRegistry() + sc := &stubConn{} + msg := buildPublishMessage("test") + + if _, err := HandlePublish(nil, sc, "app", msg); err == nil { + t.Fatal("expected error for nil registry") + } + if _, err := HandlePublish(reg, nil, "app", msg); err == nil { + t.Fatal("expected error for nil conn") + } + if _, err := HandlePublish(reg, sc, "app", nil); err == nil { + t.Fatal("expected error for nil message") + } +} diff --git a/internal/rtmp/server/registry.go b/internal/rtmp/server/registry.go index c697a39..5ad43b3 100644 --- a/internal/rtmp/server/registry.go +++ b/internal/rtmp/server/registry.go @@ -19,6 +19,7 @@ import ( "github.com/alxayo/go-rtmp/internal/rtmp/chunk" "github.com/alxayo/go-rtmp/internal/rtmp/media" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" ) // ErrPublisherExists is returned when trying to set a second publisher. @@ -80,6 +81,7 @@ func (r *Registry) CreateStream(key string) (*Stream, bool) { } s := &Stream{Key: key, StartTime: time.Now(), Subscribers: make([]media.Subscriber, 0)} r.streams[key] = s + metrics.StreamsActive.Add(1) return s, true } @@ -99,6 +101,7 @@ func (r *Registry) DeleteStream(key string) bool { defer r.mu.Unlock() if _, ok := r.streams[key]; ok { delete(r.streams, key) + metrics.StreamsActive.Add(-1) return true } return false @@ -115,6 +118,8 @@ func (s *Stream) SetPublisher(pub interface{}) error { return ErrPublisherExists } s.Publisher = pub + metrics.PublishersActive.Add(1) + metrics.PublishersTotal.Add(1) return nil } @@ -125,6 +130,8 @@ func (s *Stream) AddSubscriber(sub media.Subscriber) { } s.mu.Lock() s.Subscribers = append(s.Subscribers, sub) + metrics.SubscribersActive.Add(1) + metrics.SubscribersTotal.Add(1) s.mu.Unlock() } @@ -144,6 +151,7 @@ func (s *Stream) RemoveSubscriber(sub media.Subscriber) { s.Subscribers[i] = s.Subscribers[last] s.Subscribers[last] = nil s.Subscribers = s.Subscribers[:last] + metrics.SubscribersActive.Add(-1) break } } diff --git a/internal/rtmp/server/server.go b/internal/rtmp/server/server.go index c133ae0..53ed624 100644 --- a/internal/rtmp/server/server.go +++ b/internal/rtmp/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/alxayo/go-rtmp/internal/logger" "github.com/alxayo/go-rtmp/internal/rtmp/client" iconn "github.com/alxayo/go-rtmp/internal/rtmp/conn" + "github.com/alxayo/go-rtmp/internal/rtmp/metrics" "github.com/alxayo/go-rtmp/internal/rtmp/relay" "github.com/alxayo/go-rtmp/internal/rtmp/server/auth" "github.com/alxayo/go-rtmp/internal/rtmp/server/hooks" @@ -173,6 +174,8 @@ func (s *Server) acceptLoop() { s.mu.Lock() s.conns[c.ID()] = c s.mu.Unlock() + metrics.ConnectionsActive.Add(1) + metrics.ConnectionsTotal.Add(1) s.log.Info("connection registered", "conn_id", c.ID(), "remote", raw.RemoteAddr().String()) // Trigger connection accept hook event @@ -265,6 +268,7 @@ func (s *Server) RemoveConnection(id string) { s.mu.Lock() delete(s.conns, id) s.mu.Unlock() + metrics.ConnectionsActive.Add(-1) } // singleConnListener wraps a single pre-accepted net.Conn as a net.Listener. diff --git a/specs/006-expvar-metrics/plan.md b/specs/006-expvar-metrics/plan.md new file mode 100644 index 0000000..2243b07 --- /dev/null +++ b/specs/006-expvar-metrics/plan.md @@ -0,0 +1,358 @@ +# Feature 006: Expvar Metrics — Implementation Plan + +**Branch**: `feature/006-expvar-metrics` +**Date**: 2026-03-04 + +## Task Dependency Graph + +``` +T001 (metrics package) + ├──► T002 (CLI flag + HTTP listener) + ├──► T003 (instrument connections) + ├──► T004 (instrument registry) + │ └──► T005 (instrument publisher disconnect) + ├──► T006 (instrument media logger) + └──► T007 (instrument relay) + └──► T008 (integration test) +``` + +T001 is the foundation — all other tasks depend on it. T003–T007 are +independent of each other and can be implemented in any order. T008 depends +on all instrumentation being complete. + +--- + +## Tasks + +### T001: Create `internal/rtmp/metrics` package + +**Priority**: 1 (blocking) +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/metrics/metrics.go` — Create new +- `internal/rtmp/metrics/metrics_test.go` — Create new + +**What to do**: +1. Create package with all `expvar.Int` declarations (14 counters) +2. Register `expvar.Func` for `rtmp_uptime_seconds` and `rtmp_server_info` +3. Write tests: + - All counters initialize to 0 + - `Add(1)` / `Add(-1)` work correctly for gauge-style metrics + - `rtmp_uptime_seconds` returns `> 0` + - `rtmp_server_info` returns map with `go_version` key + - Counters appear in `expvar.Handler` JSON output + +**Acceptance criteria**: +- `go test -race ./internal/rtmp/metrics/...` passes +- All 14 `rtmp_*` variables registered and queryable + +**Commit**: `feat(metrics): add expvar metrics package with counter declarations` + +--- + +### T002: Add `-metrics-addr` CLI flag and HTTP listener + +**Priority**: 2 +**Estimated complexity**: Low +**Files**: +- `cmd/rtmp-server/flags.go` — Edit +- `cmd/rtmp-server/main.go` — Edit + +**What to do**: +1. Add `metricsAddr string` to `cliConfig` struct +2. Register `-metrics-addr` flag with `fs.StringVar`, default `""` (disabled) +3. In `main()`, after `server.Start()`, if `metricsAddr != ""`: + - Import `net/http` and blank-import `expvar` for handler registration + - Start `http.ListenAndServe(cfg.metricsAddr, nil)` in a goroutine + - Log the metrics address on startup +4. On shutdown signal, no special cleanup needed (HTTP server stops with process) + +**Key decisions**: +- Use `http.DefaultServeMux` — `expvar` auto-registers on it +- Run HTTP server in a goroutine — non-blocking, dies with process +- No TLS or auth — this is an internal debug endpoint + +**Acceptance criteria**: +- `./rtmp-server -metrics-addr :8080` starts HTTP listener +- `curl http://localhost:8080/debug/vars` returns valid JSON with runtime vars +- Default (no flag) = no HTTP listener started +- `./rtmp-server -h` shows the new flag + +**Commit**: `feat(metrics): add -metrics-addr CLI flag and HTTP metrics endpoint` + +--- + +### T003: Instrument connection lifecycle in server.go + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/server.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `acceptLoop()`, after line `s.conns[c.ID()] = c` (inside `s.mu.Lock()` block): + ```go + metrics.ConnectionsActive.Add(1) + metrics.ConnectionsTotal.Add(1) + ``` +3. In `RemoveConnection()`, after `delete(s.conns, id)`: + ```go + metrics.ConnectionsActive.Add(-1) + ``` + +**Edge cases**: +- `RemoveConnection` is called exactly once per connection (from disconnect handler) +- The `Stop()` method calls `clear(s.conns)` but doesn't go through + `RemoveConnection` — this is fine because metrics don't need to be accurate + during shutdown (the process is ending) + +**Acceptance criteria**: +- Accept 3 connections → `ConnectionsActive == 3`, `ConnectionsTotal == 3` +- Disconnect 1 → `ConnectionsActive == 2`, `ConnectionsTotal == 3` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument connection accept and remove` + +--- + +### T004: Instrument stream registry + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/registry.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `CreateStream()`, after the new stream is inserted into the map + (the `created == true` path, after `r.streams[key] = s`): + ```go + metrics.StreamsActive.Add(1) + ``` +3. In `DeleteStream()`, after `delete(r.streams, key)` inside the `if ok` block: + ```go + metrics.StreamsActive.Add(-1) + ``` +4. In `SetPublisher()`, after `s.Publisher = pub`: + ```go + metrics.PublishersActive.Add(1) + metrics.PublishersTotal.Add(1) + ``` +5. In `AddSubscriber()`, after `s.Subscribers = append(...)`: + ```go + metrics.SubscribersActive.Add(1) + metrics.SubscribersTotal.Add(1) + ``` +6. In `RemoveSubscriber()`, inside the `if existing == sub` block, + after the swap-delete: + ```go + metrics.SubscribersActive.Add(-1) + ``` + +**Edge cases**: +- `CreateStream` has a double-check pattern (fast RLock read, then Lock write). + Only increment when `created == true` (second return value). +- `RemoveSubscriber` only decrements when a match is actually found. +- `SetPublisher` returns `ErrPublisherExists` without incrementing when + a publisher is already set. + +**Acceptance criteria**: +- Create 2 streams → `StreamsActive == 2` +- Delete 1 → `StreamsActive == 1` +- Set publisher → `PublishersActive == 1`, `PublishersTotal == 1` +- Add 3 subs → `SubscribersActive == 3` +- Remove 1 sub → `SubscribersActive == 2`, `SubscribersTotal == 3` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument stream registry (streams, publishers, subscribers)` + +--- + +### T005: Instrument publisher disconnect + +**Priority**: 4 (depends on T004) +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/publish_handler.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `PublisherDisconnected()`, inside the `if s.Publisher == pub` block, + after `s.Publisher = nil`: + ```go + metrics.PublishersActive.Add(-1) + ``` + +**Edge cases**: +- Only decrement when `s.Publisher == pub` — if a different publisher took + over, the old one's disconnect should not affect the counter. +- `PublisherDisconnected` is called from the connection's `onDisconnect` + handler, which fires exactly once when the readLoop exits. + +**Acceptance criteria**: +- Publish → `PublishersActive == 1` +- Disconnect publisher → `PublishersActive == 0` +- `PublishersTotal` remains unchanged +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument publisher disconnect` + +--- + +### T006: Instrument media logger + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/server/media_logger.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `ProcessMessage()`, after the existing counter increments + (`ml.audioCount++` or `ml.videoCount++`), add: + ```go + metrics.BytesIngested.Add(int64(len(msg.Payload))) + ``` +3. In the `if msg.TypeID == 8` audio branch, after `ml.audioCount++`: + ```go + metrics.MessagesAudio.Add(1) + ``` +4. In the `else` (video) path, after `ml.videoCount++` (for TypeID == 9): + ```go + metrics.MessagesVideo.Add(1) + ``` + +**Placement note**: The `ProcessMessage` method is already guarded by +`ml.mu.Lock()`. The `expvar.Int.Add()` call is atomic and does not need +this lock — it works correctly whether called inside or outside the lock. +We place it inside the existing lock scope to keep the code co-located +with the existing counter increments. + +**Acceptance criteria**: +- Feed 10 audio + 5 video messages → `MessagesAudio == 10`, `MessagesVideo == 5` +- `BytesIngested == sum of all payload sizes` +- `go test -race ./internal/rtmp/server/...` passes + +**Commit**: `feat(metrics): instrument media logger for audio/video/byte counters` + +--- + +### T007: Instrument relay destination + +**Priority**: 3 +**Estimated complexity**: Low +**Files**: +- `internal/rtmp/relay/destination.go` — Edit + +**What to do**: +1. Add `import "github.com/alxayo/go-rtmp/internal/rtmp/metrics"` +2. In `SendMessage()`, in the "not connected" early-return path, + after `d.Metrics.MessagesDropped++`: + ```go + metrics.RelayMessagesDropped.Add(1) + ``` +3. In `SendMessage()`, in the send-error path, + after `d.Metrics.MessagesDropped++`: + ```go + metrics.RelayMessagesDropped.Add(1) + ``` +4. In `SendMessage()`, in the success path, + after `d.Metrics.MessagesSent++` and `d.Metrics.BytesSent += ...`: + ```go + metrics.RelayMessagesSent.Add(1) + metrics.RelayBytesSent.Add(int64(len(msg.Payload))) + ``` + +**Note**: The expvar counters are global aggregates across all destinations. +Per-destination metrics remain available via the existing +`DestinationManager.GetMetrics()` method. The expvar counters provide a +quick global overview without needing to enumerate destinations. + +**Acceptance criteria**: +- Send 5 messages successfully → `RelayMessagesSent == 5` +- Force 2 drops → `RelayMessagesDropped == 2` +- `RelayBytesSent == sum of successful payload sizes` +- `go test -race ./internal/rtmp/relay/...` passes + +**Commit**: `feat(metrics): instrument relay destination send/drop counters` + +--- + +### T008: Integration test — metrics HTTP endpoint + +**Priority**: 5 (depends on all above) +**Estimated complexity**: Medium +**Files**: +- `tests/integration/metrics_test.go` — Create new + +**What to do**: +1. Start a server with `-metrics-addr` on port 0 (OS-assigned) +2. HTTP GET `/debug/vars` → assert 200 +3. Parse JSON response +4. Assert all 14 `rtmp_*` keys exist +5. Assert initial gauge values are 0 +6. Assert `rtmp_uptime_seconds > 0` +7. Assert `rtmp_server_info` contains `go_version` + +**Acceptance criteria**: +- `go test -race ./tests/integration/ -run TestMetrics -count=1` passes +- No flaky timing issues (uptime check uses `> 0`, not exact value) + +**Commit**: `test(metrics): add integration test for metrics HTTP endpoint` + +--- + +## Execution Order + +| Step | Task | Description | +|------|------|-------------| +| 1 | T001 | Create metrics package (foundation) | +| 2 | T002 | CLI flag + HTTP listener | +| 3 | T003 | Instrument connections | +| 4 | T004 | Instrument registry | +| 5 | T005 | Instrument publisher disconnect | +| 6 | T006 | Instrument media logger | +| 7 | T007 | Instrument relay | +| 8 | T008 | Integration test | + +Each task gets its own commit. This allows easy bisection and clean review. + +--- + +## Test Strategy + +### Unit Tests (per task) +Each instrumentation point gets tested by: +1. Resetting relevant expvar counters (via `Set(0)`) to isolate tests +2. Performing the action that should increment +3. Asserting the expected counter value + +### Integration Test (T008) +Full HTTP endpoint test that validates the complete pipeline: server → expvar → +HTTP → JSON → all expected keys present. + +### Race Detection +All tests run with `-race` to verify the atomic operations are correct. + +### Manual Verification +```bash +go build -o rtmp-server.exe ./cmd/rtmp-server +./rtmp-server.exe -listen :1935 -metrics-addr :8080 -log-level debug +# In another terminal: +ffmpeg -re -i test.mp4 -c copy -f flv rtmp://localhost:1935/live/test +# In another terminal: +curl -s http://localhost:8080/debug/vars | jq . | grep rtmp_ +``` + +--- + +## Risk Assessment + +| Risk | Likelihood | Impact | Mitigation | +|------|-----------|--------|------------| +| Counter drift (inc without matching dec) | Low | Low | Each inc/dec pair is co-located; reviewed in T003-T005 | +| Race conditions on expvar access | Very Low | Medium | `expvar.Int` uses `sync/atomic` internally | +| HTTP listener port conflict | Low | Low | Disabled by default; user chooses port | +| Performance impact on hot path | Very Low | Medium | `atomic.AddInt64` is ~1ns; negligible vs I/O | +| Shutdown metrics inaccuracy | Low | None | Metrics don't need to be accurate during shutdown | diff --git a/specs/006-expvar-metrics/spec.md b/specs/006-expvar-metrics/spec.md new file mode 100644 index 0000000..ec6b9f8 --- /dev/null +++ b/specs/006-expvar-metrics/spec.md @@ -0,0 +1,488 @@ +# Feature 006: Expvar Metrics — Live Counters for Connections, Publishers, Subscribers + +**Feature**: 006-expvar-metrics +**Status**: Draft +**Date**: 2026-03-04 +**Branch**: `feature/006-expvar-metrics` + +## Overview + +Add real-time server metrics via Go's standard `expvar` package. An optional HTTP +endpoint exposes live JSON counters for connections, publishers, subscribers, +streams, media throughput, and relay health. Zero external dependencies. + +### Design Constraints + +- **Zero external dependencies** (stdlib `expvar` + `net/http` only) +- **Opt-in**: disabled by default; enabled via `-metrics-addr` CLI flag +- **Goroutine-safe**: `expvar.Int` uses `sync/atomic` internally — no additional locking +- **Minimal footprint**: one new package, ~15 one-liner instrumentation points in existing code +- **Non-intrusive**: metrics calls never block or affect RTMP data path performance + +--- + +## What It Provides + +### Metrics Endpoint + +When enabled, an HTTP server listens on the configured address and serves: + +- `GET /debug/vars` — standard expvar JSON output (all Go runtime + RTMP counters) + +This endpoint is compatible with: +- **curl / jq** — direct JSON inspection +- **Prometheus** — via `expvar_exporter` or similar scrapers +- **Grafana** — via Prometheus or JSON data sources +- **Custom dashboards** — any HTTP/JSON consumer + +### Metrics Catalog + +| Metric Name | Type | Description | Source | +|---|---|---|---| +| `rtmp_connections_active` | Gauge | Currently open TCP connections | `Server.conns` map | +| `rtmp_connections_total` | Counter | Total connections accepted since startup | `Server.acceptLoop` | +| `rtmp_streams_active` | Gauge | Currently active publish streams | `Registry.streams` map | +| `rtmp_publishers_active` | Gauge | Currently active publishers | `Stream.SetPublisher` / `PublisherDisconnected` | +| `rtmp_publishers_total` | Counter | Total publish sessions since startup | `Stream.SetPublisher` | +| `rtmp_subscribers_active` | Gauge | Currently active subscribers | `Stream.AddSubscriber` / `RemoveSubscriber` | +| `rtmp_subscribers_total` | Counter | Total play sessions since startup | `Stream.AddSubscriber` | +| `rtmp_messages_audio` | Counter | Total audio messages received | `MediaLogger.ProcessMessage` | +| `rtmp_messages_video` | Counter | Total video messages received | `MediaLogger.ProcessMessage` | +| `rtmp_bytes_ingested` | Counter | Total media bytes received (audio + video) | `MediaLogger.ProcessMessage` | +| `rtmp_relay_messages_sent` | Counter | Total relay messages delivered | `Destination.SendMessage` | +| `rtmp_relay_messages_dropped` | Counter | Total relay messages dropped | `Destination.SendMessage` | +| `rtmp_relay_bytes_sent` | Counter | Total relay bytes transmitted | `Destination.SendMessage` | +| `rtmp_uptime_seconds` | Gauge (func) | Seconds since server start | Computed on each request | +| `rtmp_server_info` | Map (func) | Static info: listen addr, Go version | Computed on each request | + +**Gauge** = value goes up and down (active counts). +**Counter** = monotonically increasing (totals). +Both are implemented as `expvar.Int` (atomic int64). `Add(1)` to increment, `Add(-1)` to decrement gauges. + +--- + +## Protocol / API + +### HTTP Response Format + +`GET /debug/vars` returns JSON (standard expvar format): + +```json +{ + "cmdline": ["./rtmp-server", "-listen", ":1935", "-metrics-addr", ":8080"], + "memstats": { ... }, + "rtmp_connections_active": 3, + "rtmp_connections_total": 47, + "rtmp_streams_active": 1, + "rtmp_publishers_active": 1, + "rtmp_publishers_total": 5, + "rtmp_subscribers_active": 2, + "rtmp_subscribers_total": 12, + "rtmp_messages_audio": 145023, + "rtmp_messages_video": 87412, + "rtmp_bytes_ingested": 523948172, + "rtmp_relay_messages_sent": 87410, + "rtmp_relay_messages_dropped": 2, + "rtmp_relay_bytes_sent": 261974086, + "rtmp_uptime_seconds": 3847, + "rtmp_server_info": { + "listen_addr": ":1935", + "go_version": "go1.21.0" + } +} +``` + +### CLI Usage + +```bash +# Start with metrics disabled (default — no change to existing behavior) +./rtmp-server -listen :1935 + +# Start with metrics enabled on port 8080 +./rtmp-server -listen :1935 -metrics-addr :8080 + +# Start with metrics on localhost only (production hardening) +./rtmp-server -listen :1935 -metrics-addr 127.0.0.1:8080 + +# Query live metrics +curl -s http://localhost:8080/debug/vars | jq '{ + connections: .rtmp_connections_active, + publishers: .rtmp_publishers_active, + subscribers: .rtmp_subscribers_active, + streams: .rtmp_streams_active, + uptime: .rtmp_uptime_seconds +}' +``` + +--- + +## Architecture + +### New Package + +``` +internal/rtmp/metrics/ +├── metrics.go # Package-level expvar variable declarations, init, helper funcs +└── metrics_test.go # Unit tests for counter behavior +``` + +The metrics package is a thin, stateless wrapper around `expvar`. It declares +package-level variables and provides no business logic — the instrumentation +is done at the call sites in existing packages. + +### Modified Files + +| File | Change | +|------|--------| +| `internal/rtmp/server/server.go` | Instrument `acceptLoop` (connection accepted), `RemoveConnection` (connection removed) | +| `internal/rtmp/server/registry.go` | Instrument `CreateStream`, `DeleteStream`, `SetPublisher`, `AddSubscriber`, `RemoveSubscriber` | +| `internal/rtmp/server/publish_handler.go` | Instrument `PublisherDisconnected` | +| `internal/rtmp/server/media_logger.go` | Instrument `ProcessMessage` for audio/video/byte counters | +| `internal/rtmp/relay/destination.go` | Instrument `SendMessage` for relay sent/dropped/bytes | +| `cmd/rtmp-server/flags.go` | Add `-metrics-addr` flag | +| `cmd/rtmp-server/main.go` | Start HTTP metrics server when flag is set | + +### Data Flow + +``` + ┌─────────────────────┐ + RTMP Client ──TCP──► │ RTMP Server │ + │ │ + acceptLoop ─────────► │ metrics.Connections │◄── expvar.Int (atomic) + registry ─────────► │ metrics.Streams │◄── expvar.Int (atomic) + SetPublisher ───────► │ metrics.Publishers │◄── expvar.Int (atomic) + AddSubscriber ──────► │ metrics.Subscribers │◄── expvar.Int (atomic) + ProcessMessage ─────► │ metrics.Messages │◄── expvar.Int (atomic) + SendMessage ───────► │ metrics.Relay │◄── expvar.Int (atomic) + │ │ + └─────────┬────────────┘ + │ + HTTP :8080│ /debug/vars + ▼ + ┌─────────────────────┐ + │ JSON Response │ + │ (curl / Prometheus) │ + └─────────────────────┘ +``` + +All instrumentation points call `expvar.Int.Add()` which is a single atomic +operation — no mutexes, no allocations, no blocking. The RTMP data path +remains unaffected. + +--- + +## Detailed Design + +### T001: Create metrics package with expvar declarations + +**File**: `internal/rtmp/metrics/metrics.go` + +```go +package metrics + +import ( + "expvar" + "runtime" + "time" +) + +// Server start time — set once during init. +var startTime time.Time + +func init() { + startTime = time.Now() +} + +// ── Connection metrics ────────────────────────────────────────────── +var ( + ConnectionsActive = expvar.NewInt("rtmp_connections_active") + ConnectionsTotal = expvar.NewInt("rtmp_connections_total") +) + +// ── Stream metrics ────────────────────────────────────────────────── +var ( + StreamsActive = expvar.NewInt("rtmp_streams_active") +) + +// ── Publisher metrics ─────────────────────────────────────────────── +var ( + PublishersActive = expvar.NewInt("rtmp_publishers_active") + PublishersTotal = expvar.NewInt("rtmp_publishers_total") +) + +// ── Subscriber metrics ────────────────────────────────────────────── +var ( + SubscribersActive = expvar.NewInt("rtmp_subscribers_active") + SubscribersTotal = expvar.NewInt("rtmp_subscribers_total") +) + +// ── Media metrics ─────────────────────────────────────────────────── +var ( + MessagesAudio = expvar.NewInt("rtmp_messages_audio") + MessagesVideo = expvar.NewInt("rtmp_messages_video") + BytesIngested = expvar.NewInt("rtmp_bytes_ingested") +) + +// ── Relay metrics ─────────────────────────────────────────────────── +var ( + RelayMessagesSent = expvar.NewInt("rtmp_relay_messages_sent") + RelayMessagesDropped = expvar.NewInt("rtmp_relay_messages_dropped") + RelayBytesSent = expvar.NewInt("rtmp_relay_bytes_sent") +) + +func init() { + // Uptime: computed on each request, always current + expvar.Publish("rtmp_uptime_seconds", expvar.Func(func() interface{} { + return int64(time.Since(startTime).Seconds()) + })) + + // Server info: static metadata + expvar.Publish("rtmp_server_info", expvar.Func(func() interface{} { + return map[string]string{ + "go_version": runtime.Version(), + } + })) +} +``` + +**Why package-level vars**: Allows any package to import `metrics` and call +`metrics.ConnectionsActive.Add(1)` without passing state around. The expvar +registry is process-global by design. + +**Tests** (`metrics_test.go`): +- Verify initial values are 0 +- Verify Add(1), Add(-1) for gauges +- Verify monotonic Add(1) for counters +- Verify uptime is > 0 +- Verify server_info returns a map with go_version + +--- + +### T002: Add `-metrics-addr` CLI flag + +**File**: `cmd/rtmp-server/flags.go` + +Add field to `cliConfig`: +```go +metricsAddr string // HTTP address for expvar/pprof (default "" = disabled) +``` + +Add flag registration: +```go +fs.StringVar(&cfg.metricsAddr, "metrics-addr", "", + "HTTP address for metrics endpoint (e.g. :8080 or 127.0.0.1:8080). Empty = disabled") +``` + +**File**: `cmd/rtmp-server/main.go` + +After `server.Start()`, start the metrics HTTP listener: +```go +if cfg.metricsAddr != "" { + go func() { + // Import expvar for side-effect: registers /debug/vars on DefaultServeMux + log.Info("metrics HTTP server listening", "addr", cfg.metricsAddr) + if err := http.ListenAndServe(cfg.metricsAddr, nil); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error("metrics HTTP server error", "error", err) + } + }() +} +``` + +**Tests**: Verify flag parsing includes new field, verify default is empty string. + +--- + +### T003: Instrument connection lifecycle + +**File**: `internal/rtmp/server/server.go` + +In `acceptLoop`, after `s.conns[c.ID()] = c`: +```go +metrics.ConnectionsActive.Add(1) +metrics.ConnectionsTotal.Add(1) +``` + +In `RemoveConnection`, after `delete(s.conns, id)`: +```go +metrics.ConnectionsActive.Add(-1) +``` + +**Tests**: Table-driven integration test — accept N connections, verify +`ConnectionsActive == N`, disconnect one, verify `ConnectionsActive == N-1`. +Verify `ConnectionsTotal` only increases. + +--- + +### T004: Instrument stream registry + +**File**: `internal/rtmp/server/registry.go` + +In `CreateStream`, when a new stream is created (the `created == true` path): +```go +metrics.StreamsActive.Add(1) +``` + +In `DeleteStream`, after `delete(r.streams, key)`: +```go +metrics.StreamsActive.Add(-1) +``` + +In `SetPublisher`, after `s.Publisher = pub`: +```go +metrics.PublishersActive.Add(1) +metrics.PublishersTotal.Add(1) +``` + +In `AddSubscriber`, after `append`: +```go +metrics.SubscribersActive.Add(1) +metrics.SubscribersTotal.Add(1) +``` + +In `RemoveSubscriber`, after the swap-delete succeeds (inside the `if existing == sub` block): +```go +metrics.SubscribersActive.Add(-1) +``` + +**Tests**: Create stream → verify `StreamsActive == 1`. Set publisher → verify +`PublishersActive == 1`. Add 3 subscribers → verify `SubscribersActive == 3`. +Remove 1 → verify `SubscribersActive == 2`. Delete stream → verify +`StreamsActive == 0`. + +--- + +### T005: Instrument publisher disconnect + +**File**: `internal/rtmp/server/publish_handler.go` + +In `PublisherDisconnected`, after `s.Publisher = nil` (inside the `if s.Publisher == pub` block): +```go +metrics.PublishersActive.Add(-1) +``` + +**Tests**: Publish → disconnect → verify `PublishersActive == 0`, +`PublishersTotal == 1`. + +--- + +### T006: Instrument media logger + +**File**: `internal/rtmp/server/media_logger.go` + +In `ProcessMessage`, after the existing counter increments: +```go +metrics.BytesIngested.Add(int64(len(msg.Payload))) + +if msg.TypeID == 8 { + metrics.MessagesAudio.Add(1) +} else { + metrics.MessagesVideo.Add(1) +} +``` + +These calls are placed after existing `ml.audioCount++` / `ml.videoCount++` +to keep the two counter systems consistent. + +**Tests**: Feed N audio + M video messages → verify `MessagesAudio == N`, +`MessagesVideo == M`, `BytesIngested == sum(payload sizes)`. + +--- + +### T007: Instrument relay destination + +**File**: `internal/rtmp/relay/destination.go` + +In `SendMessage`, after the existing `d.Metrics.MessagesSent++`: +```go +metrics.RelayMessagesSent.Add(1) +metrics.RelayBytesSent.Add(int64(len(msg.Payload))) +``` + +In the two drop paths (not connected + send error), after `d.Metrics.MessagesDropped++`: +```go +metrics.RelayMessagesDropped.Add(1) +``` + +**Tests**: Mock client, send messages → verify `RelayMessagesSent` and +`RelayBytesSent`. Force error → verify `RelayMessagesDropped`. + +--- + +### T008: Integration test — full metrics endpoint + +**File**: `tests/integration/metrics_test.go` + +End-to-end test: +1. Start server with `-metrics-addr` on a random port +2. Verify `GET /debug/vars` returns 200 with valid JSON +3. Verify all `rtmp_*` keys are present +4. Verify initial values are 0 +5. Verify `rtmp_uptime_seconds > 0` +6. Verify `rtmp_server_info` contains `go_version` + +--- + +## Scope Boundaries + +### In Scope +- `expvar` package-level counters (atomic int64) +- HTTP endpoint for JSON metrics export +- CLI flag to enable/disable +- Per-category counters (connections, streams, publishers, subscribers, media, relay) +- Uptime and server info computed vars +- Unit tests for each instrumentation point +- Integration test for HTTP endpoint + +### Out of Scope (future enhancements) +- **Prometheus `/metrics` endpoint** — use prometheus expvar exporter instead +- **Per-stream metric breakdowns** — would require `expvar.Map` nesting +- **Histograms / percentiles** — expvar only supports counters/gauges +- **Authentication on metrics endpoint** — bind to localhost for security +- **pprof endpoint** — separate feature, could share the same HTTP listener +- **Metric retention / history** — expvar is live-only, no time series storage +- **Dashboard / UI** — consumers use their own tools (Grafana, etc.) + +--- + +## Security Considerations + +- **Metrics endpoint binds to configurable address** — users should bind to + `127.0.0.1:8080` in production to prevent external access +- **No sensitive data exposed** — only numeric counters and Go version string +- **No authentication** — standard for internal debug endpoints; network-level + access control is the appropriate mitigation +- **Disabled by default** — no listener starts unless `-metrics-addr` is explicitly set + +--- + +## Usage Examples + +### Monitor during load test +```bash +# Terminal 1: Start server with metrics +./rtmp-server -listen :1935 -metrics-addr :8080 -log-level info + +# Terminal 2: Publish a test stream +ffmpeg -re -i test.mp4 -c copy -f flv rtmp://localhost:1935/live/test + +# Terminal 3: Watch metrics live (refresh every 2s) +watch -n2 'curl -s http://localhost:8080/debug/vars | jq "{ + conns: .rtmp_connections_active, + publishers: .rtmp_publishers_active, + subscribers: .rtmp_subscribers_active, + streams: .rtmp_streams_active, + audio_msgs: .rtmp_messages_audio, + video_msgs: .rtmp_messages_video, + bytes_in: .rtmp_bytes_ingested, + uptime: .rtmp_uptime_seconds +}"' +``` + +### Alerting on subscriber count +```bash +# Simple threshold check +SUBS=$(curl -s http://localhost:8080/debug/vars | jq .rtmp_subscribers_active) +if [ "$SUBS" -gt 100 ]; then + echo "WARNING: $SUBS subscribers active" +fi +``` diff --git a/tests/integration/metrics_test.go b/tests/integration/metrics_test.go new file mode 100644 index 0000000..04ddd77 --- /dev/null +++ b/tests/integration/metrics_test.go @@ -0,0 +1,111 @@ +// Package integration – end-to-end tests for the RTMP server. +// +// metrics_test.go validates the expvar metrics HTTP endpoint. +// It starts an HTTP listener (mirroring what main.go does with -metrics-addr), +// then queries /debug/vars and verifies all rtmp_* keys are present with +// correct initial values. +package integration + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "testing" + "time" + + _ "github.com/alxayo/go-rtmp/internal/rtmp/metrics" // Register expvar counters + srv "github.com/alxayo/go-rtmp/internal/rtmp/server" +) + +// TestMetricsEndpoint verifies the expvar HTTP endpoint serves all RTMP counters. +func TestMetricsEndpoint(t *testing.T) { + // Start RTMP server on ephemeral port + s := srv.New(srv.Config{ + ListenAddr: "127.0.0.1:0", + ChunkSize: 4096, + }) + if err := s.Start(); err != nil { + t.Fatalf("server start: %v", err) + } + defer s.Stop() + + // Start HTTP metrics server on ephemeral port (mirrors main.go behavior) + metricsLn, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("metrics listen: %v", err) + } + defer metricsLn.Close() + metricsAddr := metricsLn.Addr().String() + + httpSrv := &http.Server{Handler: http.DefaultServeMux} + go func() { _ = httpSrv.Serve(metricsLn) }() + defer httpSrv.Close() + + // Give the HTTP server a moment to start + time.Sleep(50 * time.Millisecond) + + // Query /debug/vars + resp, err := http.Get(fmt.Sprintf("http://%s/debug/vars", metricsAddr)) + if err != nil { + t.Fatalf("GET /debug/vars: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + + var vars map[string]json.RawMessage + if err := json.Unmarshal(body, &vars); err != nil { + t.Fatalf("parse JSON: %v", err) + } + + // Verify all RTMP counter keys exist + expectedKeys := []string{ + "rtmp_connections_active", + "rtmp_connections_total", + "rtmp_streams_active", + "rtmp_publishers_active", + "rtmp_publishers_total", + "rtmp_subscribers_active", + "rtmp_subscribers_total", + "rtmp_messages_audio", + "rtmp_messages_video", + "rtmp_bytes_ingested", + "rtmp_relay_messages_sent", + "rtmp_relay_messages_dropped", + "rtmp_relay_bytes_sent", + "rtmp_uptime_seconds", + "rtmp_server_info", + } + for _, key := range expectedKeys { + if _, ok := vars[key]; !ok { + t.Errorf("missing key %q in /debug/vars output", key) + } + } + + // Verify uptime is >= 0 + var uptime int64 + if err := json.Unmarshal(vars["rtmp_uptime_seconds"], &uptime); err != nil { + t.Fatalf("parse uptime: %v", err) + } + if uptime < 0 { + t.Errorf("uptime should be >= 0, got %d", uptime) + } + + // Verify server_info contains go_version + var serverInfo map[string]string + if err := json.Unmarshal(vars["rtmp_server_info"], &serverInfo); err != nil { + t.Fatalf("parse server_info: %v", err) + } + if _, ok := serverInfo["go_version"]; !ok { + t.Error("server_info missing go_version key") + } +}