Merged
24 commits
fa23693
docs(metrics): add feature 006 spec for expvar metrics
alxayo Mar 4, 2026
de10704
docs(metrics): add implementation plan with 8 tasks
alxayo Mar 4, 2026
671f2a6
feat(metrics): add expvar metrics package with counter declarations
alxayo Mar 4, 2026
7f446c5
feat(metrics): add -metrics-addr CLI flag and HTTP metrics endpoint
alxayo Mar 4, 2026
6ae1631
feat(metrics): instrument connection accept and remove
alxayo Mar 4, 2026
5e165fd
feat(metrics): instrument stream registry (streams, publishers, subsc…
alxayo Mar 4, 2026
f6204bb
feat(metrics): instrument publisher disconnect
alxayo Mar 4, 2026
627f213
feat(metrics): instrument media logger for audio/video/byte counters
alxayo Mar 4, 2026
2d10d83
feat(metrics): instrument relay destination send/drop counters
alxayo Mar 4, 2026
a22a35d
test(metrics): add integration test for metrics HTTP endpoint
alxayo Mar 4, 2026
8eaa72e
refactor(auth): remove unused ErrForbidden sentinel error and fix cro…
alxayo Mar 4, 2026
6f6c08e
docs: add metrics package to architecture trees, CLI flags, and desig…
alxayo Mar 4, 2026
215aa96
perf(chunk): reuse scratch buffer in Writer to reduce per-chunk alloc…
alxayo Mar 4, 2026
e4b37aa
refactor: remove unused bufpool package
alxayo Mar 4, 2026
b72b83a
refactor(rpc): lazy-init ConnectCommand.Extra map
alxayo Mar 4, 2026
29e31f8
test(amf): standardize golden file helpers to use t.Helper and t.Fatalf
alxayo Mar 4, 2026
d3f722f
test(server): consolidate duplicate test doubles into helpers_test.go
alxayo Mar 4, 2026
e791988
test(amf): add t.Run for named subtests in edge case and marker tests
alxayo Mar 4, 2026
7767161
test: add boundary and nil-argument edge case tests
alxayo Mar 4, 2026
a2367fa
perf(amf): eliminate MultiReader allocations in nested value decoding
alxayo Mar 4, 2026
cf89878
test(media): add t.Run subtests and consolidate test helper
alxayo Mar 4, 2026
b808da2
perf(hooks): pre-allocate hook slice capacity for stdio hook
alxayo Mar 4, 2026
cde99b6
docs: fix stale comments and remove bufpool references
alxayo Mar 4, 2026
b97a0a4
docs: update CHANGELOG with feature/006-expvar-metrics changes
alxayo Mar 4, 2026
1 change: 1 addition & 0 deletions .github/copilot-instructions.md
@@ -16,6 +16,7 @@ internal/rtmp/
│ ├── auth/ # Token-based authentication (Validator interface + backends)
│ └── hooks/ # Event hooks (webhooks, shell scripts, stdio)
├── relay/ # Multi-destination relay with late-join support
├── metrics/ # Expvar counters (connections, publishers, subscribers, media)
└── media/ # Audio/video message handling + FLV recording
```

29 changes: 27 additions & 2 deletions CHANGELOG.md
@@ -5,6 +5,31 @@ All notable changes to go-rtmp are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased] — feature/006-expvar-metrics

### Added
- **Expvar metrics**: Live server counters via `expvar` (HTTP `/debug/vars` endpoint) tracking connections, publishers, subscribers, streams, audio/video messages, bytes ingested, relay stats, uptime, and Go version ([`671f2a6`](https://github.com/alxayo/rtmp-go/commit/671f2a6))
- **`-metrics-addr` CLI flag**: Optional HTTP address (e.g. `:8080`) to expose the metrics endpoint; disabled by default ([`7f446c5`](https://github.com/alxayo/rtmp-go/commit/7f446c5))
- **Metrics integration test**: End-to-end test validating `/debug/vars` HTTP endpoint serves all `rtmp_*` keys ([`a22a35d`](https://github.com/alxayo/rtmp-go/commit/a22a35d))
- **Edge case tests**: Chunk writer boundary tests (chunkSize ±1) and publish handler nil-argument tests ([`7767161`](https://github.com/alxayo/rtmp-go/commit/7767161))
- **Spec 006**: Expvar metrics feature specification and implementation plan ([`fa23693`](https://github.com/alxayo/rtmp-go/commit/fa23693))

### Changed
- **AMF0 decoding optimization**: Eliminated `io.MultiReader` allocations in nested value decoding by inlining payload reads in `decodeValueWithMarker`; new internal helpers `decodeObjectPayload`, `decodeStrictArrayPayload`, `decodeStringPayload` ([`a2367fa`](https://github.com/alxayo/rtmp-go/commit/a2367fa))
- **Chunk writer optimization**: Added reusable scratch buffer to `Writer` struct, eliminating per-chunk `make()` allocation in `writeChunk` ([`215aa96`](https://github.com/alxayo/rtmp-go/commit/215aa96))
- **RPC lazy-init**: `ConnectCommand.Extra` map only allocated when extra fields are present ([`b72b83a`](https://github.com/alxayo/rtmp-go/commit/b72b83a))
- **Hook manager optimization**: `TriggerEvent` pre-allocates hook slice capacity for stdio hook ([`b808da2`](https://github.com/alxayo/rtmp-go/commit/b808da2))
- **AMF golden helpers standardized**: All golden file test helpers use `t.Helper()` + `t.Fatalf()` consistently; removed duplicate `goldenDir` constants ([`29e31f8`](https://github.com/alxayo/rtmp-go/commit/29e31f8))
- **Server test doubles consolidated**: Moved shared stubs (`stubConn`, `capturingConn`, `stubPublisher`) into `helpers_test.go` ([`d3f722f`](https://github.com/alxayo/rtmp-go/commit/d3f722f))
- **Media test helper consolidated**: Removed duplicate `_tVidFatalf` — reuses `_tFatalf` from audio_test.go; added `t.Run` subtests to error case loops ([`cf89878`](https://github.com/alxayo/rtmp-go/commit/cf89878))
- **AMF subtests**: Added `t.Run` with named subtests to `TestNumber_EdgeCases_RoundTrip` and `TestDecodeValue_UnsupportedMarkers` ([`e791988`](https://github.com/alxayo/rtmp-go/commit/e791988))

### Removed
- **Dead `bufpool` package**: `internal/bufpool/` was implemented but never imported; removed 263 lines of dead code ([`e4b37aa`](https://github.com/alxayo/rtmp-go/commit/e4b37aa))
- **Dead `ErrForbidden` sentinel**: Auth sentinel error declared but never returned by any code path ([`8eaa72e`](https://github.com/alxayo/rtmp-go/commit/8eaa72e))

---

## [Unreleased] — feature/005-error-handling-benchmarks

### Added
@@ -93,7 +118,6 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming
- **Connection lifecycle**: Handshake → control burst → command exchange → media streaming
- **Stream registry**: Thread-safe map of stream keys to publisher/subscriber lists
- **Structured logging**: `log/slog` with configurable levels (debug/info/warn/error)
- **Buffer pool**: Memory-efficient buffer reuse (`internal/bufpool`)
- **Domain errors**: Typed error wrappers (`HandshakeError`, `ChunkError`, `AMFError`, `ProtocolError`, `TimeoutError`)

#### Testing & Tooling
@@ -134,11 +158,12 @@ First feature-complete release of the RTMP server. Supports end-to-end streaming

| Branch | Spec | Description |
|--------|------|-------------|
| `feature/006-expvar-metrics` | [specs/006](specs/006-expvar-metrics/spec.md) | Expvar metrics, performance optimizations, dead code removal |
| `feature/005-error-handling-benchmarks` | [specs/005](specs/005-error-handling-benchmarks/spec.md) | Error handling, connection cleanup, TCP deadlines, performance benchmarks |
| `feature/004-token-auth` | [specs/004](specs/004-token-auth/spec.md) | Token-based stream key authentication with 4 validator backends |
| `003-multi-destination-relay` | [specs/003](specs/003-multi-destination-relay/) | Multi-destination relay to external RTMP servers |
| `T001-init-go-module` | [specs/001](specs/001-rtmp-server-implementation/spec.md) | Core RTMP server implementation (handshake through media streaming) |

[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/005-error-handling-benchmarks
[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/006-expvar-metrics
[v0.1.1]: https://github.com/alxayo/rtmp-go/compare/v0.1.0...v0.1.1
[v0.1.0]: https://github.com/alxayo/rtmp-go/releases/tag/v0.1.0
5 changes: 4 additions & 1 deletion README.md
@@ -43,6 +43,7 @@ See [docs/getting-started.md](docs/getting-started.md) for the full guide with C
| **Media Logging** | Per-connection codec detection and bitrate stats |
| **Event Hooks** | Webhooks, shell scripts, and stdio notifications on RTMP events |
| **Authentication** | Pluggable token-based validation for publish/play (static tokens, file, webhook) |
| **Metrics** | Expvar counters for connections, publishers, subscribers, media (HTTP `/debug/vars`) |
| **Connection Cleanup** | TCP deadline enforcement (read 90s, write 30s), disconnect handlers, zombie detection |

## Architecture
@@ -64,6 +65,7 @@ internal/rtmp/
│ └── hooks/ Event hook system (webhooks, shell, stdio)
├── media/ Audio/video parsing, codec detection, FLV recording
├── relay/ Multi-destination forwarding
├── metrics/ Expvar counters for live monitoring
└── client/ Minimal test client
```

@@ -116,6 +118,7 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib
-hook-stdio-format Stdio hook output: json | env (default disabled)
-hook-timeout Hook execution timeout (default 30s)
-hook-concurrency Max concurrent hook executions (default 10)
-metrics-addr HTTP address for metrics endpoint (e.g. :8080). Empty = disabled
-version Print version and exit
```

@@ -130,13 +133,13 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib
### Recently Completed
- Enhanced error handling: disconnect handlers, TCP deadline enforcement (read 90s, write 30s), relay client cleanup
- Performance benchmarks for chunk parsing, AMF0 encoding, and array operations
- Expvar metrics: live counters for connections, publishers, subscribers, media bytes (HTTP `/debug/vars`)

### In Progress
- Fuzz testing for AMF0 and chunk parsing (bounds safety)

### Planned
- **RTMPS** — TLS/SSL encrypted connections
- **Expvar metrics** — live counters for connections, publishers, subscribers
- **Configurable backpressure** — drop or disconnect policy for slow subscribers
- **DVR / time-shift** — seek into live stream history
- **Transcoding** — server-side codec conversion (e.g. H.265 → H.264)
6 changes: 6 additions & 0 deletions cmd/rtmp-server/flags.go
@@ -31,6 +31,9 @@ type cliConfig struct {
	hookTimeout string // hook execution timeout (e.g. "30s")
	hookConcurrency int // max concurrent hook executions

	// Metrics
	metricsAddr string // HTTP address for expvar metrics (e.g. ":8080"); empty = disabled

	// Authentication
	authMode string // "none", "token", "file", "callback"
	authTokens []string // "streamKey=token" pairs (for mode=token)
@@ -62,6 +65,9 @@ func parseFlags(args []string) (*cliConfig, error) {
	fs.StringVar(&cfg.hookTimeout, "hook-timeout", "30s", "Hook execution timeout")
	fs.IntVar(&cfg.hookConcurrency, "hook-concurrency", 10, "Max concurrent hook executions")

	// Metrics
	fs.StringVar(&cfg.metricsAddr, "metrics-addr", "", "HTTP address for metrics endpoint (e.g. :8080 or 127.0.0.1:8080). Empty = disabled")

	// Authentication flags
	fs.StringVar(&cfg.authMode, "auth-mode", "none", "Authentication mode: none|token|file|callback")
	fs.Var(&authTokens, "auth-token", `Stream token: "streamKey=token" (repeatable, for -auth-mode=token)`)
Expand Down
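
The `-metrics-addr` flag above is accepted as a free-form string. As a sketch of how such a value could be checked before it reaches the HTTP listener, the hypothetical helper below parses the flag and validates its `host:port` shape with `net.SplitHostPort`. The function name `parseMetricsAddr` and the validation step are assumptions for illustration; the diff shown here does not include any validation.

```go
package main

import (
	"flag"
	"fmt"
	"net"
)

// parseMetricsAddr is a hypothetical helper (not part of the PR). It parses
// -metrics-addr from args and verifies the value has host:port form.
// An empty value means the metrics endpoint stays disabled.
func parseMetricsAddr(args []string) (string, error) {
	fs := flag.NewFlagSet("rtmp-server", flag.ContinueOnError)
	addr := fs.String("metrics-addr", "", "HTTP address for metrics endpoint. Empty = disabled")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	if *addr == "" {
		return "", nil // disabled by default
	}
	// ":8080" and "127.0.0.1:8080" pass; a bare "8080" is rejected.
	if _, _, err := net.SplitHostPort(*addr); err != nil {
		return "", fmt.Errorf("invalid -metrics-addr %q: %w", *addr, err)
	}
	return *addr, nil
}

func main() {
	for _, args := range [][]string{
		{"-metrics-addr", ":8080"},
		{"-metrics-addr", "127.0.0.1:8080"},
		{"-metrics-addr", "8080"},
		{},
	} {
		addr, err := parseMetricsAddr(args)
		fmt.Printf("args=%v addr=%q err=%v\n", args, addr, err)
	}
}
```

Rejecting a malformed address at startup surfaces the mistake immediately instead of in a background goroutine's log line.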
13 changes: 13 additions & 0 deletions cmd/rtmp-server/main.go
@@ -2,14 +2,17 @@ package main

import (
	"context"
	_ "expvar" // Register /debug/vars handler on DefaultServeMux
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/alxayo/go-rtmp/internal/logger"
	_ "github.com/alxayo/go-rtmp/internal/rtmp/metrics" // Register expvar RTMP counters
	srv "github.com/alxayo/go-rtmp/internal/rtmp/server"
	"github.com/alxayo/go-rtmp/internal/rtmp/server/auth"
)
@@ -62,6 +65,16 @@ func main() {

	log.Info("server started", "addr", server.Addr().String(), "version", version, "auth_mode", cfg.authMode)

	// Start HTTP metrics server if configured
	if cfg.metricsAddr != "" {
		go func() {
			log.Info("metrics HTTP server listening", "addr", cfg.metricsAddr)
			if err := http.ListenAndServe(cfg.metricsAddr, nil); err != nil && err != http.ErrServerClosed {
				log.Error("metrics HTTP server error", "error", err)
			}
		}()
	}

	// If using file-based auth, listen for SIGHUP to reload the token file
	if cfg.authMode == "file" {
		if fv, ok := authValidator.(*auth.FileValidator); ok {
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
@@ -126,8 +126,8 @@ Each incoming media message is routed through three paths:
| `internal/rtmp/server/hooks` | Event notification (webhooks, shell, stdio) | `HookManager`, `Event`, `Hook` |
| `internal/rtmp/media` | Audio/video parsing, codec detection, FLV recording | `Recorder`, `CodecDetector`, `Stream` |
| `internal/rtmp/relay` | Multi-destination relay to external servers | `DestinationManager`, `Destination` |
| `internal/rtmp/metrics` | Expvar counters for live monitoring | `ConnectionsActive`, `ConnectionsTotal`, `BytesIngested` |
| `internal/rtmp/client` | Minimal RTMP client for testing | `Client` |
| `internal/bufpool` | Memory pool for chunk buffers | `Pool` |
| `internal/errors` | Domain-specific error types | `ProtocolError`, `ChunkError`, `AMFError` |
| `internal/logger` | Structured logging with dynamic level | `Init()`, `Logger()`, `WithConn()` |

11 changes: 11 additions & 0 deletions docs/design.md
@@ -83,6 +83,17 @@ Tokens are passed by clients via URL query parameters in the stream name field (

The default mode is `none` (accept all requests), preserving backward compatibility.

### Expvar Metrics

The server uses Go's `expvar` package for live monitoring. Expvar was chosen because:

- **Zero dependencies**: part of the standard library
- **Thread-safe**: `expvar.Int` uses atomic int64 internally
- **HTTP-ready**: registers a handler on `DefaultServeMux` at `/debug/vars`
- **JSON output**: human- and machine-readable

Metrics are organized as gauges (values that go up and down: active connections, publishers, subscribers, streams) and counters (monotonically increasing: total connections, media messages, bytes ingested, relay stats). The HTTP endpoint is opt-in via `-metrics-addr`: when the flag is empty, no HTTP listener is started, so the only remaining cost is the atomic counter updates themselves.
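
The gauge/counter split can be sketched with two `expvar.Int` values. This is a minimal illustration under assumptions: the key strings `example_connections_active` and `example_connections_total` and the `onAccept`/`onClose` helpers are hypothetical, and the real declarations live in `internal/rtmp/metrics`, which is not shown in this diff.

```go
package main

import (
	"expvar"
	"fmt"
)

// Hypothetical metric keys chosen for this sketch; the server's real
// keys (rtmp_*) are declared in internal/rtmp/metrics.
var (
	connectionsActive = expvar.NewInt("example_connections_active") // gauge
	connectionsTotal  = expvar.NewInt("example_connections_total")  // counter
)

// onAccept records a new connection: gauge and counter both increase.
func onAccept() {
	connectionsActive.Add(1)
	connectionsTotal.Add(1)
}

// onClose records a closed connection: only the gauge decreases,
// the counter never goes down.
func onClose() {
	connectionsActive.Add(-1)
}

func main() {
	onAccept()
	onAccept()
	onClose()
	fmt.Println("active:", connectionsActive.Value(), "total:", connectionsTotal.Value())
}
```

Both kinds share the same `expvar.Int` type; the difference is purely in how the call sites use `Add`.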

## Concurrency Model

| Resource | Protection | Why |
10 changes: 10 additions & 0 deletions docs/getting-started.md
@@ -65,6 +65,15 @@ The server will forward all incoming media to the specified destinations.

Available event types: `connection_accept`, `connection_close`, `publish_start`, `play_start`, `codec_detected`, `auth_failed`.

### With Metrics

```bash
# Enable metrics endpoint on port 8080
./rtmp-server -listen :1935 -metrics-addr :8080
```

Query metrics at `http://localhost:8080/debug/vars` — returns JSON with all RTMP counters (connections, publishers, subscribers, media bytes, relay stats, uptime).

### With Authentication

```bash
@@ -112,6 +121,7 @@ OBS Studio: set **Server** to `rtmp://localhost:1935/live` and **Stream Key** to
| `-hook-stdio-format` | (disabled) | Stdio output format: `json` or `env` |
| `-hook-timeout` | `30s` | Hook execution timeout |
| `-hook-concurrency` | `10` | Max concurrent hook executions |
| `-metrics-addr` | (disabled) | HTTP address for metrics endpoint (e.g. `:8080`). Empty = disabled |
| `-version` | | Print version and exit |

## Test with FFmpeg
2 changes: 1 addition & 1 deletion docs/implementation.md
@@ -6,7 +6,6 @@ A code-level walkthrough of the go-rtmp server. Read [Architecture](architecture

```
internal/
├── bufpool/ Buffer pool to reduce garbage collection pressure
├── errors/ Typed error wrappers (HandshakeError, ChunkError, etc.)
├── logger/ Structured JSON logging with runtime level changes
└── rtmp/
@@ -21,6 +20,7 @@ internal/
│ └── hooks/ Event hooks (webhooks, shell scripts, stdio output)
├── media/ Audio/video parsing, codec detection, FLV recording
├── relay/ Multi-destination forwarding to external RTMP servers
├── metrics/ Expvar counters for connections, publishers, subscribers
└── client/ Minimal RTMP client for testing
```

2 changes: 0 additions & 2 deletions docs/testing-guide.md
@@ -15,7 +15,6 @@ This runs tests across all packages. Most complete in under 15 seconds. Some pac
### Core Utilities

```bash
go test ./internal/bufpool/ # Buffer pool allocation and concurrency
go test ./internal/errors/ # Error type classification and unwrapping
go test ./internal/logger/ # Log level parsing and field extraction
```
@@ -161,6 +160,5 @@ See [wireshark_rtmp_capture_guide.md](wireshark_rtmp_capture_guide.md) for detai
| Play handler | `server/play_handler_test.go` | Subscriber addition, sequence header delivery |
| Media logging | `server/media_logger_test.go` | Packet counting, codec detection, stats |
| Audio/video parsing | `media/*_test.go` | Codec detection, frame type classification |
| Buffer pool | `bufpool/pool_test.go` | Allocation, reuse, concurrent access safety |
| Event hooks | `server/hooks/hooks_test.go` | Hook registration, execution, concurrency pool, cleanup |
| Integration | `tests/integration/*_test.go` | Full publish→play flow, multi-subscriber relay |
93 changes: 0 additions & 93 deletions internal/bufpool/pool.go

This file was deleted.