…n type

Add connection lifecycle cleanup mechanism and zombie connection detection:
- Add onDisconnect callback field and SetDisconnectHandler() method
- Modify startReadLoop() with defer chain: cancel context then invoke disconnect handler, ensuring cleanup runs on any exit (EOF, error, cancel)
- Add read deadline (90s) before each ReadMessage() to detect dead peers
- Add write deadline (30s) before each WriteMessage() in writeLoop
- Handle net.Error timeout in readLoop for clean zombie reaping
- Remove dead session *Session field (never initialized in production)
- Delete session.go and session_test.go (entirely unused dead code)

The disconnect handler fires exactly once when readLoop exits, regardless of exit reason. context.CancelFunc is idempotent so calling cancel() in both the defer chain and Close() is safe. wg.Done() runs last (outermost defer) so Close() -> wg.Wait() still works correctly.

Resolves issues #5 (no disconnect cascade), #7 (no TCP deadlines), #10 (dead Session code) from spec 005.
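The defer ordering described in this commit can be sketched as follows. This is a minimal illustration, not the PR's actual code: the `conn` type, `newConn`, and `runOnce` are hypothetical stand-ins, and the read loop reads from an in-memory reader instead of a socket. It relies only on the fact that Go runs deferred calls in last-in, first-out order.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
)

// conn is a hypothetical stand-in for the PR's Connection type.
type conn struct {
	ctx          context.Context
	cancel       context.CancelFunc
	wg           sync.WaitGroup
	onDisconnect func() // may be nil
	src          io.Reader
}

func newConn(src io.Reader) *conn {
	ctx, cancel := context.WithCancel(context.Background())
	return &conn{ctx: ctx, cancel: cancel, src: src}
}

// startReadLoop registers its defers so that, on any exit path, cancel()
// runs first, the disconnect handler second, and wg.Done() last.
func (c *conn) startReadLoop() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done() // registered first, so it runs last
		defer func() {
			if c.onDisconnect != nil { // nil-safe
				c.onDisconnect()
			}
		}()
		defer c.cancel() // registered last, so it runs first

		buf := make([]byte, 4)
		for {
			select {
			case <-c.ctx.Done():
				return // cancelled by Close()
			default:
			}
			if _, err := c.src.Read(buf); err != nil {
				return // EOF or read error
			}
		}
	}()
}

// Close cancels the context and waits for the read loop to finish.
// Calling cancel() here and again in the defer chain is safe.
func (c *conn) Close() {
	c.cancel()
	c.wg.Wait()
}

// runOnce starts a read loop, closes the connection, and reports how many
// times the disconnect handler fired.
func runOnce() int {
	c := newConn(strings.NewReader("payload"))
	fired := 0
	c.onDisconnect = func() { fired++ }
	c.startReadLoop()
	c.Close() // wg.Wait() returns only after the handler has run
	return fired
}

func main() {
	fmt.Println("handler calls:", runOnce())
}
```

Because `wg.Done()` is the outermost defer, `Close()` cannot return before the handler has completed, whichever exit path the loop takes.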
…dlock

Wire the disconnect handler in attachCommandHandling to perform full cleanup when a connection's readLoop exits:
- Stop MediaLogger (prevents goroutine + ticker leak per connection)
- Close FLV recorder under stream.mu lock (safe with cleanupAllRecorders)
- Call PublisherDisconnected() to clear publisher reference (fixes stream key lockout where stale publishers block reuse via ErrPublisherExists)
- Call SubscriberDisconnected() to remove dead subscribers
- Call RemoveConnection() to remove from s.conns (fixes memory leak where connections accumulated forever, only cleared on server shutdown)
- Fire publish_stop/play_stop/connection_close hook events

Add role field to commandState (set to 'publisher'/'subscriber' by OnPublish/OnPlay handlers) to determine cleanup path. Add RemoveConnection(id) method to Server for single-connection removal.

Fix deadlock in Stop(): previously held s.mu.Lock() while calling c.Close() -> wg.Wait(), but the disconnect handler's RemoveConnection() also acquires s.mu.Lock(). Fix: collect connections, clear map, release lock, then close connections outside the lock.

Resolves issues #1 (conn memory leak), #2 (stale publishers), #3 (stale subscribers), #4 (MediaLogger leak), #6 (unclosed recorders) from spec 005.
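The Stop() fix (collect, clear, unlock, then close) might look roughly like the sketch below. The `server` and `conn` types are simplified, hypothetical stand-ins: here `Close()` invokes the disconnect handler synchronously, whereas in the PR it waits for the read loop that runs the handler. The locking hazard is the same in both cases.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical connection whose Close triggers its disconnect handler.
type conn struct {
	id           string
	once         sync.Once
	onDisconnect func()
}

func (c *conn) Close() {
	c.once.Do(func() {
		if c.onDisconnect != nil {
			c.onDisconnect()
		}
	})
}

// server is a hypothetical stand-in for the PR's Server type.
type server struct {
	mu    sync.Mutex
	conns map[string]*conn
}

// RemoveConnection takes s.mu, so it must never be reached while Stop
// still holds the same lock.
func (s *server) RemoveConnection(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.conns, id)
}

// Stop snapshots the connections and clears the map under the lock, then
// closes each connection after the lock has been released.
func (s *server) Stop() int {
	s.mu.Lock()
	toClose := make([]*conn, 0, len(s.conns))
	for _, c := range s.conns {
		toClose = append(toClose, c)
	}
	s.conns = make(map[string]*conn)
	s.mu.Unlock()

	for _, c := range toClose {
		c.Close() // handler may call RemoveConnection without deadlocking
	}
	return len(toClose)
}

// stopDemo registers three connections, stops the server, and reports how
// many were closed and how many remain tracked.
func stopDemo() (closed, remaining int) {
	s := &server{conns: make(map[string]*conn)}
	for _, id := range []string{"a", "b", "c"} {
		id := id
		s.conns[id] = &conn{id: id, onDisconnect: func() { s.RemoveConnection(id) }}
	}
	closed = s.Stop()
	s.mu.Lock()
	remaining = len(s.conns)
	s.mu.Unlock()
	return closed, remaining
}

func main() {
	closed, remaining := stopDemo()
	fmt.Println("closed:", closed, "remaining:", remaining)
}
```

Had `Stop()` called `c.Close()` while still holding `s.mu`, the handler's `RemoveConnection` would block forever, since `sync.Mutex` is not reentrant.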
…meout

Fix two resource leaks in Destination.Connect():
- Close client on Connect() failure (factory may allocate TCP resources)
- Close client on Publish() failure (connection established but unusable)

Previously both error paths set status to error and returned without closing the client, leaking TCP connections to relay destinations.

Fix main.go shutdown: add os.Exit(1) after shutdown timeout. Previously the timeout handler only logged an error and fell through to end of main(), potentially leaving the process in an undefined state.

Resolves issues #8 (relay client leak) and #9 (no forced exit) from spec 005.
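A minimal sketch of the two error paths in Destination.Connect(), assuming a client interface with Connect, Publish, and Close methods. The `rtmpClient` interface, the `fakeClient` type, and the status strings are hypothetical and exist only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
)

// rtmpClient is a hypothetical interface for the relay's outbound client.
type rtmpClient interface {
	Connect() error
	Publish() error
	Close() error
}

// fakeClient records whether Close was called.
type fakeClient struct {
	connectErr error
	publishErr error
	closed     bool
}

func (f *fakeClient) Connect() error { return f.connectErr }
func (f *fakeClient) Publish() error { return f.publishErr }
func (f *fakeClient) Close() error   { f.closed = true; return nil }

// destination is a hypothetical stand-in for the relay Destination type.
type destination struct {
	newClient func() rtmpClient
	client    rtmpClient
	status    string
}

// Connect closes the client on both failure paths so that resources the
// factory or the handshake allocated are released.
func (d *destination) Connect() error {
	c := d.newClient()
	if err := c.Connect(); err != nil {
		_ = c.Close() // factory may already hold TCP resources
		d.status = "error"
		return fmt.Errorf("connect: %w", err)
	}
	if err := c.Publish(); err != nil {
		_ = c.Close() // connection is established but unusable
		d.status = "error"
		return fmt.Errorf("publish: %w", err)
	}
	d.client = c
	d.status = "connected"
	return nil
}

// tryConnect runs Connect against a fake client and reports whether the
// call failed and whether the client was closed afterwards.
func tryConnect(fc *fakeClient) (failed, closed bool) {
	d := &destination{newClient: func() rtmpClient { return fc }}
	err := d.Connect()
	return err != nil, fc.closed
}

func connectFailureDemo() (bool, bool) {
	return tryConnect(&fakeClient{connectErr: errors.New("dial refused")})
}

func publishFailureDemo() (bool, bool) {
	return tryConnect(&fakeClient{publishErr: errors.New("publish rejected")})
}

func successDemo() (bool, bool) {
	return tryConnect(&fakeClient{})
}

func main() {
	f, c := connectFailureDemo()
	fmt.Println("connect failure: failed =", f, "closed =", c)
	f, c = publishFailureDemo()
	fmt.Println("publish failure: failed =", f, "closed =", c)
	f, c = successDemo()
	fmt.Println("success:         failed =", f, "closed =", c)
}
```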
…nchmarks

Add 3 disconnect handler unit tests (conn_test.go):
- TestDisconnectHandler_FiresOnEOF: handler fires when client disconnects
- TestDisconnectHandler_FiresOnContextCancel: handler fires on Close()
- TestDisconnectHandler_NilSafe: no panic when handler is nil

Add 9 chunk benchmarks covering the hottest code path in the server:
- ParseChunkHeader: FMT0 (full), FMT1 (delta), FMT3 (minimal)
- Reader.ReadMessage: single-chunk (100B) and multi-chunk (4096B/128B)
- EncodeChunkHeader: FMT0 serialization
- Writer.WriteMessage: single-chunk and multi-chunk
- Writer+Reader round-trip: end-to-end write-then-read cycle

Add 6 AMF benchmarks for complex types (8 primitive benchmarks existed):
- EncodeObject/DecodeObject: realistic RTMP connect-style object
- EncodeStrictArray/DecodeStrictArray: mixed-type array
- EncodeAll/DecodeAll: full connect command (string + number + object)

All benchmarks use b.ReportAllocs() for allocation tracking. Total new benchmark count: 15 (9 chunk + 6 AMF), bringing totals to 9 chunk + 14 AMF.
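To illustrate the benchmark shape (not the PR's actual benchmarks), here is a sketch that measures a toy FMT0 header parser with `b.ReportAllocs()`. The `parseFMT0` function and `chunkHeader` type are hypothetical and handle only one-byte basic headers (chunk stream IDs 2 to 63). The program drives the benchmark through `testing.Benchmark` so it runs outside `go test`; in a `_test.go` file the function would instead be named `BenchmarkXxx`.

```go
package main

import (
	"errors"
	"fmt"
	"testing"
)

// chunkHeader is a hypothetical, simplified view of an RTMP chunk header.
type chunkHeader struct {
	Format    uint8
	CSID      uint32
	Timestamp uint32
	Length    uint32
	TypeID    uint8
	StreamID  uint32
}

// parseFMT0 decodes a one-byte basic header followed by the 11-byte type 0
// message header: 3-byte timestamp, 3-byte length, 1-byte type ID (all
// big-endian) and a 4-byte little-endian message stream ID.
func parseFMT0(b []byte) (chunkHeader, error) {
	if len(b) < 12 {
		return chunkHeader{}, errors.New("short header")
	}
	if b[0]>>6 != 0 {
		return chunkHeader{}, errors.New("not a FMT0 header")
	}
	return chunkHeader{
		Format:    b[0] >> 6,
		CSID:      uint32(b[0] & 0x3f),
		Timestamp: uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]),
		Length:    uint32(b[4])<<16 | uint32(b[5])<<8 | uint32(b[6]),
		TypeID:    b[7],
		StreamID:  uint32(b[8]) | uint32(b[9])<<8 | uint32(b[10])<<16 | uint32(b[11])<<24,
	}, nil
}

// benchmarkParseFMT0 follows the usual pattern: set up the input, enable
// allocation reporting, reset the timer, then loop b.N times.
func benchmarkParseFMT0(b *testing.B) {
	raw := []byte{0x03, 0, 0, 0x0a, 0, 0, 0x64, 0x09, 1, 0, 0, 0}
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if _, err := parseFMT0(raw); err != nil {
			b.Fatal(err)
		}
	}
}

func main() {
	res := testing.Benchmark(benchmarkParseFMT0)
	fmt.Println(res.String(), res.MemString())
}
```

With `ReportAllocs` enabled, the result includes bytes and allocations per operation, which makes allocation regressions on a hot path visible alongside timing.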
Validate all API names, signatures, and line numbers against actual source. Correct multiple issues found during code review:
- Fix wrong API names: reg.Get() -> reg.GetStream(), method calls -> package-level function calls for PublisherDisconnected/SubscriberDisconnected
- Add mutex lock requirement for stream.Recorder access
- Document additional relay leak on client.Connect() failure
- Remove unnecessary net.Conn type assertions (already has deadline methods)
- Add net.Error timeout handling for read deadline errors
- Expand dead code scope to include session.go + session_test.go deletion
- Document shutdown race safety (disconnect handler vs cleanupAllRecorders)
- Add concrete benchmark implementation code samples
- Add unit test patterns with code for disconnect handler tests
- Add Code Verification Summary table with verified signatures
- Add Verification Commands section aligned with Definition of Done
- Update File Change Summary with action column (Edit/Delete)
…event
Enrich existing hook events with useful data that was available but
previously discarded:
- publish_stop: now includes audio_packets, video_packets, total_bytes,
audio_codec, video_codec, and duration_sec from MediaLogger stats.
Enables post-stream analytics and billing calculations.
- play_stop: now includes duration_sec (session duration). Enables
viewer watch-time tracking.
- connection_close: now includes role (publisher/subscriber/empty),
stream_key, and duration_sec. Previously passed nil data and empty
stream_key, losing all context about what the connection was doing.
Add new subscriber_count event (EventSubscriberCount) for real-time
viewer tracking:
- Fired after a subscriber joins (play_start) with current count
- Fired after a subscriber leaves (play_stop) with updated count
- Data payload: {count: <int>} with conn_id and stream_key context
- Enables live dashboards and viewer count APIs
Add AcceptedAt() accessor to Connection for session duration calculation.
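The enriched payloads could be assembled along these lines. Only the event names, the payload keys, and `EventSubscriberCount` come from the commit message; the `Event` struct, the `EventConnectionClose` constant name, and both constructor functions are hypothetical, and the duration is computed from an accepted-at timestamp in the way an `AcceptedAt()` accessor would allow.

```go
package main

import (
	"fmt"
	"time"
)

const (
	EventSubscriberCount = "subscriber_count"
	EventConnectionClose = "connection_close" // constant name is an assumption
)

// Event is a hypothetical shape for a hook event.
type Event struct {
	Type      string
	ConnID    string
	StreamKey string
	Data      map[string]any
}

// subscriberCountEvent carries the current viewer count for a stream.
func subscriberCountEvent(connID, streamKey string, count int) Event {
	return Event{
		Type:      EventSubscriberCount,
		ConnID:    connID,
		StreamKey: streamKey,
		Data:      map[string]any{"count": count},
	}
}

// connectionCloseEvent carries the role, stream key, and session duration
// instead of a nil payload.
func connectionCloseEvent(connID, streamKey, role string, acceptedAt, now time.Time) Event {
	return Event{
		Type:      EventConnectionClose,
		ConnID:    connID,
		StreamKey: streamKey,
		Data: map[string]any{
			"role":         role,
			"stream_key":   streamKey,
			"duration_sec": now.Sub(acceptedAt).Seconds(),
		},
	}
}

func main() {
	accepted := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	closeEv := connectionCloseEvent("c1", "live/test", "publisher", accepted, accepted.Add(90*time.Second))
	countEv := subscriberCountEvent("c2", "live/test", 3)
	fmt.Println(closeEv.Type, closeEv.Data)
	fmt.Println(countEv.Type, countEv.Data)
}
```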
…adic srv parameter
Pull request overview
This PR introduces enhanced error handling, graceful connection cleanup, and performance benchmarks for the RTMP server (Feature 005). It addresses a cluster of pre-existing bugs — memory/goroutine leaks, zombie connections, stale publisher references, and relay client leaks — by wiring a disconnect callback into the connection lifecycle and enforcing TCP read/write deadlines. It also adds comprehensive benchmarks for the two hottest code paths (chunk I/O and AMF0 codec) and removes several pieces of dead code.
Changes:
- Error handling & cleanup: Adds onDisconnect callback to Connection, installs it in attachCommandHandling to properly stop MediaLogger, unregister publisher/subscriber, close FLV recorders, remove connections from tracking map, and fire lifecycle hooks on disconnect. Refactors Stop() to avoid deadlock with the new disconnect handler.
- TCP deadline enforcement: Adds readTimeout (90s) and writeTimeout (30s) constants and sets them before each I/O operation to detect zombie connections.
- Performance benchmarks: Adds 5 chunk-read, 4 chunk-write, 2 AMF0 object, 2 AMF0 array, and 2 top-level AMF0 benchmarks (all with b.ReportAllocs()).
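As a rough illustration of the deadline enforcement described above, the sketch below arms a read deadline before a read and classifies the failure through `net.Error`. The helper names are hypothetical, the timeout is shortened to 50ms so the example finishes quickly (the PR uses 90s), and `net.Pipe` stands in for a TCP connection to a silent peer.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// readWithDeadline sets a fresh read deadline, performs one read, and
// reports whether a failure was caused by the deadline expiring.
func readWithDeadline(c net.Conn, timeout time.Duration, buf []byte) (n int, timedOut bool, err error) {
	if err = c.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return 0, false, err
	}
	n, err = c.Read(buf)
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return n, true, err // peer sent nothing in time: treat as a zombie
	}
	return n, false, err
}

// silentPeerTimesOut reads from a peer that never writes.
func silentPeerTimesOut() bool {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	_, timedOut, _ := readWithDeadline(server, 50*time.Millisecond, make([]byte, 16))
	return timedOut
}

// activePeerDelivers reads from a peer that writes before the deadline.
func activePeerDelivers() (int, bool) {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()
	go func() { _, _ = client.Write([]byte("ping")) }()
	n, timedOut, _ := readWithDeadline(server, time.Second, make([]byte, 16))
	return n, timedOut
}

func main() {
	fmt.Println("silent peer timed out:", silentPeerTimesOut())
	n, timedOut := activePeerDelivers()
	fmt.Println("active peer bytes:", n, "timed out:", timedOut)
}
```

Re-arming the deadline before every read means an active peer is never cut off, while a peer that goes silent is reaped after at most one timeout interval.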
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| internal/rtmp/conn/conn.go | Adds onDisconnect field, SetDisconnectHandler(), AcceptedAt(), TCP deadlines in readLoop/writeLoop, removes dead session field |
| internal/rtmp/conn/session.go | Deleted (dead code removal) |
| internal/rtmp/conn/session_test.go | Deleted (tests for dead Session type) |
| internal/rtmp/conn/conn_test.go | Adds 3 disconnect handler tests |
| internal/rtmp/server/server.go | Adds RemoveConnection(), refactors Stop() to avoid deadlock |
| internal/rtmp/server/command_integration.go | Installs disconnect handler, adds role field to commandState, changes attachCommandHandling to non-variadic *Server |
| internal/rtmp/server/hooks/events.go | Adds EventSubscriberCount event type |
| internal/rtmp/server/registry_test.go | Adds codec caching, subscriber removal, and broadcast tests |
| internal/rtmp/server/auth/auth_test.go | Refactors to named subtests |
| internal/rtmp/relay/destination.go | Fixes client resource leak on Connect() and Publish() error paths |
| internal/rtmp/client/client.go | Removes dead RunCLI function |
| internal/rtmp/amf/amf.go | Removes dead Marshal/Unmarshal wrapper functions |
| internal/rtmp/amf/amf_test.go | Migrates tests from removed wrappers to EncodeAll/DecodeAll; adds benchmarks |
| internal/rtmp/amf/object_test.go | Adds 2 object encode/decode benchmarks |
| internal/rtmp/amf/array_test.go | Adds 2 strict array encode/decode benchmarks |
| internal/rtmp/chunk/reader_test.go | Removes excessive dead comments; adds 5 read benchmarks |
| internal/rtmp/chunk/writer_test.go | Refactors to subtests; adds 4 write benchmarks |
| cmd/rtmp-server/main.go | Adds os.Exit(1) on shutdown timeout |
| specs/005-error-handling-benchmarks/spec.md | New feature specification |
| docs/design.md | Documents TCP deadlines, graceful shutdown, and expanded hook events |
| docs/architecture.md | Documents TCP deadline enforcement in connection context |
| docs/getting-started.md | Adds troubleshooting entry for 90s read deadline and new Connection Management section |
| docs/implementation.md | Updates event hook integration points |
| README.md | Adds connection cleanup feature, moves items to "Recently Completed", adds CHANGELOG link |
| CHANGELOG.md | New file documenting all release history |
| .github/copilot-instructions.md | Updates concurrency pattern description with deadlines and disconnect handlers |
      outboundQueue chan *chunk.Message
    - session       *Session

      // Internal helpers
    - onMessage func(*chunk.Message) // test hook / dispatcher injection
    + onMessage    func(*chunk.Message) // test hook / dispatcher injection
    + onDisconnect func()               // called once when readLoop exits (cleanup cascade)
When the session *Session field is removed and session.go is deleted, the package-level documentation in internal/rtmp/conn/doc.go (lines 25–28) still references the deleted Session type: "Per-connection metadata ... is tracked by the [Session] type in session.go." This stale reference should be removed from doc.go as part of the Session type cleanup.
This pull request introduces several improvements focused on robust connection management, error handling, and performance benchmarking for the RTMP server. The most important changes include enforcing TCP read/write deadlines to detect and clean up zombie connections, adding disconnect handlers for proper resource cleanup, expanding lifecycle event hooks, and introducing new performance benchmarks and tests. Documentation and onboarding guides have also been updated to reflect these changes and provide clearer guidance.