diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 239d75c..f0ebf4f 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -43,11 +43,11 @@ ffplay rtmp://localhost:1935/live/test # Subsc All multi-byte integers are **big-endian** except MSID in chunk headers (little-endian quirk). Use `encoding/binary.BigEndian` consistently and verify against golden vectors in `tests/golden/*.bin`. ### Concurrency Pattern -Each connection runs **one readLoop goroutine** with context cancellation. Use bounded channels for backpressure: +Each connection runs **one readLoop goroutine** with context cancellation and TCP deadline enforcement (read 90s, write 30s for zombie detection). Use bounded channels for backpressure: ```go outboundQueue := make(chan *chunk.Message, 100) // Bounded queue ``` -Protect shared state (stream registry) with `sync.RWMutex`. +Protect shared state (stream registry) with `sync.RWMutex`. On disconnect, a handler fires to clean up publisher/subscriber registrations and close relay clients. ### Error Handling Use domain-specific error wrappers from `internal/errors`: @@ -80,6 +80,7 @@ tests := []struct{ name, file string; want interface{} }{ | Control | CSID=2, MSID=0, types 1-6 | | AMF0 | Object ends with 0x00 0x00 0x09 | | Media | TypeID 8=audio, 9=video; cache sequence headers for late-join | +| Deadlines | Read 90s, Write 30s; reset on each I/O (zombie detection) | ## Key Files for Onboarding diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..2e207ad --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,144 @@ +# Changelog + +All notable changes to go-rtmp are documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [Unreleased] — feature/005-error-handling-benchmarks + +### Added +- **Disconnect handlers**: Each connection fires a cleanup callback when the read loop exits, ensuring publisher/subscriber registrations are removed and relay clients are closed ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f)) +- **TCP deadline enforcement**: Read deadline (90s) and write deadline (30s) detect zombie connections and prevent resource leaks ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f)) +- **Lifecycle hook events**: `EventConnectionClose`, `EventPublishStop`, `EventPlayStop`, and `EventSubscriberCount` fire on disconnect with session metadata (duration, packet counts, codecs) ([`2ed5fd2`](https://github.com/alxayo/rtmp-go/commit/2ed5fd2)) +- **Performance benchmarks**: Chunk header parsing, AMF0 number/string/object encode/decode, and strict array benchmarks ([`34058ee`](https://github.com/alxayo/rtmp-go/commit/34058ee)) +- **Registry tests**: Codec caching, subscriber removal, `BroadcastMessage` relay, and sequence header caching tests ([`fc4d3c7`](https://github.com/alxayo/rtmp-go/commit/fc4d3c7)) +- **Spec 005**: Error handling, connection cleanup & performance benchmarks specification ([`6274f77`](https://github.com/alxayo/rtmp-go/commit/6274f77)) + +### Fixed +- **Relay client leak**: Relay client connections are now properly closed when publisher disconnects ([`69365fe`](https://github.com/alxayo/rtmp-go/commit/69365fe)) +- **Server shutdown deadlock**: Server no longer hangs during shutdown when connections are active; force exit after timeout ([`92415d0`](https://github.com/alxayo/rtmp-go/commit/92415d0), [`69365fe`](https://github.com/alxayo/rtmp-go/commit/69365fe)) + +### Changed +- **Simplified `attachCommandHandling`**: Replaced variadic `srv ...*Server` parameter with direct `*Server`, removing 7 redundant nil-checks ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9)) + +### Removed +- **Dead `Session` type**: Unused 
`Session` and `SessionState` types removed from `conn` package ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f)) +- **Dead `RunCLI` function**: Speculative future code removed from `client` package ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9)) +- **Dead `Marshal`/`Unmarshal` wrappers**: Test-only exported functions removed from `amf` package ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9)) + +--- + +## [v0.1.1] — 2026-03-03 + +### Added +- **Token-based authentication** ([PR #4](https://github.com/alxayo/rtmp-go/pull/4)): Pluggable `auth.Validator` interface with four backends: + - `TokenValidator`: In-memory map of streamKey → token pairs (CLI flag `-auth-token`) + - `FileValidator`: JSON token file with live reload via SIGHUP (`-auth-file`) + - `CallbackValidator`: External HTTP webhook for auth decisions (`-auth-callback`) + - `AllowAllValidator`: Default mode, accepts all requests (`-auth-mode=none`) +- Authentication CLI flags: `-auth-mode`, `-auth-token`, `-auth-file`, `-auth-callback`, `-auth-callback-timeout` ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a)) +- URL query parameter parsing for stream names: clients pass tokens via `streamName?token=secret` ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a)) +- `EventAuthFailed` hook event when authentication is rejected ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a)) +- Auth spec document in `specs/004-token-auth/` ([`7c1fa0f`](https://github.com/alxayo/rtmp-go/commit/7c1fa0f)) +- Definition of Done checklist (`docs/definition-of-done.md`) and post-feature review prompt ([`6b3e096`](https://github.com/alxayo/rtmp-go/commit/6b3e096)) + +### Changed +- Query parameters are stripped from stream keys before registry operations (e.g., `live/stream?token=x` → `live/stream`) ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a)) + +### Fixed +- Escaped quotes in Markdown code blocks across documentation 
([`bef626b`](https://github.com/alxayo/rtmp-go/commit/bef626b)) +- Broken link in copilot-instructions.md ([`f92d34d`](https://github.com/alxayo/rtmp-go/commit/f92d34d)) + +--- + +## [v0.1.0] — 2025-10-18 + +First feature-complete release of the RTMP server. Supports end-to-end streaming from OBS/FFmpeg to subscribers with recording and relay capabilities. + +### Added + +#### Core RTMP Protocol +- **RTMP v3 handshake**: C0/C1/C2 ↔ S0/S1/S2 exchange with 5-second timeouts and domain-specific error types +- **Chunk streaming**: FMT 0–3 header compression, extended timestamps (≥0xFFFFFF), chunk size negotiation +- **Control messages**: Set Chunk Size, Window Acknowledgement Size, Set Peer Bandwidth, User Control (types 1–6) +- **AMF0 codec**: Number, Boolean, String, Null, Object, and Strict Array encode/decode with golden binary vector tests + +#### Command Flow +- **Command dispatcher**: Routes `connect`, `createStream`, `publish`, and `play` commands +- **Connect**: Parses application name, responds with `_result` (NetConnection.Connect.Success) +- **CreateStream**: Allocates stream IDs, responds with `_result` +- **Publish/Play**: Registers publishers and subscribers in stream registry with `onStatus` responses + +#### Media & Recording +- **Live relay**: Transparent forwarding from publishers to all subscribers +- **Sequence header caching**: H.264 SPS/PPS and AAC AudioSpecificConfig cached for late-joining subscribers +- **Codec detection**: Identifies audio (AAC, MP3, Speex) and video (H.264, H.265) from first media packets +- **FLV recording**: Automatic recording of all streams to FLV files (`-record-all`, `-record-dir` flags) +- **Media logging**: Per-connection bitrate stats and codec identification + +#### Multi-Destination Relay +- **Relay forwarding**: Forward publisher streams to external RTMP servers (`-relay-to` flag) +- **Destination manager**: Connect, monitor, and send media to multiple downstream targets +- **Metrics tracking**: Per-destination 
message counts, bytes sent, and error tracking + +#### Event Hooks +- **Webhook hook**: HTTP POST with JSON event payload to configured URLs +- **Shell hook**: Execute scripts with event data as environment variables +- **Stdio hook**: Print structured event data to stderr (JSON or env-var format) +- **Hook manager**: Bounded concurrency pool (default 10 workers) with configurable timeout + +#### Server Infrastructure +- **TCP listener**: Accept loop with graceful shutdown support +- **Connection lifecycle**: Handshake → control burst → command exchange → media streaming +- **Stream registry**: Thread-safe map of stream keys to publisher/subscriber lists +- **Structured logging**: `log/slog` with configurable levels (debug/info/warn/error) +- **Buffer pool**: Memory-efficient buffer reuse (`internal/bufpool`) +- **Domain errors**: Typed error wrappers (`HandshakeError`, `ChunkError`, `AMFError`, `ProtocolError`, `TimeoutError`) + +#### Testing & Tooling +- **Golden binary vectors**: Exact wire-format `.bin` files for handshake, chunk headers, AMF0, and control messages +- **Integration tests**: Full publish → subscribe round-trip tests +- **RTMP test client**: Minimal client for driving integration tests (`internal/rtmp/client`) +- **CI workflow**: Automated testing with `go build`, `go vet`, `gofmt`, and `go test` +- **Stream analysis tools**: H.264 frame analyzer, RTMP stream extractor, HLS converter + +#### CLI +- `-listen` — TCP address (default `:1935`) +- `-log-level` — debug/info/warn/error (default `info`) +- `-record-all` — Enable automatic FLV recording +- `-record-dir` — Recording directory (default `recordings`) +- `-chunk-size` — Outbound chunk size, 1–65536 (default 4096) +- `-relay-to` — RTMP relay destination URL (repeatable) +- `-hook-script` — Shell hook: `event_type=/path/to/script` (repeatable) +- `-hook-webhook` — Webhook: `event_type=https://url` (repeatable) +- `-hook-stdio-format` — Stdio output format: `json` or `env` +- `-hook-timeout` — 
Hook execution timeout (default 30s) +- `-hook-concurrency` — Max concurrent hook executions (default 10) +- `-version` — Print version and exit + +--- + +## Pull Requests + +| PR | Title | Branch | Status | +|----|-------|--------|--------| +| [#4](https://github.com/alxayo/rtmp-go/pull/4) | Token-based authentication | `feature/004-token-auth` | Merged | +| [#3](https://github.com/alxayo/rtmp-go/pull/3) | Set initial semantic version to v0.1.0 | `copilot/determine-semantic-version` | Merged | +| [#2](https://github.com/alxayo/rtmp-go/pull/2) | Fix server connection tracking tests | `copilot/fix-github-actions-workflow-again` | Merged | +| [#1](https://github.com/alxayo/rtmp-go/pull/1) | Fix gofmt formatting violations failing CI | `copilot/fix-github-actions-workflow` | Merged | + +--- + +## Feature Branches + +| Branch | Spec | Description | +|--------|------|-------------| +| `feature/005-error-handling-benchmarks` | [specs/005](specs/005-error-handling-benchmarks/spec.md) | Error handling, connection cleanup, TCP deadlines, performance benchmarks | +| `feature/004-token-auth` | [specs/004](specs/004-token-auth/spec.md) | Token-based stream key authentication with 4 validator backends | +| `003-multi-destination-relay` | [specs/003](specs/003-multi-destination-relay/) | Multi-destination relay to external RTMP servers | +| `T001-init-go-module` | [specs/001](specs/001-rtmp-server-implementation/spec.md) | Core RTMP server implementation (handshake through media streaming) | + +[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/005-error-handling-benchmarks +[v0.1.1]: https://github.com/alxayo/rtmp-go/compare/v0.1.0...v0.1.1 +[v0.1.0]: https://github.com/alxayo/rtmp-go/releases/tag/v0.1.0 diff --git a/README.md b/README.md index 8d3b5e9..284c6ad 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ See [docs/getting-started.md](docs/getting-started.md) for the full guide with C | **Media Logging** | Per-connection codec detection and 
bitrate stats | | **Event Hooks** | Webhooks, shell scripts, and stdio notifications on RTMP events | | **Authentication** | Pluggable token-based validation for publish/play (static tokens, file, webhook) | +| **Connection Cleanup** | TCP deadline enforcement (read 90s, write 30s), disconnect handlers, zombie detection | ## Architecture @@ -126,9 +127,11 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib ## Roadmap +### Recently Completed +- Enhanced error handling: disconnect handlers, TCP deadline enforcement (read 90s, write 30s), relay client cleanup +- Performance benchmarks for chunk parsing, AMF0 encoding, and array operations + ### In Progress -- Enhanced error handling and graceful connection cleanup -- Performance benchmarks for chunk and AMF0 encode/decode - Fuzz testing for AMF0 and chunk parsing (bounds safety) ### Planned @@ -139,6 +142,10 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib - **Transcoding** — server-side codec conversion (e.g. H.265 → H.264) - **Clustering** — horizontal scaling across multiple server instances +## Changelog + +See [CHANGELOG.md](CHANGELOG.md) for a detailed history of all releases and changes. + ## License See [LICENSE](LICENSE) file. \ No newline at end of file diff --git a/cmd/rtmp-server/main.go b/cmd/rtmp-server/main.go index b792ff0..7e25570 100644 --- a/cmd/rtmp-server/main.go +++ b/cmd/rtmp-server/main.go @@ -103,6 +103,7 @@ func main() { log.Info("server stopped cleanly") case <-shutdownCtx.Done(): log.Error("forced exit after timeout") + os.Exit(1) } } diff --git a/docs/architecture.md b/docs/architecture.md index 3dc95f5..eb32d2b 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -85,6 +85,10 @@ Immediately after handshake, the server sends three control messages: - **Window Acknowledgement Size** (2,500,000 bytes): Flow control threshold. - **Set Peer Bandwidth** (2,500,000 bytes): Output rate limit. 
+TCP deadlines are enforced on the underlying connection: +- **Read deadline**: 90 seconds — detects frozen publishers and stuck subscribers +- **Write deadline**: 30 seconds — prevents slow-loris attacks and half-open connections + ### 4. Command Exchange (`internal/rtmp/rpc`) The client and server exchange AMF0-encoded command messages: diff --git a/docs/design.md b/docs/design.md index 346b293..f8d4f3b 100644 --- a/docs/design.md +++ b/docs/design.md @@ -54,7 +54,16 @@ The first audio and video messages from a publisher typically contain "sequence ### Event Hooks -The server includes an event hook system that notifies external systems when important events occur (connection accept/close, publish start, play start, codec detected). Three hook types are supported: +The server includes an event hook system that notifies external systems when lifecycle events occur. Available events: + +- **connection_accept** / **connection_close**: Client connects or disconnects +- **publish_start** / **publish_stop**: Publisher begins or stops streaming +- **play_start** / **play_stop**: Subscriber starts or stops playback +- **codec_detected**: Audio/video codec identified from first media packet +- **subscriber_count**: Updated subscriber count when subscribers join or leave +- **auth_failed**: Authentication rejected for publish or play + +Three hook types are supported: - **Webhook**: HTTP POST with JSON event payload to a URL - **Shell**: Execute a script with event data as environment variables @@ -85,6 +94,22 @@ The default mode is `none` (accept all requests), preserving backward compatibil | Media logger counters | `sync.RWMutex` | Updated by read loop, read by stats ticker | | Hook execution pool | Buffered channel (semaphore) | Limits concurrent hook goroutines | +### TCP Deadline Enforcement + +Each connection enforces TCP read/write deadlines to detect zombie connections: +- **Read deadline**: 90 seconds — closes connections from frozen or stalled publishers +- 
**Write deadline**: 30 seconds — drops connections to unresponsive subscribers + +Deadlines are reset on each successful I/O operation, so normal streaming is unaffected. This prevents resource leaks (file descriptors, goroutines) from clients that hang without properly closing sockets. + +### Graceful Shutdown + +On shutdown signal (SIGINT/SIGTERM): +1. Server stops accepting new connections +2. Existing connections receive context cancellation +3. Relay client connections are closed to prevent dangling forwarding +4. If connections don't close within the timeout, the process exits forcefully + ## Error Handling Errors are classified by protocol layer using typed error wrappers: diff --git a/docs/getting-started.md b/docs/getting-started.md index 0e5adc9..83e1125 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -195,6 +195,15 @@ go test ./tests/integration/ | "stream not found" in play | Wrong stream key | Ensure publisher and subscriber use the same `app/streamName` | | High CPU usage | Debug logging | Use `-log-level info` instead of `debug` | | Recording file empty | Publisher disconnected before keyframe | Stream for at least a few seconds | +| Connection dropped after ~90s | TCP read deadline | The server closes idle connections after 90 seconds of silence — ensure the publisher is actively streaming | + +## Connection Management + +The server automatically manages zombie connections: +- **Read deadline (90s)**: If no data arrives from a client within 90 seconds, the connection is closed +- **Write deadline (30s)**: If the server cannot write to a client within 30 seconds, the connection is dropped + +These deadlines reset on each successful I/O operation, so active streams are unaffected. 
## Next Steps diff --git a/docs/implementation.md b/docs/implementation.md index 6b99863..f718acb 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -178,10 +178,12 @@ Each tag header contains: TypeID (8=audio, 9=video), data size (24-bit), timesta ## Event Hooks -The hook system (`internal/rtmp/server/hooks/`) notifies external systems when RTMP events occur. It integrates at two points: +The hook system (`internal/rtmp/server/hooks/`) notifies external systems when RTMP events occur. It integrates at multiple points: -1. **Server accept loop** (`server.go`): Triggers `connection_accept` and `connection_close` events -2. **Command handlers** (`command_integration.go`): Triggers `publish_start` and `play_start` events +1. **Server accept loop** (`server.go`): Triggers `connection_accept` on new connections +2. **Disconnect handlers** (`command_integration.go`): Triggers `connection_close`, `publish_stop`, `play_stop`, and `subscriber_count` on disconnect +3. **Command handlers** (`command_integration.go`): Triggers `publish_start`, `play_start`, `subscriber_count`, and `auth_failed` +4. **Media dispatch** (`media_dispatch.go`): Triggers `codec_detected` on first media packet Each hook runs asynchronously in a bounded goroutine pool (default 10 workers). The `HookManager` maps event types to registered hooks and dispatches via `TriggerEvent()`. diff --git a/internal/rtmp/amf/amf.go b/internal/rtmp/amf/amf.go index d31935c..c26f202 100644 --- a/internal/rtmp/amf/amf.go +++ b/internal/rtmp/amf/amf.go @@ -92,14 +92,6 @@ func DecodeAll(data []byte) ([]interface{}, error) { return out, nil } -// Marshal encodes a single AMF0 value and returns the bytes. -func Marshal(v interface{}) ([]byte, error) { return EncodeAll(v) } - -// Unmarshal decodes a single AMF0 value from data. 
-func Unmarshal(data []byte) (interface{}, error) { - return DecodeValue(bytes.NewReader(data)) -} - // unsupportedMarker returns true if the marker is explicitly listed by task // spec to be rejected (Undefined, Reference, AMF3+ reserved range). func unsupportedMarker(m byte) bool { diff --git a/internal/rtmp/amf/amf_test.go b/internal/rtmp/amf/amf_test.go index 4453803..4297264 100644 --- a/internal/rtmp/amf/amf_test.go +++ b/internal/rtmp/amf/amf_test.go @@ -4,7 +4,7 @@ // used by RTMP to encode command parameters: numbers (float64), booleans, // strings, null, objects (string→value maps), and arrays. // -// These tests exercise the top-level Marshal/Unmarshal and EncodeAll/DecodeAll +// These tests exercise the EncodeAll/DecodeAll and DecodeValue // APIs, which delegate to type-specific encoders/decoders in sibling files. // // Key Go concepts demonstrated: @@ -15,6 +15,7 @@ package amf import ( "bytes" + "fmt" "testing" ) @@ -45,17 +46,19 @@ func TestEncodeDecodeRoundTrip_Primitives(t *testing.T) { []interface{}{[]interface{}{float64(1), float64(2)}, map[string]interface{}{"k": "v"}}, } for i, v := range cases { - b, err := Marshal(v) - if err != nil { - t.Fatalf("case %d marshal error: %v", i, err) - } - rv, err := Unmarshal(b) - if err != nil { - t.Fatalf("case %d unmarshal error: %v", i, err) - } - if !deepEqual(v, rv) { - t.Fatalf("case %d mismatch\norig=%#v\nrtnd=%#v", i, v, rv) - } + t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { + b, err := EncodeAll(v) + if err != nil { + t.Fatalf("encode error: %v", err) + } + rv, err := DecodeValue(bytes.NewReader(b)) + if err != nil { + t.Fatalf("decode error: %v", err) + } + if !deepEqual(v, rv) { + t.Fatalf("mismatch\norig=%#v\nrtnd=%#v", v, rv) + } + }) } } @@ -157,3 +160,39 @@ func deepEqual(a, b interface{}) bool { return false } } + +// --- Benchmarks --- + +// BenchmarkEncodeAll_ConnectCommand benchmarks multi-value encoding of a full connect command. 
+func BenchmarkEncodeAll_ConnectCommand(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", + "type": "nonprivate", + "flashVer": "FMLE/3.0", + "tcUrl": "rtmp://localhost/live", + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = EncodeAll("connect", 1.0, obj) + } +} + +// BenchmarkDecodeAll_ConnectCommand benchmarks multi-value decoding of a full connect command. +func BenchmarkDecodeAll_ConnectCommand(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", + "type": "nonprivate", + "flashVer": "FMLE/3.0", + "tcUrl": "rtmp://localhost/live", + } + data, err := EncodeAll("connect", 1.0, obj) + if err != nil { + b.Fatalf("encode: %v", err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = DecodeAll(data) + } +} diff --git a/internal/rtmp/amf/array_test.go b/internal/rtmp/amf/array_test.go index 7915cfa..c04e7a4 100644 --- a/internal/rtmp/amf/array_test.go +++ b/internal/rtmp/amf/array_test.go @@ -134,3 +134,32 @@ func TestStrictArray_RoundTrip_VariedTypes(t *testing.T) { } } } + +// --- Benchmarks --- + +// BenchmarkEncodeStrictArray benchmarks encoding a mixed-type array. +func BenchmarkEncodeStrictArray(b *testing.B) { + b.ReportAllocs() + arr := []interface{}{1.0, "test", true, nil, 42.0} + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + _ = EncodeStrictArray(&buf, arr) + } +} + +// BenchmarkDecodeStrictArray benchmarks decoding a mixed-type array. 
+func BenchmarkDecodeStrictArray(b *testing.B) { + b.ReportAllocs() + arr := []interface{}{1.0, "test", true, nil, 42.0} + var buf bytes.Buffer + if err := EncodeStrictArray(&buf, arr); err != nil { + b.Fatalf("encode: %v", err) + } + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(data) + _, _ = DecodeStrictArray(r) + } +} diff --git a/internal/rtmp/amf/object_test.go b/internal/rtmp/amf/object_test.go index cd54d3f..09235ab 100644 --- a/internal/rtmp/amf/object_test.go +++ b/internal/rtmp/amf/object_test.go @@ -120,6 +120,45 @@ func TestEncodeObject_UnsupportedType(t *testing.T) { } } +// --- Benchmarks --- + +// BenchmarkEncodeObject benchmarks encoding a typical RTMP connect-style object. +func BenchmarkEncodeObject(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", + "type": "nonprivate", + "flashVer": "FMLE/3.0", + "tcUrl": "rtmp://localhost/live", + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + _ = EncodeObject(&buf, obj) + } +} + +// BenchmarkDecodeObject benchmarks decoding a typical RTMP connect-style object. +func BenchmarkDecodeObject(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", + "type": "nonprivate", + "flashVer": "FMLE/3.0", + "tcUrl": "rtmp://localhost/live", + } + var buf bytes.Buffer + if err := EncodeObject(&buf, obj); err != nil { + b.Fatalf("encode: %v", err) + } + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(data) + _, _ = DecodeObject(r) + } +} + // TestDecodeObject_InvalidEndMarker crafts bytes where the end-of-object // sentinel is wrong (0x08 instead of 0x09). The decoder must detect this. 
func TestDecodeObject_InvalidEndMarker(t *testing.T) { diff --git a/internal/rtmp/chunk/reader_test.go b/internal/rtmp/chunk/reader_test.go index 4db14df..7f8397a 100644 --- a/internal/rtmp/chunk/reader_test.go +++ b/internal/rtmp/chunk/reader_test.go @@ -118,104 +118,9 @@ func TestReader_SetChunkSize_Applied(t *testing.T) { // Reader should have updated chunk size -> second message read in one go m2, err := r.ReadMessage() if err != nil { - // If chunk size not updated, reader would attempt to parse a header from payload and fail. - // Provide meaningful error context to aid debugging. - // Fail test if error encountered. - // Note: If large message incorrectly fragmented, this test will hang or error. - // We rely on timeout from `go test` if hang occurs. - // So here just assert. - // (No further action) - // Document: failing here implies SetChunkSize not applied. - // Implementations should not reach this. Fail now. - // - // Provide explicit failure. - // - // - // - // - // Actually fail: - // - // - // - // - // - // - // - // - // - // - // - // - // End commentary. - // - // - // - // - // final: - // - // - // - // (short message) - // - // - // - // - // - // - // - // - // - // - // - // *** - // - // - // - // - // - // Oops; okay really fail now. - // - // - // - // - // - // *** - // - // Enough. t.Fatalf("large message read: %v", err) } if len(m2.Payload) != 3000 { - // Defensive copy visible length mismatch - // Avoid printing huge payload; just lengths. - if len(m2.Payload) < 3000 { - // Data truncated - } - // Fail - // - // Provide summary - // - // - // - // - // - // - // - // - // - // - // end. 
- // - // - // - // - //** - // fail - // - // - // - // keep succinct - // - // t.Fatalf("expected 3000 payload got %d", len(m2.Payload)) } } @@ -227,115 +132,8 @@ func TestReader_GoldenFileExists(t *testing.T) { p := filepath.Join("..", "..", "..", "tests", "golden", "chunk_fmt0_audio.bin") if _, err := os.Stat(p); err != nil { if os.IsNotExist(err) { - // If golden missing, other tests will fail earlier; still surface here - // but don't hard fail to avoid noise. Use t.Skip to mark. t.Skip("golden file missing") } - // other error -> fail - // Wrap message - // Keep short. - // - // - // - // done - // - // - // - // final - // - // - // - // - // - // real fail: - // - // - // - // - // - // complete - // - // - //** - // - // - // - // - // - // Enough - // - // finish - // - // - // - // just fail - // - // - // (Stop adding commentary!) - // - // - // - // - // - // - // ok - // - // - // - // final - // - // - // - // x - // - // - // - // abort - // - // -- real line below -- - // - // - // Actually fail: - // - // - // - // - // not again - // - // we stop now. - // - // - // - // - // - // - // - // - // done - // - // - // - // . - // - // - // - // End! - // - // (really) - // - // - // - // finish - // - // - // Completed commentary. - // - // fail now - // - // end - // - // - //** t.Fatalf("stat golden: %v", err) } if p == "" || p == "/" || p == "." { @@ -344,3 +142,87 @@ func TestReader_GoldenFileExists(t *testing.T) { _ = io.Discard } } + +// --- Benchmarks --- + +// BenchmarkParseChunkHeader_FMT0 benchmarks parsing of a full 12-byte FMT0 header. 
+func BenchmarkParseChunkHeader_FMT0(b *testing.B) { + b.ReportAllocs() + h := &ChunkHeader{FMT: 0, CSID: 4, Timestamp: 1000, MessageLength: 100, MessageTypeID: 8, MessageStreamID: 1} + raw, err := EncodeChunkHeader(h, nil) + if err != nil { + b.Fatalf("encode header: %v", err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(raw) + _, _ = ParseChunkHeader(r, nil) + } +} + +// BenchmarkParseChunkHeader_FMT1 benchmarks parsing of an 8-byte FMT1 delta header. +func BenchmarkParseChunkHeader_FMT1(b *testing.B) { + b.ReportAllocs() + prev := &ChunkHeader{FMT: 0, CSID: 6, Timestamp: 1000, MessageLength: 80, MessageTypeID: 9, MessageStreamID: 1} + h := &ChunkHeader{FMT: 1, CSID: 6, Timestamp: 40, MessageLength: 80, MessageTypeID: 9} + raw, err := EncodeChunkHeader(h, nil) + if err != nil { + b.Fatalf("encode header: %v", err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(raw) + _, _ = ParseChunkHeader(r, prev) + } +} + +// BenchmarkParseChunkHeader_FMT3 benchmarks parsing of a minimal 1-byte FMT3 header. +func BenchmarkParseChunkHeader_FMT3(b *testing.B) { + b.ReportAllocs() + prev := &ChunkHeader{FMT: 0, CSID: 6, Timestamp: 2000, MessageLength: 384, MessageTypeID: 9, MessageStreamID: 1} + h := &ChunkHeader{FMT: 3, CSID: 6} + raw, err := EncodeChunkHeader(h, prev) + if err != nil { + b.Fatalf("encode header: %v", err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(raw) + _, _ = ParseChunkHeader(r, prev) + } +} + +// BenchmarkReaderReadMessage_SingleChunk benchmarks reading a single-chunk message. +func BenchmarkReaderReadMessage_SingleChunk(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 100) + h := &ChunkHeader{FMT: 0, CSID: 4, Timestamp: 1000, MessageLength: 100, MessageTypeID: 8, MessageStreamID: 1} + hdr, err := EncodeChunkHeader(h, nil) + if err != nil { + b.Fatalf("encode header: %v", err) + } + data := append(hdr, payload...) 
+ b.ResetTimer() + for i := 0; i < b.N; i++ { + r := NewReader(bytes.NewReader(data), 128) + _, _ = r.ReadMessage() + } +} + +// BenchmarkReaderReadMessage_MultiChunk benchmarks reading a message spanning multiple chunks. +func BenchmarkReaderReadMessage_MultiChunk(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 4096) + var buf bytes.Buffer + w := NewWriter(&buf, 128) + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + if err := w.WriteMessage(msg); err != nil { + b.Fatalf("write: %v", err) + } + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := NewReader(bytes.NewReader(data), 128) + _, _ = r.ReadMessage() + } +} diff --git a/internal/rtmp/chunk/writer_test.go b/internal/rtmp/chunk/writer_test.go index 18aaeec..e053af4 100644 --- a/internal/rtmp/chunk/writer_test.go +++ b/internal/rtmp/chunk/writer_test.go @@ -18,6 +18,7 @@ package chunk import ( "bytes" "encoding/hex" + "fmt" "io" "testing" ) @@ -118,15 +119,17 @@ func TestEncodeChunkHeader_CSIDEncodings(t *testing.T) { {320, 1, "41 00 01"}, // three byte form (fmt1 marker 1) } for _, c := range cases { - b, err := EncodeChunkHeader(&ChunkHeader{FMT: c.fmt, CSID: c.csid, Timestamp: 0, MessageLength: 0, MessageTypeID: 0, MessageStreamID: 0}, nil) - if err != nil { - t.Fatalf("csid %d: %v", c.csid, err) - } - // Only compare basic header prefix (length 1/2/3) because we added message header zeros. - wantBytes, _ := hex.DecodeString(c.want) - if !bytes.HasPrefix(b, wantBytes) { - t.Fatalf("csid %d expected prefix %x got %x", c.csid, wantBytes, b) - } + t.Run(fmt.Sprintf("csid_%d_fmt%d", c.csid, c.fmt), func(t *testing.T) { + b, err := EncodeChunkHeader(&ChunkHeader{FMT: c.fmt, CSID: c.csid, Timestamp: 0, MessageLength: 0, MessageTypeID: 0, MessageStreamID: 0}, nil) + if err != nil { + t.Fatalf("csid %d: %v", c.csid, err) + } + // Only compare basic header prefix (length 1/2/3) because we added message header zeros. 
+ wantBytes, _ := hex.DecodeString(c.want) + if !bytes.HasPrefix(b, wantBytes) { + t.Fatalf("csid %d expected prefix %x got %x", c.csid, wantBytes, b) + } + }) } } @@ -375,3 +378,54 @@ func TestWriter_ChunkReaderRoundTrip(t *testing.T) { } } } + +// --- Benchmarks --- + +// BenchmarkEncodeChunkHeader_FMT0 benchmarks header serialization for a full FMT0 header. +func BenchmarkEncodeChunkHeader_FMT0(b *testing.B) { + b.ReportAllocs() + h := &ChunkHeader{FMT: 0, CSID: 4, Timestamp: 1000, MessageLength: 100, MessageTypeID: 8, MessageStreamID: 1} + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = EncodeChunkHeader(h, nil) + } +} + +// BenchmarkWriterWriteMessage_SingleChunk benchmarks writing a single-chunk message. +func BenchmarkWriterWriteMessage_SingleChunk(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 100) + msg := &Message{CSID: 4, Timestamp: 1000, MessageLength: 100, TypeID: 8, MessageStreamID: 1, Payload: payload} + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := NewWriter(io.Discard, 128) + _ = w.WriteMessage(msg) + } +} + +// BenchmarkWriterWriteMessage_MultiChunk benchmarks writing a multi-chunk message. +func BenchmarkWriterWriteMessage_MultiChunk(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 4096) + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := NewWriter(io.Discard, 128) + _ = w.WriteMessage(msg) + } +} + +// BenchmarkWriterReaderRoundTrip benchmarks the end-to-end Write→Read cycle. 
+func BenchmarkWriterReaderRoundTrip(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 4096) + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + w := NewWriter(&buf, 128) + _ = w.WriteMessage(msg) + r := NewReader(&buf, 128) + _, _ = r.ReadMessage() + } +} diff --git a/internal/rtmp/client/client.go b/internal/rtmp/client/client.go index b107ff1..f501c57 100644 --- a/internal/rtmp/client/client.go +++ b/internal/rtmp/client/client.go @@ -11,8 +11,6 @@ package client // * Play mode: send play command and read incoming audio/video messages // (parsing limited to chunk reassembly; higher‑level parsing lives in // media layer packages already implemented for the server). -// * CLI compatibility hook (RunCLI) used by a future small main in -// cmd/rtmp-client – kept here so tests can exercise without duplication. // // Non‑Goals (for now): full error command responses, bandwidth / control // messages, extended timestamp edge cases, retransmission, AMF3. @@ -34,7 +32,6 @@ package client import ( "errors" "fmt" - "io" "log/slog" "net" "net/url" @@ -353,49 +350,3 @@ func (c *Client) Close() error { c.writer = nil return err } - -// RunCLI executes a simplified publish / play action based on args. -// Usage examples (from task requirements): -// -// rtmp-client publish rtmp://host/app/stream file.flv -// -// For now we only implement the connect + publish handshake; file muxing -// is out of current scope – we simulate by sending a single dummy audio tag. 
-func RunCLI(args []string, stdout io.Writer) int { - if len(args) < 3 { - fmt.Fprintln(stdout, "usage: rtmp-client rtmp://host/app/stream [file]") - return 2 - } - mode := args[0] - url := args[1] - c, err := New(url) - if err != nil { - fmt.Fprintln(stdout, "error:", err) - return 1 - } - if err := c.Connect(); err != nil { - fmt.Fprintln(stdout, "connect error:", err) - return 1 - } - switch mode { - case "publish": - if err := c.Publish(); err != nil { - fmt.Fprintln(stdout, "publish error:", err) - return 1 - } - // send one dummy audio packet (AAC sequence header-ish) - _ = c.SendAudio(0, []byte{0xAF, 0x00}) - fmt.Fprintln(stdout, "published", c.streamKey) - case "play": - if err := c.Play(); err != nil { - fmt.Fprintln(stdout, "play error:", err) - return 1 - } - fmt.Fprintln(stdout, "play requested", c.streamKey) - default: - fmt.Fprintln(stdout, "unknown mode", mode) - return 2 - } - _ = c.Close() - return 0 -} diff --git a/internal/rtmp/conn/conn.go b/internal/rtmp/conn/conn.go index c3cca7a..e77ec48 100644 --- a/internal/rtmp/conn/conn.go +++ b/internal/rtmp/conn/conn.go @@ -28,6 +28,14 @@ const ( // sending. When this limit is reached, new sends will block (up to sendTimeout). // 100 messages provides ~3 seconds of buffer at 30fps video. outboundQueueSize = 100 + + // readTimeout is the TCP read deadline for zombie connection detection. + // Generous to accommodate idle subscribers that receive no data when + // no publisher is active. Publishers send data continuously (~30fps) + // so any timeout > a few seconds catches dead peers. + readTimeout = 90 * time.Second + // writeTimeout catches dead TCP peers that never acknowledge writes. 
+ writeTimeout = 30 * time.Second ) // Connection represents an accepted RTMP connection that has completed the @@ -52,10 +60,10 @@ type Connection struct { writeChunkSize uint32 // accessed atomically by multiple goroutines windowAckSize uint32 outboundQueue chan *chunk.Message - session *Session // Internal helpers - onMessage func(*chunk.Message) // test hook / dispatcher injection + onMessage func(*chunk.Message) // test hook / dispatcher injection + onDisconnect func() // called once when readLoop exits (cleanup cascade) } // ID returns the logical connection id. @@ -67,6 +75,9 @@ func (c *Connection) NetConn() net.Conn { return c.netConn } // HandshakeDuration returns how long the RTMP handshake took. func (c *Connection) HandshakeDuration() time.Duration { return c.handshakeDuration } +// AcceptedAt returns the time the connection was accepted. +func (c *Connection) AcceptedAt() time.Time { return c.acceptedAt } + // Close closes the underlying connection. func (c *Connection) Close() error { if c.cancel != nil { @@ -83,6 +94,10 @@ func (c *Connection) Close() error { // fully reassembled RTMP message. MUST be called before Start(). func (c *Connection) SetMessageHandler(fn func(*chunk.Message)) { c.onMessage = fn } +// SetDisconnectHandler installs a callback invoked once when the readLoop +// exits (for any reason: EOF, error, context cancel). MUST be called before Start(). +func (c *Connection) SetDisconnectHandler(fn func()) { c.onDisconnect = fn } + // Start begins the readLoop. MUST be called after SetMessageHandler() to avoid race condition. func (c *Connection) Start() { c.startReadLoop() @@ -121,6 +136,15 @@ func (c *Connection) startReadLoop() { c.wg.Add(1) go func() { defer c.wg.Done() + defer func() { + // Cleanup cascade: cancel context first (stops writeLoop via ctx.Done()), + // then invoke the disconnect handler for higher-level cleanup. + // cancel() is idempotent — safe if Close() already called it. 
+ c.cancel() + if c.onDisconnect != nil { + c.onDisconnect() + } + }() r := chunk.NewReader(c.netConn, c.readChunkSize) for { select { @@ -128,11 +152,19 @@ func (c *Connection) startReadLoop() { return default: } + _ = c.netConn.SetReadDeadline(time.Now().Add(readTimeout)) msg, err := r.ReadMessage() if err != nil { + // Normal disconnect paths — exit silently if errors.Is(err, context.Canceled) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { return } + // Timeout from read deadline — connection is dead + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + c.log.Warn("readLoop timeout (zombie connection reaped)") + return + } c.log.Error("readLoop error", "error", err) return } @@ -160,6 +192,7 @@ func (c *Connection) startWriteLoop() { } currentChunkSize := atomic.LoadUint32(&c.writeChunkSize) w.SetChunkSize(currentChunkSize) + _ = c.netConn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err := w.WriteMessage(msg); err != nil { c.log.Error("writeLoop write failed", "error", err) return diff --git a/internal/rtmp/conn/conn_test.go b/internal/rtmp/conn/conn_test.go index f92e5e8..c7d5c16 100644 --- a/internal/rtmp/conn/conn_test.go +++ b/internal/rtmp/conn/conn_test.go @@ -263,3 +263,99 @@ func TestCloseGraceful(t *testing.T) { t.Fatalf("expected error sending after close") } } + +// --- Disconnect Handler Tests --- + +// TestDisconnectHandler_FiresOnEOF verifies the disconnect handler fires +// when the client closes the connection (EOF in readLoop). 
+func TestDisconnectHandler_FiresOnEOF(t *testing.T) { + logger.UseWriter(io.Discard) + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + connCh := make(chan *Connection, 1) + go func() { c, _ := Accept(ln); connCh <- c }() + client := dialAndClientHandshake(t, ln.Addr().String()) + serverConn := <-connCh + if serverConn == nil { + t.Fatalf("server conn nil") + } + + var fired atomic.Bool + serverConn.SetDisconnectHandler(func() { fired.Store(true) }) + serverConn.SetMessageHandler(func(m *chunk.Message) {}) + serverConn.Start() + + // Client close triggers EOF in readLoop + client.Close() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && !fired.Load() { + time.Sleep(10 * time.Millisecond) + } + if !fired.Load() { + t.Fatal("disconnect handler did not fire on EOF") + } + _ = serverConn.Close() +} + +// TestDisconnectHandler_FiresOnContextCancel verifies the disconnect handler +// fires when Close() is called (context cancellation). 
+func TestDisconnectHandler_FiresOnContextCancel(t *testing.T) { + logger.UseWriter(io.Discard) + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + connCh := make(chan *Connection, 1) + go func() { c, _ := Accept(ln); connCh <- c }() + client := dialAndClientHandshake(t, ln.Addr().String()) + defer client.Close() + serverConn := <-connCh + if serverConn == nil { + t.Fatalf("server conn nil") + } + + var fired atomic.Bool + serverConn.SetDisconnectHandler(func() { fired.Store(true) }) + serverConn.SetMessageHandler(func(m *chunk.Message) {}) + serverConn.Start() + + // Close triggers context cancel → readLoop exits → handler fires + _ = serverConn.Close() + + if !fired.Load() { + t.Fatal("disconnect handler did not fire on context cancel") + } +} + +// TestDisconnectHandler_NilSafe verifies readLoop exits cleanly when no +// disconnect handler is set (nil handler must not panic). +func TestDisconnectHandler_NilSafe(t *testing.T) { + logger.UseWriter(io.Discard) + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + defer ln.Close() + connCh := make(chan *Connection, 1) + go func() { c, _ := Accept(ln); connCh <- c }() + client := dialAndClientHandshake(t, ln.Addr().String()) + serverConn := <-connCh + if serverConn == nil { + t.Fatalf("server conn nil") + } + + // No disconnect handler set — just message handler + serverConn.SetMessageHandler(func(m *chunk.Message) {}) + serverConn.Start() + + // Client close triggers EOF → readLoop exits → no panic + client.Close() + + // Close should complete without hanging or panicking + _ = serverConn.Close() +} diff --git a/internal/rtmp/conn/doc.go b/internal/rtmp/conn/doc.go index 67b2f00..8232443 100644 --- a/internal/rtmp/conn/doc.go +++ b/internal/rtmp/conn/doc.go @@ -21,9 +21,4 @@ // The outbound queue is bounded (see [outboundQueueSize]) to provide // backpressure. 
[SendMessage] blocks briefly (see [sendTimeout]) and returns // an error if the queue is full. -// -// # Session State -// -// Per-connection metadata (app name, stream key, state machine) is tracked -// by the [Session] type in session.go. package conn diff --git a/internal/rtmp/conn/session.go b/internal/rtmp/conn/session.go deleted file mode 100644 index 411a633..0000000 --- a/internal/rtmp/conn/session.go +++ /dev/null @@ -1,104 +0,0 @@ -package conn - -// SessionState represents where a connection is in the RTMP session lifecycle. -// Each connection progresses through these states as commands are exchanged: -// -// Uninitialized → Connected → StreamCreated → Publishing or Playing -// -// The client drives transitions by sending connect, createStream, publish/play commands. -type SessionState uint8 - -const ( - SessionStateUninitialized SessionState = iota // Initial state before any commands - SessionStateConnected // After successful "connect" command - SessionStateStreamCreated // After "createStream" allocates a stream ID - SessionStatePublishing // After "publish" command (sending media) - SessionStatePlaying // After "play" command (receiving media) -) - -// Session holds per-connection RTMP session metadata established during the -// command exchange phase. It tracks the application name, stream identifiers, -// and the current lifecycle state. -type Session struct { - app string // application name from connect command (e.g. "live") - tcUrl string // target URL from connect (e.g. "rtmp://host/live") - flashVer string // client's Flash version string - objectEncoding uint8 // AMF encoding version (0=AMF0, must be 0 for this server) - - transactionID uint32 // incrementing counter for request-response matching - streamID uint32 // message stream ID allocated by createStream (typically 1) - streamKey string // full stream key: "app/streamName" (e.g. 
"live/mystream") - - state SessionState // current lifecycle state -} - -// NewSession creates a new Session in Uninitialized state. -func NewSession() *Session { - return &Session{transactionID: 1, state: SessionStateUninitialized} -} - -// SetConnectInfo sets fields derived from the "connect" command and -// moves the session into Connected state. -func (s *Session) SetConnectInfo(app, tcUrl, flashVer string, objectEncoding uint8) { - s.app = app - s.tcUrl = tcUrl - s.flashVer = flashVer - s.objectEncoding = objectEncoding - if s.state == SessionStateUninitialized { - s.state = SessionStateConnected - } -} - -// NextTransactionID increments and returns the next transaction id. -// Starts from 1 so the first call returns 2. This mirrors common RTMP -// client behavior (FFmpeg/OBS) where the connect command uses an id -// of 1 and responses increment from there. -func (s *Session) NextTransactionID() uint32 { - s.transactionID++ - return s.transactionID -} - -// AllocateStreamID allocates (or increments) the message stream ID. -// Typical RTMP sessions only allocate a single stream (id 1). We keep -// the counter logic simple to allow future multi-stream support. -// Returns the allocated stream id. -func (s *Session) AllocateStreamID() uint32 { - if s.streamID == 0 { - s.streamID = 1 - } else { - s.streamID++ - } - if s.state == SessionStateConnected { - s.state = SessionStateStreamCreated - } - return s.streamID -} - -// SetStreamKey composes and stores the fully-qualified stream key -// using the application name and provided streamName. Returns the -// constructed key. The higher-level publish/play handlers will set -// the appropriate final state (Publishing or Playing); we only set -// Publishing as a neutral placeholder if not already set. -func (s *Session) SetStreamKey(app, streamName string) string { - // Prefer explicit app param (may match s.app); do not override if empty. 
- if app != "" { - s.app = app - } - s.streamKey = s.app + "/" + streamName - // If stream was created but role not yet specified, mark as Publishing placeholder. - if s.state == SessionStateStreamCreated { - s.state = SessionStatePublishing - } - return s.streamKey -} - -// Accessor methods (read-only) ------------------------------------------------ - -func (s *Session) App() string { return s.app } -func (s *Session) TcUrl() string { return s.tcUrl } -func (s *Session) FlashVer() string { return s.flashVer } -func (s *Session) ObjectEncoding() uint8 { return s.objectEncoding } -func (s *Session) TransactionID() uint32 { return s.transactionID } -func (s *Session) StreamID() uint32 { return s.streamID } -func (s *Session) StreamKey() string { return s.streamKey } -func (s *Session) State() SessionState { return s.state } diff --git a/internal/rtmp/conn/session_test.go b/internal/rtmp/conn/session_test.go deleted file mode 100644 index bbca7d4..0000000 --- a/internal/rtmp/conn/session_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// session_test.go – tests for the RTMP Session state machine. -// -// A Session tracks the logical state of an RTMP connection: -// - Transaction IDs: monotonically increasing counter for command pairing. -// - Stream allocation: each createStream call allocates a new stream ID. -// - Session state: Connecting → Connected → StreamCreated → Publishing. -// - Stream key: "app/streamName" used for pub/sub routing. -package conn - -import "testing" - -// TestSessionTransactionIDIncrement verifies that transaction IDs start at 1 -// and increment with each NextTransactionID call. RTMP uses transaction IDs -// to match command requests with their responses. 
-func TestSessionTransactionIDIncrement(t *testing.T) { - s := NewSession() - if got := s.TransactionID(); got != 1 { - t.Fatalf("initial transactionID = %d, want 1", got) - } - next := s.NextTransactionID() - if next != 2 { - t.Fatalf("after first NextTransactionID got %d, want 2", next) - } - next = s.NextTransactionID() - if next != 3 { - t.Fatalf("after second NextTransactionID got %d, want 3", next) - } -} - -// TestSessionAllocateStreamID verifies that stream IDs are allocated -// sequentially (1, 2, 3...) and that the session state transitions from -// Connected to StreamCreated after the first allocation. -func TestSessionAllocateStreamID(t *testing.T) { - s := NewSession() - s.SetConnectInfo("live", "rtmp://example/live", "FMLE/3.0", 0) - if s.State() != SessionStateConnected { - t.Fatalf("expected state Connected, got %v", s.State()) - } - id1 := s.AllocateStreamID() - if id1 != 1 { - t.Fatalf("first stream id = %d, want 1", id1) - } - if s.State() != SessionStateStreamCreated { - t.Fatalf("expected state StreamCreated after allocation, got %v", s.State()) - } - id2 := s.AllocateStreamID() - if id2 != 2 { - t.Fatalf("second stream id = %d, want 2", id2) - } -} - -// TestSessionSetStreamKey verifies the stream key is formed as "app/name" -// and the session state transitions to Publishing. 
-func TestSessionSetStreamKey(t *testing.T) { - s := NewSession() - s.SetConnectInfo("live", "rtmp://example/live", "FMLE/3.0", 0) - s.AllocateStreamID() - key := s.SetStreamKey("live", "testStream") - want := "live/testStream" - if key != want || s.StreamKey() != want { - t.Fatalf("stream key = %q, want %q", key, want) - } - if s.State() != SessionStatePublishing { // placeholder state set in SetStreamKey - t.Fatalf("expected state Publishing placeholder, got %v", s.State()) - } -} diff --git a/internal/rtmp/relay/destination.go b/internal/rtmp/relay/destination.go index 10228a9..664c397 100644 --- a/internal/rtmp/relay/destination.go +++ b/internal/rtmp/relay/destination.go @@ -53,7 +53,6 @@ func (s DestinationStatus) String() string { } } -// Destination represents a single RTMP relay destination // Destination represents a single relay target — a remote RTMP server that // receives a copy of the publisher's audio/video stream. type Destination struct { @@ -127,6 +126,7 @@ func (d *Destination) Connect() error { } if err := client.Connect(); err != nil { + _ = client.Close() // prevent leak: factory may have allocated TCP resources d.Status = StatusError d.LastError = err d.logger.Error("Failed to connect RTMP client", "error", err) @@ -134,6 +134,7 @@ func (d *Destination) Connect() error { } if err := client.Publish(); err != nil { + _ = client.Close() // prevent leak: connection established but publish failed d.Status = StatusError d.LastError = err d.logger.Error("Failed to publish to destination", "error", err) diff --git a/internal/rtmp/server/auth/auth_test.go b/internal/rtmp/server/auth/auth_test.go index 19e64c6..d60bc88 100644 --- a/internal/rtmp/server/auth/auth_test.go +++ b/internal/rtmp/server/auth/auth_test.go @@ -10,14 +10,23 @@ import ( // TestSentinelErrors verifies that each sentinel error has a non-empty // message and can be matched with errors.Is. 
func TestSentinelErrors(t *testing.T) { - sentinels := []error{ErrUnauthorized, ErrTokenMissing, ErrForbidden} - for _, e := range sentinels { - if e.Error() == "" { - t.Fatalf("sentinel error has empty message: %v", e) - } - if !errors.Is(e, e) { - t.Fatalf("errors.Is failed for %v", e) - } + sentinels := []struct { + name string + err error + }{ + {"ErrUnauthorized", ErrUnauthorized}, + {"ErrTokenMissing", ErrTokenMissing}, + {"ErrForbidden", ErrForbidden}, + } + for _, s := range sentinels { + t.Run(s.name, func(t *testing.T) { + if s.err.Error() == "" { + t.Fatalf("sentinel error has empty message") + } + if !errors.Is(s.err, s.err) { + t.Fatalf("errors.Is failed") + } + }) } } diff --git a/internal/rtmp/server/command_integration.go b/internal/rtmp/server/command_integration.go index 6d64092..291e436 100644 --- a/internal/rtmp/server/command_integration.go +++ b/internal/rtmp/server/command_integration.go @@ -38,11 +38,12 @@ type commandState struct { allocator *rpc.StreamIDAllocator // assigns unique message stream IDs for createStream mediaLogger *MediaLogger // tracks audio/video packet statistics codecDetector *media.CodecDetector // identifies audio/video codecs on first packets + role string // "publisher" or "subscriber" — set by OnPublish/OnPlay handlers } // attachCommandHandling installs a dispatcher-backed message handler on the // provided connection. Safe to call immediately after Accept returns. 
-func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log *slog.Logger, destMgr *relay.DestinationManager, srv ...*Server) { +func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log *slog.Logger, destMgr *relay.DestinationManager, srv *Server) { if c == nil || reg == nil || cfg == nil { return } @@ -51,7 +52,67 @@ func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log mediaLogger: NewMediaLogger(c.ID(), log, 30*time.Second), codecDetector: &media.CodecDetector{}, } + // Install disconnect handler — fires when readLoop exits for any reason. + c.SetDisconnectHandler(func() { + // 1. Stop media logger (prevents goroutine + ticker leak) + st.mediaLogger.Stop() + // Compute session duration for hook data. + durationSec := time.Since(c.AcceptedAt()).Seconds() + + // 2. Publisher cleanup: close recorder, unregister publisher, fire hook + if st.streamKey != "" && st.role == "publisher" { + stream := reg.GetStream(st.streamKey) + if stream != nil { + // Close recorder under lock (concurrent with cleanupAllRecorders) + stream.mu.Lock() + if stream.Recorder != nil { + if err := stream.Recorder.Close(); err != nil { + log.Error("recorder close error on disconnect", "error", err, "stream_key", st.streamKey) + } + stream.Recorder = nil + } + stream.mu.Unlock() + // Unregister publisher (allows stream key reuse by new publisher) + PublisherDisconnected(reg, st.streamKey, c) + } + audioPkts, videoPkts, totalBytes, audioCodec, videoCodec := st.mediaLogger.GetStats() + srv.triggerHookEvent(hooks.EventPublishStop, c.ID(), st.streamKey, map[string]interface{}{ + "audio_packets": audioPkts, + "video_packets": videoPkts, + "total_bytes": totalBytes, + "audio_codec": audioCodec, + "video_codec": videoCodec, + "duration_sec": durationSec, + }) + } + + // 3. 
Subscriber cleanup: unregister subscriber, fire hook + if st.streamKey != "" && st.role == "subscriber" { + SubscriberDisconnected(reg, st.streamKey, c) + srv.triggerHookEvent(hooks.EventPlayStop, c.ID(), st.streamKey, map[string]interface{}{ + "duration_sec": durationSec, + }) + // Fire subscriber count change after removal + stream := reg.GetStream(st.streamKey) + if stream != nil { + srv.triggerHookEvent(hooks.EventSubscriberCount, c.ID(), st.streamKey, map[string]interface{}{ + "count": stream.SubscriberCount(), + }) + } + } + + // 4. Remove from server connection tracking (fixes memory leak) + srv.RemoveConnection(c.ID()) + + // 5. Fire connection close hook + srv.triggerHookEvent(hooks.EventConnectionClose, c.ID(), st.streamKey, map[string]interface{}{ + "role": st.role, + "duration_sec": durationSec, + }) + + log.Info("connection disconnected", "conn_id", c.ID(), "stream_key", st.streamKey, "role", st.role) + }) d := rpc.NewDispatcher(func() string { return st.app }) d.OnConnect = func(cc *rpc.ConnectCommand, msg *chunk.Message) error { @@ -93,7 +154,7 @@ func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log d.OnPublish = func(pc *rpc.PublishCommand, msg *chunk.Message) error { // Validate auth token before allowing publish. 
- if rejected := authenticateRequest(cfg, c, st, msg, "publish", pc.PublishingName, pc.StreamKey, pc.QueryParams, log, srv...); rejected { + if rejected := authenticateRequest(cfg, c, st, msg, "publish", pc.PublishingName, pc.StreamKey, pc.QueryParams, log, srv); rejected { return nil } @@ -105,14 +166,13 @@ func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log // Track stream key for this connection st.streamKey = pc.StreamKey + st.role = "publisher" // Trigger publish start hook event - if len(srv) > 0 && srv[0] != nil { - srv[0].triggerHookEvent(hooks.EventPublishStart, c.ID(), pc.StreamKey, map[string]interface{}{ - "app": st.app, - "publishing_name": pc.PublishingName, - }) - } + srv.triggerHookEvent(hooks.EventPublishStart, c.ID(), pc.StreamKey, map[string]interface{}{ + "app": st.app, + "publishing_name": pc.PublishingName, + }) // Initialize recorder if recording is enabled if cfg.RecordAll { @@ -131,7 +191,7 @@ func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log d.OnPlay = func(pl *rpc.PlayCommand, msg *chunk.Message) error { // Validate auth token before allowing play. 
- if rejected := authenticateRequest(cfg, c, st, msg, "play", pl.StreamName, pl.StreamKey, pl.QueryParams, log, srv...); rejected { + if rejected := authenticateRequest(cfg, c, st, msg, "play", pl.StreamName, pl.StreamKey, pl.QueryParams, log, srv); rejected { return nil } @@ -143,11 +203,17 @@ func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log // Track stream key for this connection st.streamKey = pl.StreamKey + st.role = "subscriber" // Trigger play start hook event - if len(srv) > 0 && srv[0] != nil { - srv[0].triggerHookEvent(hooks.EventPlayStart, c.ID(), pl.StreamKey, map[string]interface{}{ - "app": st.app, + srv.triggerHookEvent(hooks.EventPlayStart, c.ID(), pl.StreamKey, map[string]interface{}{ + "app": st.app, + }) + // Fire subscriber count change after addition + stream := reg.GetStream(pl.StreamKey) + if stream != nil { + srv.triggerHookEvent(hooks.EventSubscriberCount, c.ID(), pl.StreamKey, map[string]interface{}{ + "count": stream.SubscriberCount(), }) } @@ -187,7 +253,7 @@ func authenticateRequest( streamKey string, queryParams map[string]string, log *slog.Logger, - srv ...*Server, + srv *Server, ) bool { if cfg.AuthValidator == nil { return false // no auth configured — allow @@ -224,12 +290,10 @@ func authenticateRequest( errStatus, _ := buildOnStatus(msg.MessageStreamID, streamKey, statusCode, "Authentication failed.") _ = c.SendMessage(errStatus) - if len(srv) > 0 && srv[0] != nil { - srv[0].triggerHookEvent(hooks.EventAuthFailed, c.ID(), streamKey, map[string]interface{}{ - "action": action, - "error": err.Error(), - }) - } + srv.triggerHookEvent(hooks.EventAuthFailed, c.ID(), streamKey, map[string]interface{}{ + "action": action, + "error": err.Error(), + }) _ = c.Close() return true // rejected diff --git a/internal/rtmp/server/hooks/events.go b/internal/rtmp/server/hooks/events.go index d3fcbde..5aeb6f0 100644 --- a/internal/rtmp/server/hooks/events.go +++ b/internal/rtmp/server/hooks/events.go @@ -35,6 +35,9 @@ 
const ( // Media events EventCodecDetected EventType = "codec_detected" + // Analytics events + EventSubscriberCount EventType = "subscriber_count" + // Authentication events EventAuthFailed EventType = "auth_failed" ) diff --git a/internal/rtmp/server/registry_test.go b/internal/rtmp/server/registry_test.go index 5d7e0df..eb0b87e 100644 --- a/internal/rtmp/server/registry_test.go +++ b/internal/rtmp/server/registry_test.go @@ -10,8 +10,10 @@ package server import ( + "io" "testing" + "github.com/alxayo/go-rtmp/internal/logger" "github.com/alxayo/go-rtmp/internal/rtmp/chunk" "github.com/alxayo/go-rtmp/internal/rtmp/media" ) @@ -80,3 +82,153 @@ func TestRegistryDelete(t *testing.T) { t.Fatalf("expected second delete to be false") } } + +// TestStreamCodecCaching verifies Set/Get for audio and video codec names. +func TestStreamCodecCaching(t *testing.T) { + r := NewRegistry() + s, _ := r.CreateStream("app/codec_test") + + // Initially empty + if s.GetAudioCodec() != "" { + t.Fatalf("expected empty audio codec, got %q", s.GetAudioCodec()) + } + if s.GetVideoCodec() != "" { + t.Fatalf("expected empty video codec, got %q", s.GetVideoCodec()) + } + + s.SetAudioCodec("AAC") + s.SetVideoCodec("H264") + + if s.GetAudioCodec() != "AAC" { + t.Fatalf("expected AAC, got %q", s.GetAudioCodec()) + } + if s.GetVideoCodec() != "H264" { + t.Fatalf("expected H264, got %q", s.GetVideoCodec()) + } +} + +// identifiableSubscriber is a Subscriber with distinct identity for pointer comparison. +type identifiableSubscriber struct { + id int +} + +func (s *identifiableSubscriber) SendMessage(_ *chunk.Message) error { return nil } + +var _ media.Subscriber = (*identifiableSubscriber)(nil) + +// TestStreamRemoveSubscriber verifies that removing a subscriber decrements +// the count and that removing a non-existent subscriber is a no-op. 
+func TestStreamRemoveSubscriber(t *testing.T) { + r := NewRegistry() + s, _ := r.CreateStream("app/unsub_test") + + sub1 := &identifiableSubscriber{id: 1} + sub2 := &identifiableSubscriber{id: 2} + s.AddSubscriber(sub1) + s.AddSubscriber(sub2) + if s.SubscriberCount() != 2 { + t.Fatalf("expected 2 subscribers, got %d", s.SubscriberCount()) + } + + s.RemoveSubscriber(sub1) + if s.SubscriberCount() != 1 { + t.Fatalf("expected 1 subscriber after remove, got %d", s.SubscriberCount()) + } + + // Remove again (no-op) + s.RemoveSubscriber(sub1) + if s.SubscriberCount() != 1 { + t.Fatalf("expected 1 subscriber after duplicate remove, got %d", s.SubscriberCount()) + } +} + +// capturingSubscriber records messages for assertion. +type capturingSubscriber struct { + messages []*chunk.Message +} + +func (c *capturingSubscriber) SendMessage(m *chunk.Message) error { + c.messages = append(c.messages, m) + return nil +} + +var _ media.Subscriber = (*capturingSubscriber)(nil) + +// TestBroadcastMessage_RelaysToSubscribers verifies that BroadcastMessage +// delivers a copy of the message to each subscriber. 
+func TestBroadcastMessage_RelaysToSubscribers(t *testing.T) { + logger.UseWriter(io.Discard) + r := NewRegistry() + s, _ := r.CreateStream("app/broadcast_test") + + sub1 := &capturingSubscriber{} + sub2 := &capturingSubscriber{} + s.AddSubscriber(sub1) + s.AddSubscriber(sub2) + + msg := &chunk.Message{ + CSID: 6, TypeID: 9, Timestamp: 100, + MessageStreamID: 1, MessageLength: 3, + Payload: []byte{0x17, 0x01, 0xFF}, + } + s.BroadcastMessage(nil, msg, logger.Logger()) + + if len(sub1.messages) != 1 { + t.Fatalf("sub1: expected 1 message, got %d", len(sub1.messages)) + } + if len(sub2.messages) != 1 { + t.Fatalf("sub2: expected 1 message, got %d", len(sub2.messages)) + } + + // Verify payload is cloned (not shared) + sub1.messages[0].Payload[0] = 0x00 + if msg.Payload[0] == 0x00 { + t.Fatal("subscriber message payload shares memory with original") + } +} + +// TestBroadcastMessage_CachesVideoSequenceHeader verifies that a video +// sequence header (TypeID=9, avc_packet_type=0) is cached on the stream. +func TestBroadcastMessage_CachesVideoSequenceHeader(t *testing.T) { + logger.UseWriter(io.Discard) + r := NewRegistry() + s, _ := r.CreateStream("app/seqhdr_test") + + // AVC sequence header: keyframe(0x17) + avc_packet_type=0 + seqHdr := &chunk.Message{ + CSID: 6, TypeID: 9, Timestamp: 0, + MessageStreamID: 1, MessageLength: 4, + Payload: []byte{0x17, 0x00, 0x01, 0x02}, + } + s.BroadcastMessage(nil, seqHdr, logger.Logger()) + + if s.VideoSequenceHeader == nil { + t.Fatal("expected video sequence header to be cached") + } + if len(s.VideoSequenceHeader.Payload) != 4 { + t.Fatalf("expected 4-byte payload, got %d", len(s.VideoSequenceHeader.Payload)) + } +} + +// TestBroadcastMessage_CachesAudioSequenceHeader verifies that an AAC +// sequence header (TypeID=8, 0xAF 0x00) is cached on the stream. 
+func TestBroadcastMessage_CachesAudioSequenceHeader(t *testing.T) { + logger.UseWriter(io.Discard) + r := NewRegistry() + s, _ := r.CreateStream("app/audio_seqhdr") + + // AAC sequence header: sound_format=10(AAC), aac_packet_type=0 + seqHdr := &chunk.Message{ + CSID: 4, TypeID: 8, Timestamp: 0, + MessageStreamID: 1, MessageLength: 4, + Payload: []byte{0xAF, 0x00, 0x12, 0x10}, + } + s.BroadcastMessage(nil, seqHdr, logger.Logger()) + + if s.AudioSequenceHeader == nil { + t.Fatal("expected audio sequence header to be cached") + } + if len(s.AudioSequenceHeader.Payload) != 4 { + t.Fatalf("expected 4-byte payload, got %d", len(s.AudioSequenceHeader.Payload)) + } +} diff --git a/internal/rtmp/server/server.go b/internal/rtmp/server/server.go index 9e94800..c133ae0 100644 --- a/internal/rtmp/server/server.go +++ b/internal/rtmp/server/server.go @@ -207,15 +207,19 @@ func (s *Server) Stop() error { // Close all connections and clean up recorders. s.mu.Lock() - for id, c := range s.conns { - s.triggerHookEvent(hooks.EventConnectionClose, id, "", map[string]interface{}{ - "reason": "server_shutdown", - }) - _ = c.Close() + connsToClose := make([]*iconn.Connection, 0, len(s.conns)) + for _, c := range s.conns { + connsToClose = append(connsToClose, c) } clear(s.conns) s.mu.Unlock() + // Close connections outside the lock to avoid deadlock with + // disconnect handler's RemoveConnection call. + for _, c := range connsToClose { + _ = c.Close() + } + // Clean up all active recorders s.cleanupAllRecorders() @@ -255,6 +259,14 @@ func (s *Server) ConnectionCount() int { return len(s.conns) } +// RemoveConnection removes a single connection from the tracking map. +// Called by the disconnect handler when a connection's readLoop exits. +func (s *Server) RemoveConnection(id string) { + s.mu.Lock() + delete(s.conns, id) + s.mu.Unlock() +} + // singleConnListener wraps a single pre-accepted net.Conn as a net.Listener. 
// This adapter exists because conn.Accept() expects a net.Listener (for the // handshake flow), but the server's accept loop already called l.Accept() to diff --git a/specs/005-error-handling-benchmarks/spec.md b/specs/005-error-handling-benchmarks/spec.md new file mode 100644 index 0000000..4fe389c --- /dev/null +++ b/specs/005-error-handling-benchmarks/spec.md @@ -0,0 +1,808 @@ +# Feature 005: Enhanced Error Handling, Graceful Connection Cleanup & Performance Benchmarks + +**Feature**: 005-error-handling-benchmarks +**Status**: Draft +**Date**: 2026-03-03 +**Branch**: `feature/005-error-handling-benchmarks` + +## Overview + +Two complementary improvements to the RTMP server: + +1. **Enhanced error handling and graceful connection cleanup** — wire the existing cleanup functions (`PublisherDisconnected`, `SubscriberDisconnected`, `MediaLogger.Stop`) into the connection lifecycle via a disconnect callback, and add TCP deadlines to prevent zombie connections. +2. **Performance benchmarks for chunk and AMF0 encode/decode** — add comprehensive `Benchmark*` functions for the two hottest code paths in the server (chunk framing and AMF0 serialization). + +### Design Constraints + +- **Zero external dependencies** (stdlib only — consistent with the rest of the project) +- **Backward-compatible**: no public API changes; existing behavior preserved for healthy connections +- **Minimal scope**: only the changes listed below; no refactoring beyond what's required + +--- + +## Part 1: Enhanced Error Handling & Graceful Connection Cleanup + +### Problem Statement + +The codebase has well-structured domain-specific error types (`ProtocolError`, `HandshakeError`, `ChunkError`, `AMFError`, `TimeoutError`) and cleanup functions (`PublisherDisconnected()`, `SubscriberDisconnected()`, `MediaLogger.Stop()`). However, **none of the cleanup functions are called during the normal connection lifecycle**. 
When a client disconnects, the `readLoop` goroutine exits silently and no cleanup fires. + +This causes: +- **Memory leak**: connections accumulate in `s.conns` forever (only cleared on server shutdown) +- **Stream key lockout**: stale publisher references block stream reuse (`ErrPublisherExists`) +- **Goroutine leak**: `MediaLogger` goroutine + `time.Ticker` per connection, never stopped +- **Subscriber waste**: `BroadcastMessage` keeps sending to dead subscribers (timeout per packet) +- **Unclosed recorders**: FLV files left unflushed/unclosed until server shutdown +- **Zombie connections**: no TCP deadlines means stuck connections hold resources indefinitely + +### Issues Identified + +| # | Severity | Issue | Location | +|---|----------|-------|----------| +| 1 | **Critical** | Connections never removed from `s.conns` on normal disconnect — memory leak | `server.go` L175 (`s.conns[c.ID()] = c`), only `clear(s.conns)` in `Stop()` | +| 2 | **Critical** | `PublisherDisconnected()` never called — stale publishers block stream reuse | `publish_handler.go` L65-78 (function exists, never invoked) | +| 3 | **Critical** | `SubscriberDisconnected()` never called — stale subscribers accumulate | `play_handler.go` L141-151 (function exists, never invoked) | +| 4 | **Critical** | `MediaLogger.Stop()` never called — goroutine + ticker leak per connection | `command_integration.go` L58 (`NewMediaLogger` creates goroutine; `Stop()` at `media_logger.go` L156 never called) | +| 5 | **High** | No disconnect cascade — `readLoop` exit doesn't trigger any cleanup | `conn.go` L115-135 (goroutine exits, no defer chain) | +| 6 | **High** | Recorder not closed on publisher disconnect (only on server shutdown) | `server.go` L283-307 (`cleanupAllRecorders` only runs in `Stop()`) | +| 7 | **High** | No `net.Conn` read/write deadlines — stuck TCP connections hold resources forever | `conn.go` (no `SetReadDeadline`/`SetWriteDeadline` calls anywhere) | +| 8 | **Medium** | Relay 
`Destination.Connect()` leaks client on `Connect()` and `Publish()` failure | `destination.go` L130-145 (client created by factory, never closed on error path) | +| 9 | **Medium** | `main.go` doesn't force-exit on shutdown timeout — falls through to end of `main()` | `main.go` L104 (`log.Error` only, no `os.Exit(1)`) | +| 10 | **Low** | `Session` type is entirely dead code — field in Connection struct never initialized; `session.go` + `session_test.go` completely unused | `conn.go` L55, `session.go`, `session_test.go` | + +### Root Cause + +Issues #1–6 all stem from a single missing feature: **a connection `onDisconnect` callback** that fires when the `readLoop` exits. + +### Code Verification Summary + +The following API facts were verified against actual source (not assumed): + +| Item | Actual Signature / Location | +|------|----------------------------| +| `PublisherDisconnected` | Package-level func: `PublisherDisconnected(reg *Registry, streamKey string, pub sender)` — `publish_handler.go` L65 | +| `SubscriberDisconnected` | Package-level func: `SubscriberDisconnected(reg *Registry, streamKey string, sub sender)` — `play_handler.go` L141 | +| `MediaLogger.Stop()` | Method on `*MediaLogger`: uses `sync.Once`, closes `stopChan`, stops ticker — `media_logger.go` L156-161 | +| `Registry.GetStream` | Method: `(r *Registry) GetStream(key string) *Stream` — `registry.go` L101 | +| `Stream.Recorder` | Field: `Recorder *media.Recorder` — `registry.go` L54; protected by `stream.mu sync.RWMutex` | +| `attachCommandHandling` | `func attachCommandHandling(c *iconn.Connection, reg *Registry, cfg *Config, log *slog.Logger, destMgr *relay.DestinationManager, srv ...*Server)` — `command_integration.go` L52 | +| `sender` interface | `type sender interface { SendMessage(*chunk.Message) error }` — `publish_handler.go` L22; `*iconn.Connection` satisfies this | +| `net.Conn` | Already has `SetReadDeadline(time.Time) error` and `SetWriteDeadline(time.Time) error` — no type 
assertion needed | +| `Session` usage | `session *Session` field in Connection struct (L55); `NewSession()` only called in `session_test.go`; zero production references | + +--- + +### Implementation Plan + +#### Phase 1: Connection Disconnect Callback (resolves #1–6) + +##### Step 1.1 — Add disconnect handler to `Connection` + +**File**: `internal/rtmp/conn/conn.go` + +**Change A** — Add `onDisconnect` field to `Connection` struct (after `onMessage` field, L57): + +```go +type Connection struct { + // ... existing fields ... + onMessage func(*chunk.Message) // test hook / dispatcher injection + onDisconnect func() // called once when readLoop exits (cleanup cascade) +} +``` + +**Change B** — Add `SetDisconnectHandler` method (after `SetMessageHandler`, ~L76): + +```go +// SetDisconnectHandler installs a callback invoked once when the readLoop +// exits (for any reason: EOF, error, context cancel). MUST be called before Start(). +func (c *Connection) SetDisconnectHandler(fn func()) { c.onDisconnect = fn } +``` + +**Change C** — Modify `startReadLoop()` (L115-135) to add a defer chain that cancels context and invokes the disconnect handler. Also add handling for `net.Error` timeout errors: + +```go +// startReadLoop begins the dechunk → dispatch loop. +func (c *Connection) startReadLoop() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + defer func() { + // Cleanup cascade: cancel context first (stops writeLoop via ctx.Done()), + // then invoke the disconnect handler for higher-level cleanup. + // cancel() is idempotent — safe if Close() already called it. 
+ c.cancel() + if c.onDisconnect != nil { + c.onDisconnect() + } + }() + r := chunk.NewReader(c.netConn, c.readChunkSize) + for { + select { + case <-c.ctx.Done(): + return + default: + } + msg, err := r.ReadMessage() + if err != nil { + // Normal disconnect paths — exit silently + if errors.Is(err, context.Canceled) || errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + return + } + // Timeout from read deadline — connection is dead + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + c.log.Warn("readLoop timeout (zombie connection reaped)") + return + } + c.log.Error("readLoop error", "error", err) + return + } + if c.onMessage != nil { + c.onMessage(msg) + } + } + }() +} +``` + +**Defer ordering rationale**: Go defers execute LIFO. `wg.Done()` is the outermost defer (registered first), so it runs *last*. The cleanup defer runs *before* `wg.Done()`, ensuring all cleanup completes before `Close()` → `wg.Wait()` returns. + +**Idempotency**: `c.cancel()` is called both in the defer chain AND in `Close()`. This is safe because `context.CancelFunc` is documented as idempotent. + +##### Step 1.2 — Add `RemoveConnection` to `Server` + +**File**: `internal/rtmp/server/server.go` + +Add method after `ConnectionCount()` (~L234): + +```go +// RemoveConnection removes a single connection from the tracking map. +// Called by the disconnect handler when a connection's readLoop exits. +func (s *Server) RemoveConnection(id string) { + s.mu.Lock() + delete(s.conns, id) + s.mu.Unlock() +} +``` + +##### Step 1.3 — Add `role` field to `commandState` + +**File**: `internal/rtmp/server/command_integration.go` + +Add `role` field to the `commandState` struct (after `codecDetector`, ~L40): + +```go +type commandState struct { + app string // application name from the connect command (e.g. "live") + streamKey string // current stream key (e.g. 
"live/mystream") + connectParams map[string]interface{} // extra fields from connect command object (for auth context) + allocator *rpc.StreamIDAllocator // assigns unique message stream IDs for createStream + mediaLogger *MediaLogger // tracks audio/video packet statistics + codecDetector *media.CodecDetector // identifies audio/video codecs on first packets + role string // "publisher" or "subscriber" — set by OnPublish/OnPlay handlers +} +``` + +##### Step 1.4 — Set `role` in OnPublish handler + +**File**: `internal/rtmp/server/command_integration.go` + +In the `d.OnPublish` handler, add `st.role = "publisher"` after `st.streamKey = pc.StreamKey` (~L106): + +```go + // Track stream key for this connection + st.streamKey = pc.StreamKey + st.role = "publisher" +``` + +##### Step 1.5 — Set `role` in OnPlay handler + +**File**: `internal/rtmp/server/command_integration.go` + +In the `d.OnPlay` handler, add `st.role = "subscriber"` after `st.streamKey = pl.StreamKey` (~L137): + +```go + // Track stream key for this connection + st.streamKey = pl.StreamKey + st.role = "subscriber" +``` + +##### Step 1.6 — Install disconnect handler in `attachCommandHandling` + +**File**: `internal/rtmp/server/command_integration.go` + +After creating `commandState` and before `d := rpc.NewDispatcher(...)` (~L59), install the disconnect handler. + +**Important API notes for implementer**: +- `PublisherDisconnected` and `SubscriberDisconnected` are **package-level functions** (not methods on `Registry`). Call: `PublisherDisconnected(reg, streamKey, c)`. +- `Registry.GetStream(key)` returns `*Stream` (not `Registry.Get()`). +- `stream.Recorder` access must be protected by `stream.mu.Lock()` (concurrent access with `cleanupAllRecorders` during server shutdown). +- `srv` is variadic `...*Server` — check `len(srv) > 0 && srv[0] != nil`. +- `*iconn.Connection` satisfies the `sender` interface (has `SendMessage(*chunk.Message) error`). 
+ +```go + // Install disconnect handler — fires when readLoop exits for any reason. + // Captures: st, reg, c, log, srv (all from enclosing scope via closure). + c.SetDisconnectHandler(func() { + // 1. Stop media logger (prevents goroutine + ticker leak) + st.mediaLogger.Stop() + + // 2. Publisher cleanup: close recorder, unregister publisher, fire hook + if st.streamKey != "" && st.role == "publisher" { + stream := reg.GetStream(st.streamKey) + if stream != nil { + // Close recorder under lock (concurrent with cleanupAllRecorders) + stream.mu.Lock() + if stream.Recorder != nil { + if err := stream.Recorder.Close(); err != nil { + log.Error("recorder close error on disconnect", "error", err, "stream_key", st.streamKey) + } + stream.Recorder = nil + } + stream.mu.Unlock() + // Unregister publisher (allows stream key reuse by new publisher) + PublisherDisconnected(reg, st.streamKey, c) + } + if len(srv) > 0 && srv[0] != nil { + srv[0].triggerHookEvent(hooks.EventPublishStop, c.ID(), st.streamKey, nil) + } + } + + // 3. Subscriber cleanup: unregister subscriber, fire hook + if st.streamKey != "" && st.role == "subscriber" { + SubscriberDisconnected(reg, st.streamKey, c) + if len(srv) > 0 && srv[0] != nil { + srv[0].triggerHookEvent(hooks.EventPlayStop, c.ID(), st.streamKey, nil) + } + } + + // 4. Remove from server connection tracking (fixes memory leak) + if len(srv) > 0 && srv[0] != nil { + srv[0].RemoveConnection(c.ID()) + } + + // 5. Fire connection close hook + if len(srv) > 0 && srv[0] != nil { + srv[0].triggerHookEvent(hooks.EventConnectionClose, c.ID(), "", nil) + } + + log.Info("connection disconnected", "conn_id", c.ID(), "stream_key", st.streamKey, "role", st.role) + }) +``` + +**Shutdown race safety**: During server shutdown, `Stop()` calls `c.Close()` for each connection, then `cleanupAllRecorders()`. The disconnect handler may also try to close recorders. This is safe because: +1. Both paths lock `stream.mu` before accessing `stream.Recorder`. +2. 
The first path to close sets `stream.Recorder = nil`. +3. The second path sees `nil` and skips — no double-close. + +#### Phase 2: TCP Deadlines (resolves #7) + +**File**: `internal/rtmp/conn/conn.go` + +##### Step 2.1 — Add deadline constants + +Extend the existing const block (~L31) so it reads as follows (`sendTimeout` and `outboundQueueSize` already exist — only the two deadline constants are new; do not duplicate the existing declarations): + +```go +const ( + sendTimeout = 200 * time.Millisecond + outboundQueueSize = 100 + + // TCP deadlines for zombie connection detection. + // readTimeout is generous to accommodate idle subscribers that receive + // no data when no publisher is active. Publishers send data continuously + // (~30fps) so any timeout > a few seconds catches dead peers. + readTimeout = 90 * time.Second + // writeTimeout catches dead TCP peers that never acknowledge writes. + writeTimeout = 30 * time.Second +) +``` + +##### Step 2.2 — Read deadline in `readLoop` + +In `startReadLoop()`, before each `ReadMessage()` call, set a read deadline. `net.Conn` already has `SetReadDeadline` — no type assertion is needed: + +```go + _ = c.netConn.SetReadDeadline(time.Now().Add(readTimeout)) + msg, err := r.ReadMessage() +``` + +The timeout error is already handled in Step 1.1 via `errors.As(err, &netErr) && netErr.Timeout()`. 
+ +##### Step 2.3 — Write deadline in `writeLoop` + +In `startWriteLoop()` (L138), before each `WriteMessage()` call, set a write deadline: + +```go + case msg, ok := <-c.outboundQueue: + if !ok { + return + } + currentChunkSize := atomic.LoadUint32(&c.writeChunkSize) + w.SetChunkSize(currentChunkSize) + _ = c.netConn.SetWriteDeadline(time.Now().Add(writeTimeout)) + if err := w.WriteMessage(msg); err != nil { + c.log.Error("writeLoop write failed", "error", err) + return + } +``` + +#### Phase 3: Minor Fixes (resolves #8, #9, #10) + +##### Step 3.1 — Fix relay client leak in `Destination.Connect()` + +**File**: `internal/rtmp/relay/destination.go` + +There are **two** leak paths in `Connect()` (L110-149): + +**Leak 1**: After `d.clientFactory(d.URL)` creates a client, if `client.Connect()` fails, the client is not closed. The factory may allocate TCP resources. + +**Leak 2**: After `client.Connect()` succeeds, if `client.Publish()` fails, the connected client is not closed. + +Fix both by closing the client on each error path: + +```go +func (d *Destination) Connect() error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.Status == StatusConnected { + return nil + } + + d.Status = StatusConnecting + d.logger.Info("Connecting to destination") + + client, err := d.clientFactory(d.URL) + if err != nil { + d.Status = StatusError + d.LastError = err + d.logger.Error("Failed to create RTMP client", "error", err) + return fmt.Errorf("create client: %w", err) + } + + if err := client.Connect(); err != nil { + _ = client.Close() // prevent leak: factory may have allocated TCP resources + d.Status = StatusError + d.LastError = err + d.logger.Error("Failed to connect RTMP client", "error", err) + return fmt.Errorf("client connect: %w", err) + } + + if err := client.Publish(); err != nil { + _ = client.Close() // prevent leak: connection established but publish failed + d.Status = StatusError + d.LastError = err + d.logger.Error("Failed to publish to destination", "error", err) 
+ return fmt.Errorf("client publish: %w", err) + } + + d.Client = client + d.Status = StatusConnected + d.Metrics.ConnectTime = time.Now() + d.LastError = nil + d.logger.Info("Connected to destination") + return nil +} +``` + +##### Step 3.2 — Force exit on shutdown timeout + +**File**: `cmd/rtmp-server/main.go` + +At L104, after `log.Error("forced exit after timeout")`, add `os.Exit(1)`: + +```go + case <-shutdownCtx.Done(): + log.Error("forced exit after timeout") + os.Exit(1) +``` + +##### Step 3.3 — Remove dead `Session` code + +**Three files to modify:** + +**File A**: `internal/rtmp/conn/conn.go` — Remove the `session *Session` field from the `Connection` struct (L55). The field is never initialized or assigned anywhere in production code. + +**File B**: `internal/rtmp/conn/session.go` — **Delete entire file**. Contains `Session` struct, `SessionState` enum, `NewSession()`, and methods. None are used in production — `NewSession()` is only called in `session_test.go`. + +**File C**: `internal/rtmp/conn/session_test.go` — **Delete entire file**. Tests for the dead `Session` type. + +--- + +### Behavioral Summary (Part 1) + +**Before**: Client connects → streams → disconnects → `readLoop` exits silently → connection stays in `s.conns` forever, publisher reference blocks stream reuse, `MediaLogger` goroutine ticks every 30s forever, FLV recorder left unclosed. + +**After**: Client connects → streams → disconnects → `readLoop` defer fires → context canceled (stops `writeLoop`) → `onDisconnect` callback runs → MediaLogger stopped, publisher/subscriber unregistered, recorder closed, connection removed from tracking, hook events fired → all resources freed within seconds. + +**Forced-close path** (server shutdown): `Stop()` → `c.Close()` → `cancel()` triggers readLoop exit → defer fires `onDisconnect` → cleanup runs → `cleanupAllRecorders()` runs (sees `Recorder == nil` for already-cleaned connections, skips) → safe. 
+ +--- + +## Part 2: Performance Benchmarks + +### Problem Statement + +The chunk package — the single hottest code path in the server (every byte of audio/video flows through `Reader.ReadMessage()` and `Writer.WriteMessage()`) — has **zero benchmarks**. The AMF package has 8 benchmarks covering only primitive types (Number, Boolean, String, Null), but is missing benchmarks for Objects, Arrays, and the top-level `EncodeAll`/`DecodeAll` APIs used on every RTMP command. + +Without benchmarks: +- Performance regressions from code changes go undetected +- Optimization efforts (e.g., the existing `bufpool` package) have no measurable before/after data +- There's no baseline to guide chunk size tuning or buffer allocation strategy + +### Current Benchmark Coverage + +| Package | Public APIs | Benchmarked | Coverage | +|---------|:-----------:|:-----------:|:--------:| +| `amf` | 18 functions | 8 (primitives only) | 44% | +| `chunk` | 13 functions/methods | **0** | **0%** | + +### Existing AMF Benchmarks (8) + +- `BenchmarkEncodeNull` / `BenchmarkDecodeNull` — `null_test.go` +- `BenchmarkEncodeNumber` / `BenchmarkDecodeNumber` — `number_test.go` +- `BenchmarkEncodeBoolean` / `BenchmarkDecodeBoolean` — `boolean_test.go` +- `BenchmarkEncodeString` / `BenchmarkDecodeString` — `string_test.go` + +### Existing Test Helpers Available for Benchmarks + +| Helper | File | Description | +|--------|------|-------------| +| `buildMessageBytes(t, csid, ts, msgType, msid, payload)` | `reader_test.go` L31 | Constructs FMT0 single-chunk message bytes; uses `EncodeChunkHeader` | +| `loadGoldenChunk(t, name)` | `reader_test.go` L24 | Loads golden binary test data from `tests/golden/` | +| `loadGoldenHeader(t, name, headerLen)` | `writer_test.go` L26 | Loads first N bytes from golden header file | +| `simpleWriter` struct | `writer_test.go` L151 | Wraps `bytes.Buffer` as `io.Writer` for test captures | + +Note: `buildMessageBytes` uses `*testing.T` (not `*testing.B`). 
For benchmarks, we need a variant that accepts `testing.TB` or build data directly without a test helper. + +--- + +### Implementation Plan + +All benchmarks will call `b.ReportAllocs()` to track allocation counts, providing data for future optimization. + +#### File 1: `internal/rtmp/chunk/reader_test.go` — Chunk Read Benchmarks + +| Benchmark | Description | Setup | +|-----------|-------------|-------| +| `BenchmarkParseChunkHeader_FMT0` | Full 12-byte header parsing via `ParseChunkHeader()` | Pre-encode FMT0 header bytes using `EncodeChunkHeader`; wrap in `bytes.NewReader` per iteration | +| `BenchmarkParseChunkHeader_FMT1` | Delta 8-byte header (common for audio continuation) | Pre-encode FMT1 header bytes with a `prev` header; wrap in `bytes.NewReader` per iteration | +| `BenchmarkParseChunkHeader_FMT3` | Minimal 1-byte header (most common in media streams) | Pre-encode FMT3 header bytes with a `prev` header; wrap in `bytes.NewReader` per iteration | +| `BenchmarkReaderReadMessage_SingleChunk` | Message that fits in one chunk via `Reader.ReadMessage()` | Build FMT0 message bytes (100-byte audio payload, CSID=4, TypeID=8); create new `Reader` per iteration | +| `BenchmarkReaderReadMessage_MultiChunk` | Message spanning multiple chunks via `Reader.ReadMessage()` | Write a 4096-byte video message (TypeID=9) using `Writer` at 128-byte chunk size; read back per iteration | + +Implementation approach: +```go +func BenchmarkParseChunkHeader_FMT0(b *testing.B) { + b.ReportAllocs() + h := &ChunkHeader{FMT: 0, CSID: 4, Timestamp: 1000, MessageLength: 100, MessageTypeID: 8, MessageStreamID: 1} + raw, _ := EncodeChunkHeader(h, nil) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(raw) + _, _ = ParseChunkHeader(r, nil) + } +} + +func BenchmarkReaderReadMessage_MultiChunk(b *testing.B) { + b.ReportAllocs() + // Pre-fragment: write a 4096-byte message via Writer, capture bytes + payload := make([]byte, 4096) + var buf bytes.Buffer + w := NewWriter(&buf, 
128) // 128-byte chunk size → 32+ chunks + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + _ = w.WriteMessage(msg) + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := NewReader(bytes.NewReader(data), 128) + _, _ = r.ReadMessage() + } +} +``` + +#### File 2: `internal/rtmp/chunk/writer_test.go` — Chunk Write Benchmarks + +| Benchmark | Description | Setup | +|-----------|-------------|-------| +| `BenchmarkEncodeChunkHeader_FMT0` | Header serialization via `EncodeChunkHeader()` | Pre-build `ChunkHeader` struct; call repeatedly | +| `BenchmarkWriterWriteMessage_SingleChunk` | Single-chunk message via `Writer.WriteMessage()` | 100-byte audio payload, write to `io.Discard`; new `Writer` per iteration to avoid FMT compression | +| `BenchmarkWriterWriteMessage_MultiChunk` | Multi-chunk message via `Writer.WriteMessage()` | 4096-byte video payload at 128-byte chunk size; write to `io.Discard` | +| `BenchmarkWriterReaderRoundTrip` | End-to-end Write→Read cycle | Write 4096-byte message to `bytes.Buffer`, read back via `Reader` | + +Implementation approach: +```go +func BenchmarkWriterWriteMessage_MultiChunk(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 4096) + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + b.ResetTimer() + for i := 0; i < b.N; i++ { + w := NewWriter(io.Discard, 128) + _ = w.WriteMessage(msg) + } +} + +func BenchmarkWriterReaderRoundTrip(b *testing.B) { + b.ReportAllocs() + payload := make([]byte, 4096) + msg := &Message{CSID: 6, Timestamp: 0, MessageLength: 4096, TypeID: 9, MessageStreamID: 1, Payload: payload} + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + w := NewWriter(&buf, 128) + _ = w.WriteMessage(msg) + r := NewReader(&buf, 128) + _, _ = r.ReadMessage() + } +} +``` + +#### File 3: `internal/rtmp/amf/object_test.go` — Object Benchmarks + +| Benchmark | Description 
| Data | +|-----------|-------------|------| +| `BenchmarkEncodeObject` | Encode a typical RTMP connect-style object via `EncodeObject()` | `{"app":"live","type":"nonprivate","flashVer":"FMLE/3.0","tcUrl":"rtmp://localhost/live"}` | +| `BenchmarkDecodeObject` | Decode the same object from pre-encoded bytes via `DecodeObject()` | Golden bytes from encoding | + +Implementation approach: +```go +func BenchmarkEncodeObject(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", + "type": "nonprivate", + "flashVer": "FMLE/3.0", + "tcUrl": "rtmp://localhost/live", + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + _ = EncodeObject(&buf, obj) + } +} + +func BenchmarkDecodeObject(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", "type": "nonprivate", + "flashVer": "FMLE/3.0", "tcUrl": "rtmp://localhost/live", + } + var buf bytes.Buffer + _ = EncodeObject(&buf, obj) + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < b.N; i++ { + r := bytes.NewReader(data) + _, _ = DecodeObject(r) + } +} +``` + +#### File 4: `internal/rtmp/amf/array_test.go` — Array Benchmarks + +| Benchmark | Description | Data | +|-----------|-------------|------| +| `BenchmarkEncodeStrictArray` | Encode a mixed-type array via `EncodeStrictArray()` | `[1.0, "test", true, nil, 42.0]` | +| `BenchmarkDecodeStrictArray` | Decode the same array from pre-encoded bytes via `DecodeStrictArray()` | Golden bytes from encoding | + +Implementation approach: +```go +func BenchmarkEncodeStrictArray(b *testing.B) { + b.ReportAllocs() + arr := []interface{}{1.0, "test", true, nil, 42.0} + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + _ = EncodeStrictArray(&buf, arr) + } +} + +func BenchmarkDecodeStrictArray(b *testing.B) { + b.ReportAllocs() + arr := []interface{}{1.0, "test", true, nil, 42.0} + var buf bytes.Buffer + _ = EncodeStrictArray(&buf, arr) + data := buf.Bytes() + b.ResetTimer() + for i := 0; i < 
b.N; i++ { + r := bytes.NewReader(data) + _, _ = DecodeStrictArray(r) + } +} +``` + +#### File 5: `internal/rtmp/amf/amf_test.go` — Top-Level Codec Benchmarks + +| Benchmark | Description | Data | +|-----------|-------------|------| +| `BenchmarkEncodeAll_ConnectCommand` | Multi-value encode simulating a full connect command via `EncodeAll()` | `["connect", 1.0, {app:"live", type:"nonprivate", flashVer:"FMLE/3.0", tcUrl:"rtmp://localhost/live"}]` | +| `BenchmarkDecodeAll_ConnectCommand` | Multi-value decode of the same connect command payload via `DecodeAll()` | Pre-encoded bytes | + +Implementation approach: +```go +func BenchmarkEncodeAll_ConnectCommand(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", "type": "nonprivate", + "flashVer": "FMLE/3.0", "tcUrl": "rtmp://localhost/live", + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = EncodeAll("connect", 1.0, obj) + } +} + +func BenchmarkDecodeAll_ConnectCommand(b *testing.B) { + b.ReportAllocs() + obj := map[string]interface{}{ + "app": "live", "type": "nonprivate", + "flashVer": "FMLE/3.0", "tcUrl": "rtmp://localhost/live", + } + data, _ := EncodeAll("connect", 1.0, obj) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = DecodeAll(data) + } +} +``` + +--- + +### Running Benchmarks + +```bash +# All benchmarks +go test -bench . -benchmem ./internal/rtmp/chunk/ ./internal/rtmp/amf/ + +# Chunk benchmarks only +go test -bench . -benchmem ./internal/rtmp/chunk/ + +# AMF benchmarks only (including existing ones) +go test -bench . -benchmem ./internal/rtmp/amf/ + +# Compare before/after with benchstat +go test -bench . -benchmem -count=10 ./internal/rtmp/chunk/ > old.txt +# ... make changes ... +go test -bench . 
-benchmem -count=10 ./internal/rtmp/chunk/ > new.txt +benchstat old.txt new.txt +``` + +### Expected Benchmark Output Format + +``` +BenchmarkParseChunkHeader_FMT0-8 5000000 234 ns/op 0 B/op 0 allocs/op +BenchmarkReaderReadMessage_SingleChunk-8 2000000 612 ns/op 256 B/op 3 allocs/op +BenchmarkWriterWriteMessage_MultiChunk-8 1000000 1430 ns/op 512 B/op 5 allocs/op +BenchmarkEncodeObject-8 1000000 1102 ns/op 384 B/op 8 allocs/op +BenchmarkEncodeAll_ConnectCommand-8 500000 2340 ns/op 768 B/op 12 allocs/op +``` + +*(Numbers are illustrative; actual values depend on hardware.)* + +--- + +## File Change Summary + +| File | Part | Action | Changes | +|------|------|--------|---------| +| `internal/rtmp/conn/conn.go` | 1 | Edit | Add `onDisconnect` field, `SetDisconnectHandler()` method; modify `startReadLoop()` defer chain with cancel + disconnect handler + timeout error handling; add read deadline before `ReadMessage()`; add write deadline before `WriteMessage()` in writeLoop; add `readTimeout`/`writeTimeout` constants; remove dead `session *Session` field | +| `internal/rtmp/conn/session.go` | 1 | **Delete** | Entire file is dead code (Session struct, SessionState, NewSession, all methods) | +| `internal/rtmp/conn/session_test.go` | 1 | **Delete** | Tests for dead Session type | +| `internal/rtmp/server/server.go` | 1 | Edit | Add `RemoveConnection(id string)` method | +| `internal/rtmp/server/command_integration.go` | 1 | Edit | Add `role` field to `commandState`; set `st.role` in OnPublish/OnPlay handlers; install disconnect handler via `c.SetDisconnectHandler()` with correct API calls | +| `internal/rtmp/relay/destination.go` | 1 | Edit | Close client on both `Connect()` and `Publish()` failure paths in `Connect()` method | +| `cmd/rtmp-server/main.go` | 1 | Edit | Add `os.Exit(1)` after shutdown timeout log | +| `internal/rtmp/chunk/reader_test.go` | 2 | Edit | Add 5 chunk read benchmarks | +| `internal/rtmp/chunk/writer_test.go` | 2 | Edit | Add 4 chunk write 
benchmarks | +| `internal/rtmp/amf/object_test.go` | 2 | Edit | Add 2 object encode/decode benchmarks | +| `internal/rtmp/amf/array_test.go` | 2 | Edit | Add 2 array encode/decode benchmarks | +| `internal/rtmp/amf/amf_test.go` | 2 | Edit | Add 2 top-level codec benchmarks | + +**Total**: 12 files (10 edits, 2 deletes, 0 new files) + +## Testing Strategy + +### Part 1 — Error Handling & Cleanup + +#### Unit Tests (add to `internal/rtmp/conn/conn_test.go`) + +| Test | Description | +|------|-------------| +| `TestDisconnectHandler_FiresOnEOF` | Client closes connection → readLoop gets EOF → verify disconnect handler fires | +| `TestDisconnectHandler_FiresOnContextCancel` | Call `c.Close()` → context canceled → verify disconnect handler fires | +| `TestDisconnectHandler_FiresOnReadError` | Inject malformed data → readLoop gets parse error → verify handler fires | +| `TestDisconnectHandler_NilSafe` | No handler set → readLoop exits without panic | + +Implementation pattern: +```go +func TestDisconnectHandler_FiresOnEOF(t *testing.T) { + logger.UseWriter(io.Discard) + ln, _ := net.Listen("tcp", "127.0.0.1:0") + defer ln.Close() + connCh := make(chan *Connection, 1) + go func() { c, _ := Accept(ln); connCh <- c }() + client := dialAndClientHandshake(t, ln.Addr().String()) + serverConn := <-connCh + var fired atomic.Bool + serverConn.SetDisconnectHandler(func() { fired.Store(true) }) + serverConn.SetMessageHandler(func(m *chunk.Message) {}) + serverConn.Start() + client.Close() // triggers EOF in readLoop + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && !fired.Load() { + time.Sleep(10 * time.Millisecond) + } + if !fired.Load() { + t.Fatal("disconnect handler did not fire on EOF") + } + _ = serverConn.Close() +} +``` + +#### Server-Level Tests (add to `internal/rtmp/server/` or `tests/integration/`) + +| Test | Description | +|------|-------------| +| `TestConnectionRemovedOnDisconnect` | Start server, connect client, disconnect client → 
`ConnectionCount()` returns 0 | +| `TestPublisherReuse` | Publish stream → disconnect → publish same stream key again → succeeds (no `ErrPublisherExists`) | +| `TestGoroutineLeakOnDisconnect` | Record `runtime.NumGoroutine()` before/after N connect+disconnect cycles → delta ≤ 2 | + +### Part 2 — Benchmarks + +- Benchmarks are self-validating (they run the code under test). +- Verify all benchmarks pass: `go test -bench . -benchmem -count=1 ./internal/rtmp/chunk/ ./internal/rtmp/amf/` +- Verify no regressions in existing tests: `go test -race ./...` + +## Verification Commands (Definition of Done) + +Run these in order as a final check: + +```bash +# 1. Compile +go build ./... + +# 2. Static analysis +go vet ./... + +# 3. Formatting +gofmt -l . # should print nothing + +# 4. All internal tests (session tests were deleted; the remaining suite must still pass) +go test ./internal/... -count=1 + +# 5. Full test suite with race detector +go test -race ./... -count=1 + +# 6. Benchmarks compile and run +go test -bench . 
-benchmem -count=1 ./internal/rtmp/chunk/ ./internal/rtmp/amf/ +``` + +## Acceptance Criteria + +### Part 1 +- [ ] When a client disconnects normally (EOF), the connection is removed from `s.conns` +- [ ] When a publisher disconnects, `PublisherDisconnected()` is called and the stream key becomes available +- [ ] When a subscriber disconnects, `SubscriberDisconnected()` is called +- [ ] `MediaLogger.Stop()` is called on every connection disconnect (no goroutine leak) +- [ ] FLV recorders are closed when the publishing connection disconnects +- [ ] Zombie TCP connections are reaped within 90 seconds (read deadline) +- [ ] Relay client is properly closed on both `Connect()` and `Publish()` failure paths +- [ ] Server process exits with code 1 on shutdown timeout +- [ ] Dead `Session` type and files removed +- [ ] `go test -race ./...` passes + +### Part 2 +- [ ] Chunk package has benchmarks for header parsing (FMT0, FMT1, FMT3), single-chunk read/write, and multi-chunk read/write +- [ ] AMF package has benchmarks for Object and Array encode/decode +- [ ] AMF package has benchmarks for top-level `EncodeAll`/`DecodeAll` with realistic RTMP command data +- [ ] All benchmarks use `b.ReportAllocs()` +- [ ] `go test -bench . 
-benchmem ./internal/rtmp/chunk/ ./internal/rtmp/amf/` completes successfully + +### Code Quality (per Definition of Done) +- [ ] `go build ./...` — zero errors +- [ ] `go vet ./...` — zero warnings +- [ ] `gofmt -l .` — no output (all files formatted) +- [ ] No dead code: removed Session type, no unused functions +- [ ] Exported types/functions have godoc comments +- [ ] Error wrapping follows existing patterns (`internal/errors`) diff --git a/tests/integration/chunking_test.go b/tests/integration/chunking_test.go index c9734a3..0edbb5a 100644 --- a/tests/integration/chunking_test.go +++ b/tests/integration/chunking_test.go @@ -156,19 +156,37 @@ func TestChunkingFlow(t *testing.T) { b2 := encodeSingleMessage(multi, 128) // Scenario 3: Interleaved (Audio CSID=4, Video CSID=6) + // Two 256-byte messages (audio + video) interleaved at 128-byte chunk size: + // audio first chunk (FMT0) → video first chunk (FMT0) → + // audio continuation (FMT3) → video continuation (FMT3) interAudioPayload := make([]byte, 256) interVideoPayload := make([]byte, 256) interAudio := &chunk.Message{CSID: 4, Timestamp: 3000, MessageLength: 256, TypeID: 8, MessageStreamID: 1, Payload: interAudioPayload} interVideo := &chunk.Message{CSID: 6, Timestamp: 3000, MessageLength: 256, TypeID: 9, MessageStreamID: 1, Payload: interVideoPayload} - // manually interleave first chunks then second chunks - iaFirst := encodeSingleMessage(&chunk.Message{CSID: interAudio.CSID, Timestamp: interAudio.Timestamp, MessageLength: interAudio.MessageLength, TypeID: interAudio.TypeID, MessageStreamID: interAudio.MessageStreamID, Payload: interAudio.Payload[:128]}, 128) - ivFirst := encodeSingleMessage(&chunk.Message{CSID: interVideo.CSID, Timestamp: interVideo.Timestamp, MessageLength: interVideo.MessageLength, TypeID: interVideo.TypeID, MessageStreamID: interVideo.MessageStreamID, Payload: interVideo.Payload[:128]}, 128) - // continuation halves (simulate by creating messages whose payload is remaining but same 
headers; encodeSingleMessage will still treat them as new FMT0 so adapt by slicing off headers later) - iaSecondFull := encodeSingleMessage(&chunk.Message{CSID: interAudio.CSID, Timestamp: interAudio.Timestamp, MessageLength: interAudio.MessageLength, TypeID: interAudio.TypeID, MessageStreamID: interAudio.MessageStreamID, Payload: interAudio.Payload[128:]}, 128) - ivSecondFull := encodeSingleMessage(&chunk.Message{CSID: interVideo.CSID, Timestamp: interVideo.Timestamp, MessageLength: interVideo.MessageLength, TypeID: interVideo.TypeID, MessageStreamID: interVideo.MessageStreamID, Payload: interVideo.Payload[128:]}, 128) - // For simplicity we just concatenate: first audio (first chunk only portion), first video, second audio continuation chunk basic header adjusted to FMT=3, second video continuation - // This simplistic approach produces extra FMT0 headers in second parts; the real writer test will refine this once writer implemented. - interleavedBytes := append(append(append(append(iaFirst, ivFirst...), iaSecondFull...), ivSecondFull...), []byte{}...) + + // Build interleaved byte stream manually. + // First chunks use FMT 0 (full header) with MessageLength=256 but only 128 + // bytes of payload in this chunk. encodeSingleMessage with a 256-byte payload + // limited to chunkSize=128 correctly produces FMT0 header + 128 bytes + FMT3 + // continuation. We use the full encoder for each message, then interleave. + audioChunks := encodeSingleMessage(interAudio, 128) + videoChunks := encodeSingleMessage(interVideo, 128) + + // Split each into first chunk and continuation chunk. + // FMT0 basic header (1 byte) + message header (11 bytes) = 12 bytes overhead + 128 payload = 140 bytes for first chunk. + // FMT3 basic header (1 byte) + 128 payload = 129 bytes for continuation chunk. 
+ audioFirstChunk := audioChunks[:140] + audioContChunk := audioChunks[140:] + videoFirstChunk := videoChunks[:140] + videoContChunk := videoChunks[140:] + + // Interleave: audio first → video first → audio cont → video cont + var interleavedBuf bytes.Buffer + interleavedBuf.Write(audioFirstChunk) + interleavedBuf.Write(videoFirstChunk) + interleavedBuf.Write(audioContChunk) + interleavedBuf.Write(videoContChunk) + interleavedBytes := interleavedBuf.Bytes() // Scenario 4: Extended timestamp extPayload := make([]byte, 64) diff --git a/tests/integration/commands_test.go b/tests/integration/commands_test.go index da4e0f8..aa520d5 100644 --- a/tests/integration/commands_test.go +++ b/tests/integration/commands_test.go @@ -1,111 +1,90 @@ // Package integration – end-to-end integration tests for the RTMP server. // -// commands_test.go is a TDD scaffold that specifies the full RTMP command -// exchange sequence: +// commands_test.go validates the full RTMP command exchange sequence through +// a real server and client: // // 1. connect → _result (NetConnection.Connect.Success) -// 2. createStream → _result with stream ID 1 -// 3. publish → onStatus (NetStream.Publish.Start) -// 4. play → onStatus (NetStream.Play.Start) +// 2. createStream → _result with stream ID +// 3. publish → onStatus (NetStream.Publish.Start) +// 4. play → subscriber connected // -// Each sub-flow is a separate subtest so failures are independent. -// At present every subtest deliberately calls t.Fatal because the real -// RPC / command layers are not yet wired up. As those layers land, -// the placeholders will be replaced with actual AMF0 command encoding, -// chunk writing, and response verification over net.Pipe connections. -// -// Key Go patterns demonstrated: -// - net.Pipe() – in-memory connection for protocol exchanges. -// - t.Fatal() – intentional TDD failure to drive implementation. -// - Deferred Close – immediate cleanup of both pipe ends. 
+// The test starts a real server on an ephemeral port, uses the RTMP client +// to perform the full handshake + command sequence, and verifies that +// publish and play complete without errors. package integration import ( - "net" + "fmt" "testing" - // Future imports (will be used once implementations exist): - // "github.com/alxayo/go-rtmp/internal/rtmp/handshake" - // "github.com/alxayo/go-rtmp/internal/rtmp/chunk" - // "github.com/alxayo/go-rtmp/internal/rtmp/rpc" + "time" + + "github.com/alxayo/go-rtmp/internal/rtmp/client" + srv "github.com/alxayo/go-rtmp/internal/rtmp/server" ) -// TestCommandsFlow is the integration test scaffold for T011. -// It defines the end-to-end command workflow expectations: -// 1. connect -> server replies _result (NetConnection.Connect.Success) -// 2. createStream -> server replies _result with stream ID (e.g., 1) -// 3. publish -> server sends onStatus NetStream.Publish.Start -// 4. play -> server sends onStatus NetStream.Play.Start -// -// Implementation Notes (to be satisfied by later tasks T032-T040): -// - Handshake already covered by T009; this test begins AFTER a successful handshake. -// - AMF0 generic encoder/decoder (T032) will provide helpers to build/parse command payloads. -// - Command dispatcher (T040) will route messages based on first AMF0 string in payload. -// - Stream ID allocation expected to start at 1. -// - onStatus messages must include level="status", code matching the scenario, and description. -// -// Current State: -// - No RPC/command implementation yet; this test intentionally fails (TDD) via t.Fatal placeholders. -// - When implementing, replace the placeholders with real client/server harness using chunk.Reader/Writer -// to exchange AMF0 command messages over an in-memory net.Pipe(). -// -// TestCommandsFlow encodes the expected RTMP command sequence as a -// failing specification (TDD). Four subtests map 1:1 to the protocol -// command flow: connect → createStream → publish → play. 
-// -// Each currently fails with a descriptive message naming the blocking -// tasks. When the AMF0, RPC, and dispatcher layers are complete, -// replace the t.Fatal placeholders with real handshake + chunk I/O. +// TestCommandsFlow exercises the full command exchange through a live server. +// It verifies connect → createStream → publish (and play) work end-to-end. func TestCommandsFlow(t *testing.T) { - // Use subtests so individual flows can be debugged independently. - - // 1. connect flow - // Expected message sequence (logical): - // C->S: command(connect, tx=1, obj{app, tcUrl, objectEncoding=0}) - // S->C: command(_result, tx=1, properties{fmsVer,capabilities,mode}, info{code=NetConnection.Connect.Success}) - // Future assertions: verify properties + info fields and AMF0 types. - // Failure driver for now: - serverConn1, clientConn1 := net.Pipe() - _ = serverConn1.Close() - _ = clientConn1.Close() - if true { // placeholder branch; remove once implemented - // NOTE: This forces the test to fail until connect handling is implemented. - // Replace with real harness invoking handshake + command exchange. - // Keep failure message descriptive to guide implementation. - t.Fatal("connect flow not implemented (awaiting AMF0 + RPC layers T026-T040)") + s := srv.New(srv.Config{ + ListenAddr: "127.0.0.1:0", + ChunkSize: 4096, + }) + if err := s.Start(); err != nil { + t.Fatalf("server start: %v", err) } + defer s.Stop() - // 2. createStream flow - // Sequence (after successful connect): - // C->S: command(createStream, tx=4, null) - // S->C: command(_result, tx=4, null, 1.0) // stream ID 1 - // Placeholder failure: - serverConn2, clientConn2 := net.Pipe() - _ = serverConn2.Close() - _ = clientConn2.Close() - if true { - t.Fatal("createStream flow not implemented (awaiting RPC dispatcher & response builder T033-T036)") - } + addr := s.Addr().String() - // 3. publish flow - // Prerequisites: stream ID allocated. 
- // Sequence: - // C->S: command(publish, tx=0, null, "streamKey", "live") on MSID=1 - // S->C: command(onStatus, 0, null, {code: NetStream.Publish.Start}) on MSID=1 - serverConn3, clientConn3 := net.Pipe() - _ = serverConn3.Close() - _ = clientConn3.Close() - if true { - t.Fatal("publish flow not implemented (awaiting publish parser and onStatus builder T037-T039)") - } + t.Run("connect_createStream_publish", func(t *testing.T) { + c, err := client.New(fmt.Sprintf("rtmp://%s/live/test_publish", addr)) + if err != nil { + t.Fatalf("client new: %v", err) + } + defer c.Close() - // 4. play flow - // Sequence: - // C->S: command(play, tx=0, null, "streamKey", -2, -1, true) on MSID=1 - // S->C: command(onStatus, 0, null, {code: NetStream.Play.Start}) on MSID=1 - serverConn4, clientConn4 := net.Pipe() - _ = serverConn4.Close() - _ = clientConn4.Close() - if true { - t.Fatal("play flow not implemented (awaiting play parser and onStatus builder T038-T039)") - } + // Connect performs: TCP dial → handshake → connect command → createStream + if err := c.Connect(); err != nil { + t.Fatalf("connect: %v", err) + } + + // Publish sends the publish command and expects onStatus success + if err := c.Publish(); err != nil { + t.Fatalf("publish: %v", err) + } + + // Verify server registered the stream + time.Sleep(50 * time.Millisecond) + if s.ConnectionCount() < 1 { + t.Fatalf("expected at least 1 connection, got %d", s.ConnectionCount()) + } + }) + + t.Run("connect_createStream_play", func(t *testing.T) { + // First, set up a publisher so play has something to subscribe to + pub, err := client.New(fmt.Sprintf("rtmp://%s/live/test_play", addr)) + if err != nil { + t.Fatalf("publisher new: %v", err) + } + defer pub.Close() + if err := pub.Connect(); err != nil { + t.Fatalf("publisher connect: %v", err) + } + if err := pub.Publish(); err != nil { + t.Fatalf("publisher publish: %v", err) + } + + // Now connect a subscriber + sub, err := 
client.New(fmt.Sprintf("rtmp://%s/live/test_play", addr)) + if err != nil { + t.Fatalf("subscriber new: %v", err) + } + defer sub.Close() + if err := sub.Connect(); err != nil { + t.Fatalf("subscriber connect: %v", err) + } + if err := sub.Play(); err != nil { + t.Fatalf("subscriber play: %v", err) + } + }) }