5 changes: 3 additions & 2 deletions .github/copilot-instructions.md
@@ -43,11 +43,11 @@ ffplay rtmp://localhost:1935/live/test # Subsc
All multi-byte integers are **big-endian** except MSID in chunk headers (little-endian quirk). Use `encoding/binary.BigEndian` consistently and verify against golden vectors in `tests/golden/*.bin`.
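
As a rough illustration of the endianness rule above, the sketch below decodes an 11-byte FMT 0 message header: the 24-bit timestamp and length are big-endian, while the 4-byte MSID is little-endian. The `MessageHeader` type and `parseFmt0` function are hypothetical names chosen for this example and are not taken from the repository.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// MessageHeader holds the fields of an FMT 0 chunk message header.
// Hypothetical type for illustration; not the project's own struct.
type MessageHeader struct {
	Timestamp uint32 // 24-bit, big-endian
	Length    uint32 // 24-bit, big-endian
	TypeID    uint8
	StreamID  uint32 // MSID: 32-bit, little-endian
}

// parseFmt0 decodes the 11 bytes that follow the basic header in an FMT 0 chunk.
func parseFmt0(b []byte) (MessageHeader, error) {
	if len(b) < 11 {
		return MessageHeader{}, errors.New("short FMT 0 message header")
	}
	return MessageHeader{
		Timestamp: uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]),
		Length:    uint32(b[3])<<16 | uint32(b[4])<<8 | uint32(b[5]),
		TypeID:    b[6],
		StreamID:  binary.LittleEndian.Uint32(b[7:11]),
	}, nil
}

func main() {
	raw := []byte{0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x09, 0x01, 0x00, 0x00, 0x00}
	h, err := parseFmt0(raw)
	fmt.Printf("%+v %v\n", h, err)
}
```

The 24-bit fields are assembled by hand because `encoding/binary` has no 3-byte helper; only the MSID uses `binary.LittleEndian`.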

### Concurrency Pattern
Each connection runs **one readLoop goroutine** with context cancellation. Use bounded channels for backpressure:
Each connection runs **one readLoop goroutine** with context cancellation and TCP deadline enforcement (read 90s, write 30s for zombie detection). Use bounded channels for backpressure:
```go
outboundQueue := make(chan *chunk.Message, 100) // Bounded queue
```
Protect shared state (stream registry) with `sync.RWMutex`.
Protect shared state (stream registry) with `sync.RWMutex`. On disconnect, a handler fires to clean up publisher/subscriber registrations and close relay clients.
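
One possible way to get backpressure from a bounded queue like the one above is a non-blocking send that reports when the queue is full, so the caller can decide whether to drop the message or disconnect the slow peer. This is a minimal sketch: `Message` stands in for `*chunk.Message`, and `tryEnqueue` plus the drop-on-full policy are assumptions, not the project's actual behavior.

```go
package main

import "fmt"

// Message is a placeholder for the project's *chunk.Message (assumption).
type Message struct{ Payload []byte }

// tryEnqueue attempts a non-blocking send on the bounded queue.
// It returns false when the queue is full, leaving the policy to the caller.
func tryEnqueue(q chan<- *Message, m *Message) bool {
	select {
	case q <- m:
		return true
	default:
		return false
	}
}

func main() {
	q := make(chan *Message, 2) // small bound to make the full case visible
	for i := 0; i < 3; i++ {
		fmt.Println(tryEnqueue(q, &Message{}))
	}
}
```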

### Error Handling
Use domain-specific error wrappers from `internal/errors`:
@@ -80,6 +80,7 @@ tests := []struct{ name, file string; want interface{} }{
| Control | CSID=2, MSID=0, types 1-6 |
| AMF0 | Object ends with 0x00 0x00 0x09 |
| Media | TypeID 8=audio, 9=video; cache sequence headers for late-join |
| Deadlines | Read 90s, Write 30s; reset on each I/O (zombie detection) |

## Key Files for Onboarding

144 changes: 144 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,144 @@
# Changelog

All notable changes to go-rtmp are documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased] — feature/005-error-handling-benchmarks

### Added
- **Disconnect handlers**: Each connection fires a cleanup callback when the read loop exits, ensuring publisher/subscriber registrations are removed and relay clients are closed ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f))
- **TCP deadline enforcement**: Read deadline (90s) and write deadline (30s) detect zombie connections and prevent resource leaks ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f))
- **Lifecycle hook events**: `EventConnectionClose`, `EventPublishStop`, `EventPlayStop`, and `EventSubscriberCount` fire on disconnect with session metadata (duration, packet counts, codecs) ([`2ed5fd2`](https://github.com/alxayo/rtmp-go/commit/2ed5fd2))
- **Performance benchmarks**: Chunk header parsing, AMF0 number/string/object encode/decode, and strict array benchmarks ([`34058ee`](https://github.com/alxayo/rtmp-go/commit/34058ee))
- **Registry tests**: Codec caching, subscriber removal, `BroadcastMessage` relay, and sequence header caching tests ([`fc4d3c7`](https://github.com/alxayo/rtmp-go/commit/fc4d3c7))
- **Spec 005**: Error handling, connection cleanup & performance benchmarks specification ([`6274f77`](https://github.com/alxayo/rtmp-go/commit/6274f77))

### Fixed
- **Relay client leak**: Relay client connections are now properly closed when publisher disconnects ([`69365fe`](https://github.com/alxayo/rtmp-go/commit/69365fe))
- **Server shutdown deadlock**: Server no longer hangs during shutdown while connections are still active; if they do not close before the shutdown timeout, the process now force-exits with status 1 ([`92415d0`](https://github.com/alxayo/rtmp-go/commit/92415d0), [`69365fe`](https://github.com/alxayo/rtmp-go/commit/69365fe))

### Changed
- **Simplified `attachCommandHandling`**: Replaced variadic `srv ...*Server` parameter with direct `*Server`, removing 7 redundant nil-checks ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9))

### Removed
- **Dead `Session` type**: Unused `Session` and `SessionState` types removed from `conn` package ([`524281f`](https://github.com/alxayo/rtmp-go/commit/524281f))
- **Dead `RunCLI` function**: Speculative future code removed from `client` package ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9))
- **Dead `Marshal`/`Unmarshal` wrappers**: Test-only exported functions removed from `amf` package ([`919e2a9`](https://github.com/alxayo/rtmp-go/commit/919e2a9))

---

## [v0.1.1] — 2026-03-03

### Added
- **Token-based authentication** ([PR #4](https://github.com/alxayo/rtmp-go/pull/4)): Pluggable `auth.Validator` interface with four backends:
  - `TokenValidator`: In-memory map of streamKey → token pairs (CLI flag `-auth-token`)
  - `FileValidator`: JSON token file with live reload via SIGHUP (`-auth-file`)
  - `CallbackValidator`: External HTTP webhook for auth decisions (`-auth-callback`)
  - `AllowAllValidator`: Default mode, accepts all requests (`-auth-mode=none`)
- Authentication CLI flags: `-auth-mode`, `-auth-token`, `-auth-file`, `-auth-callback`, `-auth-callback-timeout` ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a))
- URL query parameter parsing for stream names: clients pass tokens via `streamName?token=secret` ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a))
- `EventAuthFailed` hook event when authentication is rejected ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a))
- Auth spec document in `specs/004-token-auth/` ([`7c1fa0f`](https://github.com/alxayo/rtmp-go/commit/7c1fa0f))
- Definition of Done checklist (`docs/definition-of-done.md`) and post-feature review prompt ([`6b3e096`](https://github.com/alxayo/rtmp-go/commit/6b3e096))
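
To make the pluggable-validator idea concrete, here is a minimal sketch of what such an interface and two backends could look like. The `Validate(streamKey, token string) error` signature, the `staticTokens` and `allowAll` types, and `ErrUnauthorized` are all hypothetical; the real `auth.Validator` signature is not shown in this changelog.

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
)

// Validator is a hypothetical shape for a pluggable auth backend.
type Validator interface {
	Validate(streamKey, token string) error
}

// ErrUnauthorized is an illustrative sentinel error (assumption).
var ErrUnauthorized = errors.New("unauthorized")

// staticTokens maps a stream key to its expected token, held in memory.
type staticTokens map[string]string

// Validate compares tokens in constant time to avoid leaking match length via timing.
func (s staticTokens) Validate(streamKey, token string) error {
	want, ok := s[streamKey]
	if !ok || subtle.ConstantTimeCompare([]byte(want), []byte(token)) != 1 {
		return ErrUnauthorized
	}
	return nil
}

// allowAll accepts every request, mirroring an auth mode of "none".
type allowAll struct{}

func (allowAll) Validate(string, string) error { return nil }

func main() {
	var v Validator = staticTokens{"live/stream": "secret"}
	fmt.Println(v.Validate("live/stream", "secret"))
	fmt.Println(v.Validate("live/stream", "wrong"))
}
```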

### Changed
- Query parameters are stripped from stream keys before registry operations (e.g., `live/stream?token=x` → `live/stream`) ([`f32a74a`](https://github.com/alxayo/rtmp-go/commit/f32a74a))
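
The stripping step can be sketched with the standard library alone. `splitStreamKey` is a hypothetical helper name; it separates the registry key from the query parameters so that `live/stream?token=x` yields the key `live/stream` and the token `x`.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitStreamKey separates a raw stream name into the registry key and its
// query parameters. Hypothetical helper, not the project's function.
func splitStreamKey(raw string) (string, url.Values, error) {
	key, query, found := strings.Cut(raw, "?")
	if !found {
		return key, url.Values{}, nil
	}
	vals, err := url.ParseQuery(query)
	if err != nil {
		return "", nil, err
	}
	return key, vals, nil
}

func main() {
	key, vals, err := splitStreamKey("live/stream?token=x")
	fmt.Println(key, vals.Get("token"), err)
}
```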

### Fixed
- Escaped quotes in Markdown code blocks across documentation ([`bef626b`](https://github.com/alxayo/rtmp-go/commit/bef626b))
- Broken link in copilot-instructions.md ([`f92d34d`](https://github.com/alxayo/rtmp-go/commit/f92d34d))

---

## [v0.1.0] — 2025-10-18

First feature-complete release of the RTMP server. Supports end-to-end streaming from OBS/FFmpeg to subscribers with recording and relay capabilities.

### Added

#### Core RTMP Protocol
- **RTMP v3 handshake**: C0/C1/C2 ↔ S0/S1/S2 exchange with 5-second timeouts and domain-specific error types
- **Chunk streaming**: FMT 0–3 header compression, extended timestamps (≥0xFFFFFF), chunk size negotiation
- **Control messages**: Set Chunk Size, Window Acknowledgement Size, Set Peer Bandwidth, User Control (types 1–6)
- **AMF0 codec**: Number, Boolean, String, Null, Object, and Strict Array encode/decode with golden binary vector tests
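
For readers unfamiliar with the AMF0 wire format, the sketch below encodes two of the listed types: a Number (marker `0x00` followed by an 8-byte big-endian IEEE 754 double) and a String (marker `0x02`, a 2-byte big-endian length, then the UTF-8 bytes). The function names are illustrative and are not the project's `amf` package API.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// encodeNumber writes an AMF0 Number: marker 0x00 plus a big-endian float64.
func encodeNumber(v float64) []byte {
	out := make([]byte, 9)
	out[0] = 0x00
	binary.BigEndian.PutUint64(out[1:], math.Float64bits(v))
	return out
}

// encodeString writes an AMF0 short String: marker 0x02, 16-bit length, bytes.
func encodeString(s string) ([]byte, error) {
	if len(s) > 0xFFFF {
		return nil, errors.New("string too long for AMF0 short string")
	}
	out := make([]byte, 3, 3+len(s))
	out[0] = 0x02
	binary.BigEndian.PutUint16(out[1:3], uint16(len(s)))
	return append(out, s...), nil
}

func main() {
	fmt.Printf("% x\n", encodeNumber(1))
	s, _ := encodeString("connect")
	fmt.Printf("% x\n", s)
}
```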

#### Command Flow
- **Command dispatcher**: Routes `connect`, `createStream`, `publish`, and `play` commands
- **Connect**: Parses application name, responds with `_result` (NetConnection.Connect.Success)
- **CreateStream**: Allocates stream IDs, responds with `_result`
- **Publish/Play**: Registers publishers and subscribers in stream registry with `onStatus` responses

#### Media & Recording
- **Live relay**: Transparent forwarding from publishers to all subscribers
- **Sequence header caching**: H.264 SPS/PPS and AAC AudioSpecificConfig cached for late-joining subscribers
- **Codec detection**: Identifies audio (AAC, MP3, Speex) and video (H.264, H.265) from first media packets
- **FLV recording**: Automatic recording of all streams to FLV files (`-record-all`, `-record-dir` flags)
- **Media logging**: Per-connection bitrate stats and codec identification
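
Sequence header caching can be illustrated as follows, assuming the classic FLV tag layout: for video, the low nibble of the first byte is the codec ID (7 for AVC/H.264) and the second byte is the AVC packet type (0 for a sequence header); for audio, the high nibble of the first byte is the sound format (10 for AAC) and the second byte is the AAC packet type (0 for a sequence header). The `headerCache` type and its methods are hypothetical, and H.265 is not covered by this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// isAVCSequenceHeader reports whether a video payload carries H.264 SPS/PPS.
func isAVCSequenceHeader(p []byte) bool {
	return len(p) >= 2 && p[0]&0x0F == 7 && p[1] == 0
}

// isAACSequenceHeader reports whether an audio payload carries AudioSpecificConfig.
func isAACSequenceHeader(p []byte) bool {
	return len(p) >= 2 && p[0]>>4 == 10 && p[1] == 0
}

// headerCache keeps the latest sequence headers for late-joining subscribers.
// Hypothetical type for illustration.
type headerCache struct {
	mu    sync.RWMutex
	video []byte
	audio []byte
}

// Observe stores a copy of the payload when it is a sequence header.
func (c *headerCache) Observe(typeID uint8, payload []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch {
	case typeID == 9 && isAVCSequenceHeader(payload):
		c.video = append([]byte(nil), payload...)
	case typeID == 8 && isAACSequenceHeader(payload):
		c.audio = append([]byte(nil), payload...)
	}
}

// Snapshot returns the headers a new subscriber should receive before live media.
func (c *headerCache) Snapshot() (video, audio []byte) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.video, c.audio
}

func main() {
	c := &headerCache{}
	c.Observe(9, []byte{0x17, 0x00, 0x01})
	c.Observe(8, []byte{0xAF, 0x00, 0x12})
	v, a := c.Snapshot()
	fmt.Println(len(v), len(a))
}
```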

#### Multi-Destination Relay
- **Relay forwarding**: Forward publisher streams to external RTMP servers (`-relay-to` flag)
- **Destination manager**: Connect, monitor, and send media to multiple downstream targets
- **Metrics tracking**: Per-destination message counts, bytes sent, and error tracking

#### Event Hooks
- **Webhook hook**: HTTP POST with JSON event payload to configured URLs
- **Shell hook**: Execute scripts with event data as environment variables
- **Stdio hook**: Print structured event data to stderr (JSON or env-var format)
- **Hook manager**: Bounded concurrency pool (default 10 workers) with configurable timeout
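
A bounded concurrency pool of this kind is commonly built from a buffered channel used as a semaphore. The sketch below shows that pattern under the assumption that each hook is a plain `func()`; `runBounded` is a hypothetical name and returns the peak concurrency it observed so the bound can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded runs every task but lets at most limit of them execute at once.
// It returns the highest number of tasks seen running simultaneously.
func runBounded(limit int, tasks []func()) int32 {
	sem := make(chan struct{}, limit) // each slot is one permitted worker
	var wg sync.WaitGroup
	var running, peak int32

	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks while limit tasks are running
		go func(run func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			run()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {}
	}
	fmt.Println(runBounded(10, tasks) <= 10)
}
```

Acquiring the slot before starting the goroutine means the dispatcher itself blocks when the pool is saturated; a real hook manager might prefer to queue or drop events instead.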

#### Server Infrastructure
- **TCP listener**: Accept loop with graceful shutdown support
- **Connection lifecycle**: Handshake → control burst → command exchange → media streaming
- **Stream registry**: Thread-safe map of stream keys to publisher/subscriber lists
- **Structured logging**: `log/slog` with configurable levels (debug/info/warn/error)
- **Buffer pool**: Memory-efficient buffer reuse (`internal/bufpool`)
- **Domain errors**: Typed error wrappers (`HandshakeError`, `ChunkError`, `AMFError`, `ProtocolError`, `TimeoutError`)
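
Typed error wrappers such as these usually pair with `errors.As` and `errors.Is` from the standard library. The sketch below assumes a `HandshakeError` with `Op` and `Err` fields; the field names, `sampleError`, and `classify` are illustrative and may differ from `internal/errors`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// HandshakeError wraps a lower-level error with the failing handshake step.
// Field names are assumptions made for this example.
type HandshakeError struct {
	Op  string
	Err error
}

func (e *HandshakeError) Error() string { return fmt.Sprintf("handshake %s: %v", e.Op, e.Err) }

// Unwrap exposes the cause so errors.Is and errors.As can walk the chain.
func (e *HandshakeError) Unwrap() error { return e.Err }

// sampleError builds a wrapped error the way a connection handler might.
func sampleError() error {
	return fmt.Errorf("conn 42: %w", &HandshakeError{Op: "read C1", Err: io.ErrUnexpectedEOF})
}

// classify maps an error to a protocol layer name for logging.
func classify(err error) string {
	var hs *HandshakeError
	if errors.As(err, &hs) {
		return "handshake"
	}
	return "other"
}

func main() {
	err := sampleError()
	fmt.Println(classify(err), errors.Is(err, io.ErrUnexpectedEOF))
}
```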

#### Testing & Tooling
- **Golden binary vectors**: Exact wire-format `.bin` files for handshake, chunk headers, AMF0, and control messages
- **Integration tests**: Full publish → subscribe round-trip tests
- **RTMP test client**: Minimal client for driving integration tests (`internal/rtmp/client`)
- **CI workflow**: Automated testing with `go build`, `go vet`, `gofmt`, and `go test`
- **Stream analysis tools**: H.264 frame analyzer, RTMP stream extractor, HLS converter

#### CLI
- `-listen` — TCP address (default `:1935`)
- `-log-level` — debug/info/warn/error (default `info`)
- `-record-all` — Enable automatic FLV recording
- `-record-dir` — Recording directory (default `recordings`)
- `-chunk-size` — Outbound chunk size, 1–65536 (default 4096)
- `-relay-to` — RTMP relay destination URL (repeatable)
- `-hook-script` — Shell hook: `event_type=/path/to/script` (repeatable)
- `-hook-webhook` — Webhook: `event_type=https://url` (repeatable)
- `-hook-stdio-format` — Stdio output format: `json` or `env`
- `-hook-timeout` — Hook execution timeout (default 30s)
- `-hook-concurrency` — Max concurrent hook executions (default 10)
- `-version` — Print version and exit

---

## Pull Requests

| PR | Title | Branch | Status |
|----|-------|--------|--------|
| [#4](https://github.com/alxayo/rtmp-go/pull/4) | Token-based authentication | `feature/004-token-auth` | Merged |
| [#3](https://github.com/alxayo/rtmp-go/pull/3) | Set initial semantic version to v0.1.0 | `copilot/determine-semantic-version` | Merged |
| [#2](https://github.com/alxayo/rtmp-go/pull/2) | Fix server connection tracking tests | `copilot/fix-github-actions-workflow-again` | Merged |
| [#1](https://github.com/alxayo/rtmp-go/pull/1) | Fix gofmt formatting violations failing CI | `copilot/fix-github-actions-workflow` | Merged |

---

## Feature Branches

| Branch | Spec | Description |
|--------|------|-------------|
| `feature/005-error-handling-benchmarks` | [specs/005](specs/005-error-handling-benchmarks/spec.md) | Error handling, connection cleanup, TCP deadlines, performance benchmarks |
| `feature/004-token-auth` | [specs/004](specs/004-token-auth/spec.md) | Token-based stream key authentication with 4 validator backends |
| `003-multi-destination-relay` | [specs/003](specs/003-multi-destination-relay/) | Multi-destination relay to external RTMP servers |
| `T001-init-go-module` | [specs/001](specs/001-rtmp-server-implementation/spec.md) | Core RTMP server implementation (handshake through media streaming) |

[Unreleased]: https://github.com/alxayo/rtmp-go/compare/v0.1.1...feature/005-error-handling-benchmarks
[v0.1.1]: https://github.com/alxayo/rtmp-go/compare/v0.1.0...v0.1.1
[v0.1.0]: https://github.com/alxayo/rtmp-go/releases/tag/v0.1.0
11 changes: 9 additions & 2 deletions README.md
@@ -43,6 +43,7 @@ See [docs/getting-started.md](docs/getting-started.md) for the full guide with C
| **Media Logging** | Per-connection codec detection and bitrate stats |
| **Event Hooks** | Webhooks, shell scripts, and stdio notifications on RTMP events |
| **Authentication** | Pluggable token-based validation for publish/play (static tokens, file, webhook) |
| **Connection Cleanup** | TCP deadline enforcement (read 90s, write 30s), disconnect handlers, zombie detection |

## Architecture

@@ -126,9 +127,11 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib

## Roadmap

### Recently Completed
- Enhanced error handling: disconnect handlers, TCP deadline enforcement (read 90s, write 30s), relay client cleanup
- Performance benchmarks for chunk parsing, AMF0 encoding, and array operations

### In Progress
- Enhanced error handling and graceful connection cleanup
- Performance benchmarks for chunk and AMF0 encode/decode
- Fuzz testing for AMF0 and chunk parsing (bounds safety)

### Planned
@@ -139,6 +142,10 @@ Integration tests in `tests/integration/` exercise the full publish → subscrib
- **Transcoding** — server-side codec conversion (e.g. H.265 → H.264)
- **Clustering** — horizontal scaling across multiple server instances

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for a detailed history of all releases and changes.

## License

See [LICENSE](LICENSE) file.
1 change: 1 addition & 0 deletions cmd/rtmp-server/main.go
@@ -103,6 +103,7 @@ func main() {
		log.Info("server stopped cleanly")
	case <-shutdownCtx.Done():
		log.Error("forced exit after timeout")
		os.Exit(1)
	}
}

4 changes: 4 additions & 0 deletions docs/architecture.md
@@ -85,6 +85,10 @@ Immediately after handshake, the server sends three control messages:
- **Window Acknowledgement Size** (2,500,000 bytes): Flow control threshold.
- **Set Peer Bandwidth** (2,500,000 bytes): Output rate limit.

TCP deadlines are enforced on the underlying connection:
- **Read deadline**: 90 seconds — detects frozen publishers and stuck subscribers
- **Write deadline**: 30 seconds — prevents slow-loris attacks and half-open connections

### 4. Command Exchange (`internal/rtmp/rpc`)
The client and server exchange AMF0-encoded command messages:

27 changes: 26 additions & 1 deletion docs/design.md
@@ -54,7 +54,16 @@ The first audio and video messages from a publisher typically contain "sequence

### Event Hooks

The server includes an event hook system that notifies external systems when important events occur (connection accept/close, publish start, play start, codec detected). Three hook types are supported:
The server includes an event hook system that notifies external systems when lifecycle events occur. Available events:

- **connection_accept** / **connection_close**: Client connects or disconnects
- **publish_start** / **publish_stop**: Publisher begins or stops streaming
- **play_start** / **play_stop**: Subscriber starts or stops playback
- **codec_detected**: Audio/video codec identified from first media packet
- **subscriber_count**: Updated subscriber count when subscribers join or leave
- **auth_failed**: Authentication rejected for publish or play

Three hook types are supported:

- **Webhook**: HTTP POST with JSON event payload to a URL
- **Shell**: Execute a script with event data as environment variables
@@ -85,6 +94,22 @@ The default mode is `none` (accept all requests), preserving backward compatibil
| Media logger counters | `sync.RWMutex` | Updated by read loop, read by stats ticker |
| Hook execution pool | Buffered channel (semaphore) | Limits concurrent hook goroutines |

### TCP Deadline Enforcement

Each connection enforces TCP read/write deadlines to detect zombie connections:
- **Read deadline**: 90 seconds — closes connections from frozen or stalled publishers
- **Write deadline**: 30 seconds — drops connections to unresponsive subscribers

Deadlines are reset on each successful I/O operation, so normal streaming is unaffected. This prevents resource leaks (file descriptors, goroutines) from clients that hang without properly closing sockets.
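
One way to get this behavior is a thin `net.Conn` wrapper that pushes the deadline forward around every I/O call. The sketch below refreshes the deadline just before each `Read` and `Write`, which is an assumption about placement rather than the project's actual code; `deadlineConn` and `silentPeerTimesOut` are hypothetical names, and a short timeout replaces the 90s/30s values so the demo finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// deadlineConn refreshes the read or write deadline before every I/O call,
// so an active stream never expires while a silent peer eventually does.
type deadlineConn struct {
	net.Conn
	readTimeout  time.Duration
	writeTimeout time.Duration
}

func (c *deadlineConn) Read(p []byte) (int, error) {
	if err := c.Conn.SetReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
		return 0, err
	}
	return c.Conn.Read(p)
}

func (c *deadlineConn) Write(p []byte) (int, error) {
	if err := c.Conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)); err != nil {
		return 0, err
	}
	return c.Conn.Write(p)
}

// silentPeerTimesOut reports whether reading from a peer that never sends
// anything fails with a deadline error.
func silentPeerTimesOut() bool {
	server, client := net.Pipe() // in-memory connection pair that supports deadlines
	defer server.Close()
	defer client.Close()

	conn := &deadlineConn{Conn: server, readTimeout: 50 * time.Millisecond, writeTimeout: 50 * time.Millisecond}
	_, err := conn.Read(make([]byte, 1))
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("zombie detected:", silentPeerTimesOut())
}
```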

### Graceful Shutdown

On shutdown signal (SIGINT/SIGTERM):
1. Server stops accepting new connections
2. Existing connections receive context cancellation
3. Relay client connections are closed to prevent dangling forwarding
4. If connections don't close within the timeout, the process exits forcefully
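
The shutdown steps above can be sketched as cancel, wait, then give up after a timeout. In this example `shutdown` and `demo` are hypothetical helpers; a `sync.WaitGroup` stands in for the server's connection tracking, and the caller would decide whether to force-exit when `shutdown` returns false.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// shutdown cancels all connections and waits up to timeout for them to finish.
// It returns false when the timeout elapsed first.
func shutdown(cancel context.CancelFunc, wg *sync.WaitGroup, timeout time.Duration) bool {
	cancel()
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// demo runs one well-behaved connection and one stuck connection.
func demo() (clean, stuck bool) {
	// Well-behaved: exits as soon as its context is cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
	}()
	clean = shutdown(cancel, &wg, time.Second)

	// Stuck: ignores cancellation until explicitly released.
	_, cancel2 := context.WithCancel(context.Background())
	release := make(chan struct{})
	var wg2 sync.WaitGroup
	wg2.Add(1)
	go func() {
		defer wg2.Done()
		<-release
	}()
	stuck = shutdown(cancel2, &wg2, 50*time.Millisecond)
	close(release) // let the stuck goroutine finish so the demo leaks nothing
	return clean, stuck
}

func main() {
	clean, stuck := demo()
	fmt.Println("clean shutdown:", clean, "stuck shutdown finished in time:", stuck)
}
```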

## Error Handling

Errors are classified by protocol layer using typed error wrappers:
9 changes: 9 additions & 0 deletions docs/getting-started.md
@@ -195,6 +195,15 @@ go test ./tests/integration/
| "stream not found" in play | Wrong stream key | Ensure publisher and subscriber use the same `app/streamName` |
| High CPU usage | Debug logging | Use `-log-level info` instead of `debug` |
| Recording file empty | Publisher disconnected before keyframe | Stream for at least a few seconds |
| Connection dropped after ~90s | TCP read deadline | The server closes idle connections after 90 seconds of silence — ensure the publisher is actively streaming |

## Connection Management

The server automatically manages zombie connections:
- **Read deadline (90s)**: If no data arrives from a client within 90 seconds, the connection is closed
- **Write deadline (30s)**: If the server cannot write to a client within 30 seconds, the connection is dropped

These deadlines reset on each successful I/O operation, so active streams are unaffected.

## Next Steps

8 changes: 5 additions & 3 deletions docs/implementation.md
@@ -178,10 +178,12 @@ Each tag header contains: TypeID (8=audio, 9=video), data size (24-bit), timesta

## Event Hooks

The hook system (`internal/rtmp/server/hooks/`) notifies external systems when RTMP events occur. It integrates at two points:
The hook system (`internal/rtmp/server/hooks/`) notifies external systems when RTMP events occur. It integrates at multiple points:

1. **Server accept loop** (`server.go`): Triggers `connection_accept` and `connection_close` events
2. **Command handlers** (`command_integration.go`): Triggers `publish_start` and `play_start` events
1. **Server accept loop** (`server.go`): Triggers `connection_accept` on new connections
2. **Disconnect handlers** (`command_integration.go`): Triggers `connection_close`, `publish_stop`, `play_stop`, and `subscriber_count` on disconnect
3. **Command handlers** (`command_integration.go`): Triggers `publish_start`, `play_start`, `subscriber_count`, and `auth_failed`
4. **Media dispatch** (`media_dispatch.go`): Triggers `codec_detected` on first media packet

Each hook runs asynchronously in a bounded goroutine pool (default 10 workers). The `HookManager` maps event types to registered hooks and dispatches via `TriggerEvent()`.

8 changes: 0 additions & 8 deletions internal/rtmp/amf/amf.go
@@ -92,14 +92,6 @@ func DecodeAll(data []byte) ([]interface{}, error) {
return out, nil
}

// Marshal encodes a single AMF0 value and returns the bytes.
func Marshal(v interface{}) ([]byte, error) { return EncodeAll(v) }

// Unmarshal decodes a single AMF0 value from data.
func Unmarshal(data []byte) (interface{}, error) {
	return DecodeValue(bytes.NewReader(data))
}

// unsupportedMarker returns true if the marker is explicitly listed by task
// spec to be rejected (Undefined, Reference, AMF3+ reserved range).
func unsupportedMarker(m byte) bool {