Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/rtmp/chunk/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const (
// (CSID) can omit fields that haven't changed. The FMT field controls which
// fields are present in the wire format:
//
// FMT 0: All fields present (Timestamp is absolute)
// FMT 1: Timestamp delta, MessageLength, MessageTypeID (MessageStreamID inherited)
// FMT 2: Timestamp delta only (all other fields inherited)
// FMT 3: No fields (everything inherited — used for continuation chunks)
// FMT 0: All fields present (Timestamp is absolute)
// FMT 1: Timestamp delta, MessageLength, MessageTypeID (MessageStreamID inherited)
// FMT 2: Timestamp delta only (all other fields inherited)
// FMT 3: No fields (everything inherited — used for continuation chunks)
//
// IsDelta indicates whether Timestamp holds a delta (FMT 1/2) or absolute value (FMT 0).
type ChunkHeader struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/rtmp/chunk/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
// It maintains per-stream state to handle header compression and multi-chunk reassembly.
// Not safe for concurrent use; designed for a single read-loop goroutine per connection.
type Reader struct {
br io.Reader // underlying byte stream (typically a TCP connection)
chunkSize uint32 // maximum payload bytes per chunk (default 128, server may increase)
states map[uint32]*ChunkStreamState // per-CSID assembly state (tracks partial messages)
prevHeader map[uint32]*ChunkHeader // last header per CSID (for FMT 1/2/3 field inheritance)
scratch []byte // reusable buffer for reading chunk payloads
br io.Reader // underlying byte stream (typically a TCP connection)
chunkSize uint32 // maximum payload bytes per chunk (default 128, server may increase)
states map[uint32]*ChunkStreamState // per-CSID assembly state (tracks partial messages)
prevHeader map[uint32]*ChunkHeader // last header per CSID (for FMT 1/2/3 field inheritance)
scratch []byte // reusable buffer for reading chunk payloads
}

// NewReader creates a new dechunker with the provided initial inbound chunk size (spec default 128).
Expand Down
24 changes: 12 additions & 12 deletions internal/rtmp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,18 @@ const (
// It handles the full connection lifecycle: TCP dial → handshake → connect
// command → createStream → publish/play → send audio/video.
type Client struct {
conn net.Conn // underlying TCP connection
writer *chunk.Writer // encodes outbound messages into chunks
reader *chunk.Reader // decodes inbound chunks into messages
url *url.URL // parsed RTMP URL
log *slog.Logger // structured logger with "rtmp_client" component tag

app string // application name extracted from URL path (e.g. "live")
streamKey string // full stream key: "app/streamName" (e.g. "live/mystream")
streamID uint32 // message stream ID assigned by server's createStream response

trxMu sync.Mutex // protects trxID from concurrent access
trxID float64 // incrementing transaction ID for request-response matching
conn net.Conn // underlying TCP connection
writer *chunk.Writer // encodes outbound messages into chunks
reader *chunk.Reader // decodes inbound chunks into messages
url *url.URL // parsed RTMP URL
log *slog.Logger // structured logger with "rtmp_client" component tag

app string // application name extracted from URL path (e.g. "live")
streamKey string // full stream key: "app/streamName" (e.g. "live/mystream")
streamID uint32 // message stream ID assigned by server's createStream response

trxMu sync.Mutex // protects trxID from concurrent access
trxID float64 // incrementing transaction ID for request-response matching
}

// New creates a new Client (not yet connected).
Expand Down
10 changes: 5 additions & 5 deletions internal/rtmp/client/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
// This client implements just enough of the RTMP protocol to drive the
// server through its connection lifecycle:
//
// 1. TCP dial + RTMP handshake
// 2. connect command → wait for _result
// 3. createStream command → wait for _result (stream ID)
// 4. publish or play command
// 5. Send/receive audio and video messages
// 1. TCP dial + RTMP handshake
// 2. connect command → wait for _result
// 3. createStream command → wait for _result (stream ID)
// 4. publish or play command
// 5. Send/receive audio and video messages
//
// # Limitations
//
Expand Down
8 changes: 4 additions & 4 deletions internal/rtmp/conn/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
//
// # Connection Lifecycle
//
// 1. Accept(listener) → handshake → control burst → Connection
// 2. SetMessageHandler(fn) – install the dispatch callback
// 3. Start() – begin the read loop
// 4. Close() – cancel context, close TCP, wait for goroutines
// 1. Accept(listener) → handshake → control burst → Connection
// 2. SetMessageHandler(fn) – install the dispatch callback
// 3. Start() – begin the read loop
// 4. Close() – cancel context, close TCP, wait for goroutines
//
// # Concurrency Model
//
Expand Down
18 changes: 9 additions & 9 deletions internal/rtmp/conn/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package conn
// SessionState represents where a connection is in the RTMP session lifecycle.
// Each connection progresses through these states as commands are exchanged:
//
// Uninitialized → Connected → StreamCreated → Publishing or Playing
// Uninitialized → Connected → StreamCreated → Publishing or Playing
//
// The client drives transitions by sending connect, createStream, publish/play commands.
type SessionState uint8
Expand All @@ -20,16 +20,16 @@ const (
// command exchange phase. It tracks the application name, stream identifiers,
// and the current lifecycle state.
type Session struct {
app string // application name from connect command (e.g. "live")
tcUrl string // target URL from connect (e.g. "rtmp://host/live")
flashVer string // client's Flash version string
objectEncoding uint8 // AMF encoding version (0=AMF0, must be 0 for this server)
app string // application name from connect command (e.g. "live")
tcUrl string // target URL from connect (e.g. "rtmp://host/live")
flashVer string // client's Flash version string
objectEncoding uint8 // AMF encoding version (0=AMF0, must be 0 for this server)

transactionID uint32 // incrementing counter for request-response matching
streamID uint32 // message stream ID allocated by createStream (typically 1)
streamKey string // full stream key: "app/streamName" (e.g. "live/mystream")
transactionID uint32 // incrementing counter for request-response matching
streamID uint32 // message stream ID allocated by createStream (typically 1)
streamKey string // full stream key: "app/streamName" (e.g. "live/mystream")

state SessionState // current lifecycle state
state SessionState // current lifecycle state
}

// NewSession creates a new Session in Uninitialized state.
Expand Down
1 change: 1 addition & 0 deletions internal/rtmp/handshake/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func setReadDeadline(c net.Conn, d time.Duration) error {
}
return nil
}

// setWriteDeadline sets a timeout for the next write operation on the connection.
func setWriteDeadline(c net.Conn, d time.Duration) error {
if err := c.SetWriteDeadline(time.Now().Add(d)); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/rtmp/media/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (

// Audio codec identifiers. These correspond to the SoundFormat IDs in the
// FLV/RTMP specification (high nibble of the first audio payload byte):
// MP3 = SoundFormat 2, AAC = SoundFormat 10, Speex = SoundFormat 11
//
// MP3 = SoundFormat 2, AAC = SoundFormat 10, Speex = SoundFormat 11
const (
AudioCodecMP3 = "MP3"
AudioCodecAAC = "AAC"
Expand Down
3 changes: 2 additions & 1 deletion internal/rtmp/media/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import "fmt"

// Video codec identifiers. These correspond to the CodecID values in the
// FLV/RTMP specification (low nibble of the first video payload byte):
// H264/AVC = CodecID 7, H265/HEVC = CodecID 12
//
// H264/AVC = CodecID 7, H265/HEVC = CodecID 12
const (
VideoCodecAVC = "H264" // H.264 / Advanced Video Coding (most common)
VideoCodecHEVC = "H265" // H.265 / High Efficiency Video Coding
Expand Down
26 changes: 23 additions & 3 deletions internal/rtmp/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net"
"testing"
"time"

"github.com/alxayo/go-rtmp/internal/rtmp/handshake"
)

// TestServerStartStop verifies basic lifecycle: Start on :0, Addr non-nil, Stop idempotent.
Expand Down Expand Up @@ -39,7 +41,11 @@ func TestServerAcceptConnection(t *testing.T) {
t.Fatalf("dial failed: %v", err)
}
defer c.Close()
// Allow handshake + control burst to complete.
// Perform RTMP client handshake so the server can register the connection.
if err := handshake.ClientHandshake(c); err != nil {
t.Fatalf("client handshake failed: %v", err)
}
// Allow server to register the connection after the handshake.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if s.ConnectionCount() == 1 {
Expand All @@ -63,6 +69,10 @@ func TestServerGracefulShutdown(t *testing.T) {
if err != nil {
t.Fatalf("dial failed: %v", err)
}
// Perform RTMP client handshake so the server can register the connection.
if err := handshake.ClientHandshake(c); err != nil {
t.Fatalf("client handshake failed: %v", err)
}
// Wait until tracked.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
Expand All @@ -77,8 +87,18 @@ func TestServerGracefulShutdown(t *testing.T) {
if err := s.Stop(); err != nil {
t.Fatalf("stop failed: %v", err)
}
// Subsequent read/write should fail quickly due to close (handshake already occurred so we write nothing).
if _, err := c.Write([]byte{0}); err == nil {
// Subsequent read/write should fail after server close. TCP close propagation
// may need a brief moment, so retry until an error is observed.
var writeErr error
writeDeadline := time.Now().Add(time.Second)
for time.Now().Before(writeDeadline) {
_, writeErr = c.Write([]byte{0})
if writeErr != nil {
break
}
time.Sleep(5 * time.Millisecond)
}
if writeErr == nil {
t.Fatalf("expected write error after stop")
}
}
Loading