feat(rtmp): add sans-I/O RTMP publish client and publisher node#266
feat(rtmp): add sans-I/O RTMP publish client and publisher node#266streamer45 merged 28 commits intomainfrom
Conversation
Add a new transport::rtmp::publish node that publishes encoded H.264 video and AAC audio to arbitrary RTMP/RTMPS endpoints using the shiguredo_rtmp (Sans I/O) library. Key features: - Accepts EncodedVideo(H264) on 'video' pin and EncodedAudio(AAC) on 'audio' pin (sink node, no outputs) - Converts H.264 Annex B to AVCC format for RTMP/FLV wire format - Sends AVC sequence headers (SPS/PPS) on keyframes - Sends AAC AudioSpecificConfig sequence header on first audio packet - Supports both RTMP and RTMPS (TLS via tokio-rustls) - Feature-gated behind 'rtmp' feature (included in defaults) Also includes a sample compositing pipeline (moq_to_rtmp_composite.yml) demonstrating MoQ input -> compositor -> YouTube Live RTMP output with AAC audio encoding and OpenH264 video encoding. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Filter SPS/PPS NALUs from AVCC NalUnit data (they belong only in the AVC sequence header, matching FFmpeg/OBS behavior) - Mask stream key in info-level URL log to prevent credential exposure - Add sample_rate/channels config params to RtmpPublishConfig instead of hardcoding 48kHz stereo in the AAC AudioSpecificConfig - Add rationale comments to all #[allow(clippy::...)] suppressions - Remove unnecessary #[allow(cast_possible_truncation)] on build_aac_audio_specific_config - Add regression tests for SPS/PPS filtering and stream key masking Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Mask stream key in error message (was leaking in StreamKitError) - Add explicit 48_000 match arm + warn on unrecognized AAC sample rate - Handle DisconnectedByPeer event: drain_events now returns bool to break the publishing loop when the peer signals disconnect - Add flush() to RtmpStream + call it after draining send buffer to ensure TLS-buffered data is sent immediately Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Wrap publishing loop in async block to capture errors, then emit_failed or emit_stopped accordingly (matches http.rs pattern) - Set TCP_NODELAY on connect to avoid Nagle buffering latency Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…nished' Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…peline
- Add stream_key and stream_key_env config fields to RtmpPublishConfig.
stream_key_env reads the key from an environment variable at startup
(takes precedence over stream_key). This avoids inlining secrets in
YAML pipeline files.
- Fix sample pipeline (moq_to_rtmp_composite.yml):
- Add missing VP9 decoder between moq_peer and compositor (moq_peer
outputs EncodedVideo/VP9, compositor expects RawVideo).
- Fix video pin name: video/hd (not video/data) — matches the track
name used by @moq/publish.
- Use stream_key_env: RTMP_STREAM_KEY instead of embedding the key
in the URL.
- Add 7 unit tests for resolve_rtmp_url covering: inline URL, separate
key, env var precedence, missing/empty env var, trailing slash.
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Three runtime issues fixed: 1. **A/V timestamp desync** — Audio and video packets arrived from different pipeline paths (compositor running clock vs. MoQ origin) with unrelated timestamp epoch bases. YouTube detected the severe desync and disconnected after ~16 seconds. Fix: compute RTMP timestamps from wall-clock elapsed time (Instant::now() at publish start), guaranteeing audio and video share a common time base regardless of upstream timestamp origins. 2. **No break on DISCONNECTING** — When the RTMP server disconnected, the node kept trying to send packets and spammed hundreds of InvalidState warnings. Fix: check connection.state() before each send; break the loop if no longer Publishing. 3. **Empty NalUnit guard** — Access units containing only SPS/PPS (no slice NALUs) produced an empty AVCC payload. Some RTMP servers reject zero-length NalUnit frames. Fix: skip the NalUnit frame when video_data is empty (SPS/PPS are still sent in the sequence header). Also fixes: - Sample pipeline aspect ratio: added explicit rect with aspect_fit for the main webcam layer to prevent stretching non-16:9 inputs. - Env var naming: changed sample from RTMP_STREAM_KEY to SKIT_RTMP_STREAM_KEY; documented that the env var name is fully user-controlled for multi-output scalability. Signed-off-by: Devin AI <devin@cognition.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…sync Replace wall-clock timestamps with per-track rebase logic that mirrors the WebM muxer's stage_frame approach. Source timestamps from mic and camera are synchronized (same browser epoch), so preserving them gives correct A/V sync. The rebase offset aligns tracks that start at different wall-clock times (e.g. compositor early frames vs. late MoQ audio) and handles compositor calibration backward jumps. Signed-off-by: Devin AI <devin@cognition.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: Devin AI <devin@cognition.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: Devin AI <devin@cognition.ai> Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Non-blocking ACK drain after each flush_send_buf using biased tokio::select! with std::future::ready(). Prevents ACK window overflow that caused ~5s disconnects on RTMPS (YouTube Live). Works for both plain TCP and TLS streams. - Replace dangerous().with_custom_certificate_verifier(Verifier::new()) with BuilderVerifierExt::with_platform_verifier() to ensure the CryptoProvider is available on Linux (item 1). - Upgrade rustls-platform-verifier 0.5 -> 0.6, eliminating duplicate versions in the binary (item 2). - Split shared packet_count into video_packet_count / audio_packet_count with per-track metric labels (item 3). - Replace (us / 1_000) as i64 with i64::try_from().unwrap_or(i64::MAX) to guard against u64 -> i64 overflow (item 6). - Add comment on Packet::Video arm explaining it is included for type completeness, not expected in practice (item 8). - Extract 500 ms backward-jump threshold to named constant BACKWARD_JUMP_THRESHOLD_MS (item 9). - Add note about unique env-var names making #[serial] unnecessary for current tests (item 10). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The previous non-blocking drain (biased select + std::future::ready) never actually read data: stream.read() returns Poll::Pending on first poll of a new future (reactor hasn't registered the fd yet), so the ready() arm always fires immediately, making the drain a no-op. Two-part fix: 1. Main select loop is now biased with TCP read as the FIRST arm. Server ACKs / pings are always processed before sending more media, preventing the unacked_bytes > window * 2 overflow that caused ~9s disconnects. 2. Post-flush drain uses try_read() — a direct non-blocking syscall that bypasses the tokio reactor — to drain ACKs already sitting in the OS receive buffer. For TLS, try_read returns WouldBlock (no synchronous decrypt path) and the biased main loop handles draining instead. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Add best-effort TCP shutdown (FIN) when publishing loop exits. The shiguredo_rtmp library does not expose deleteStream/FCUnpublish on the publish client, so TCP close is the next best signal. - Fix mask_stream_key over-redaction: bare URLs without a stream key (e.g. rtmp://host/app) now show the app name instead of redacting it. Only the last path segment is redacted when 2+ segments are present (i.e. when a key is actually embedded). - Add tests: mask_stream_key_bare_url_not_over_redacted, mask_stream_key_no_scheme. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The shiguredo_rtmp library auto-disconnects when total_bytes_sent − last_ack_received > local_ack_window_size × 2. Many RTMP ingest servers (including YouTube Live) do not send Acknowledgement messages at the rate implied by SetPeerBandwidth, yet clients like OBS and FFmpeg work fine because librtmp does not enforce ACK-window checks on the send side. To match that behaviour, after the handshake completes we feed a synthetic SetPeerBandwidth RTMP chunk into the connection, raising local_ack_window_size to ~2 GB (u32::MAX / 2). This effectively disables the overly-strict disconnect while still relying on TCP flow control for backpressure — the same approach used by all major RTMP publishing clients. Includes a unit test verifying the synthetic chunk is well-formed and accepted by the library's decoder. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The shiguredo_rtmp library auto-disconnects when total_bytes_sent − last_ack_received > local_ack_window_size × 2. Many RTMP ingest servers (including YouTube Live) do not send Acknowledgement messages at the rate implied by SetPeerBandwidth, yet clients like OBS and FFmpeg work fine because librtmp does not enforce ACK-window checks on the send side. To match that behaviour, after the handshake completes we feed a synthetic SetPeerBandwidth RTMP chunk into the connection, raising local_ack_window_size to ~2 GB (u32::MAX / 2). This effectively disables the overly-strict disconnect while still relying on TCP flow control for backpressure — the same approach used by all major RTMP publishing clients. Includes a unit test verifying the synthetic chunk is well-formed and accepted by the library's decoder. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Prevents a potential u32 overflow panic in debug builds when last_ms is near u32::MAX (~49 days of continuous streaming). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…drain propagation) 1. Propagate disconnect from drain_events in flush_send_buf — the return value was silently discarded, delaying shutdown detection on a dead connection. 2. Add 10s TCP connect timeout — TcpStream::connect had no timeout, allowing firewalled hosts to hang for 2+ minutes. 3. Validate AAC channels (1..=7) and sample_rate at node startup — channels > 7 would overflow the 4-bit channelConfiguration field, producing a corrupt AudioSpecificConfig. Unknown sample rates are now rejected instead of silently defaulting to 48 kHz. 4. Replace expect() + as-u8 cast in build_aac_audio_specific_config with try_from + map_or_else to satisfy clippy::expect_used and clippy::cast_possible_truncation. Adds 4 unit tests for the AAC config validation. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Add doc comment to parse_annexb_nal_units noting the known 00 00 01 in-payload ambiguity and that emulation prevention bytes make OpenH264 output safe. Shared tech debt with containers/mp4.rs. - Set gop_size: 60 explicitly in the sample RTMP pipeline for clarity (keyframe every 2s at 30 fps). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
…client Replace the external shiguredo_rtmp library dependency with a self-contained, sans-I/O RTMP publish client module (rtmp_client.rs) within the streamkit-nodes crate. Key improvements over the vendored library: - Correct CSID assignment: csid=2 for protocol control only, csid=3+ for commands and media (fixes Twitch connection rejection) - Server-assigned stream ID: uses the stream ID from createStream _result instead of hardcoding 2 (fixes Twitch silent publish ignore) - No ACK window enforcement on send side: matches OBS/FFmpeg behavior, eliminates the override_ack_window hack - tcUrl without default port: prevents degraded Twitch responses - Random handshake data: avoids all-zero C1 that some servers fingerprint The module implements: - RTMP URL parsing (rtmp:// and rtmps:// schemes) - Client-side handshake state machine (C0+C1 -> S0+S1+S2 -> C2) - Chunk encoder with per-CSID fmt 0/1/2/3 header compression - Chunk decoder with partial-read handling and multi-chunk reassembly - AMF0 codec subset (Number, String, Object, Null, Boolean) - Full connection state machine (Handshaking -> Publishing) - FLV-format video (H.264/AVC) and audio (AAC) frame encoding - 44 unit tests covering all protocol components Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
| .bitrate(BitRate::from_bps(config.bitrate_kbps.saturating_mul(1000))) | ||
| .max_frame_rate(FrameRate::from_hz(config.max_frame_rate)) | ||
| .rate_control_mode(RateControlMode::Bitrate) | ||
| .intra_frame_period(IntraFramePeriod::from_num_frames(config.gop_size)) |
There was a problem hiding this comment.
🚩 OpenH264 gop_size=0 passed through without validation
The new gop_size field in OpenH264EncoderConfig (at crates/nodes/src/video/openh264.rs:65) accepts 0 to mean "let the encoder decide" per the doc comment. However, unlike bitrate_kbps and max_frame_rate, there's no validation in OpenH264EncoderNode::new() — the value is passed directly to IntraFramePeriod::from_num_frames(config.gop_size). This relies on the openh264 crate correctly interpreting 0 as "auto". The doc comment states this behavior, so it's intentional, but it might be worth verifying that the openh264 crate's IntraFramePeriod::from_num_frames(0) actually triggers auto mode rather than causing unexpected behavior (e.g., every-frame IDR or an error).
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
- Move ACK byte counter after handshake guard so only post-handshake bytes are counted (RTMP spec §5.4) - Return error instead of silently truncating AMF0 strings >65535 bytes - Add MAX_SEND_BUF (8 MB) high-water mark; disconnect on backpressure - Remove unused _c1 field from Handshake struct - Preserve leftover bytes after S2 in handshake completion so pipelined server messages (WinAckSize, SetPeerBandwidth) are not discarded - Detect Disconnecting state in drive_handshake to fail fast on server rejection instead of waiting for 10s timeout - Add #[serde(deny_unknown_fields)] to RtmpPublishConfig - Add 30s timeout to TCP read in publishing loop to prevent hanging when server is unresponsive and input channels are idle - Refactor resolve_rtmp_url to accept env resolver, eliminating unsound std::env::set_var/remove_var calls in tests (Rust 1.83+) - Replace is_multiple_of with idiomatic form preferred by clippy - Add test for handshake leftover byte preservation Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The chunk decoder's decode_message() returned Ok(None) after decoding each individual chunk of a multi-chunk message, causing the caller's while-let loop to exit before continuation chunks were processed. When the server sent messages exceeding 128 bytes (the default chunk size) — such as YouTube's _result response to the connect command (~191 bytes, split into two chunks) — the second chunk was stranded in the decoder buffer, the _result was never processed, and the connection timed out. Fix: wrap decode_message() in an internal loop that keeps consuming continuation chunks from the buffer until the message is complete or there truly isn't enough data for the next chunk. Also: - Replace is_multiple_of() with % N == 0 for MSRV < 1.85 compat - Add full_youtube_server_simulation integration test covering the complete Handshaking → Publishing state machine flow Signed-off-by: Devin AI <devin@cognition.ai> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: Devin AI <devin@cognition.ai> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Switch the sample RTMP pipeline from rtmp:// to rtmps:// (YouTube's TLS endpoint) and add a commented Twitch ingest URL for easy switching. Signed-off-by: Devin AI <devin@cognition.ai> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
The comment claimed multiple decode_message() calls were needed for multi-chunk messages. Since the internal loop fix, a single call now assembles the full message. Signed-off-by: Devin AI <devin@cognition.ai> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Replaces the vendored
shiguredo_rtmplibrary with a self-contained, sans-I/O RTMP publish client module (rtmp_client.rs, ~2300 lines). This fixes two spec-compliance bugs in the old library and removes the vendored dependency entirely.Spec fixes vs shiguredo_rtmp:
createStream_result(was: hardcoded to 2 → Twitch silently ignored publish)override_ack_windowhack (matches OBS/FFmpeg behavior)Critical bug fix — chunk decoder multi-chunk reassembly:
The chunk decoder's
decode_message()returnedOk(None)after decoding each individual chunk of a multi-chunk message. The caller'swhile let Some(msg)loop would exit, leaving continuation chunks stranded in the buffer. This caused YouTube Live's_resultresponse (~191 bytes, split into two 128-byte chunks) to never be processed, resulting in a 10s handshake timeout. Fixed by wrapping the decoder in an internal loop that keeps consuming continuation chunks until the message is complete or the buffer is truly exhausted.Other fixes from code review:
deny_unknown_fieldsonRtmpPublishConfigstd::env::set_varreplaced with env resolver pattern (Rust 1.83+ unsoundness)is_multiple_of()→% N == 0for MSRV < 1.85 compatdrive_handshakefails fast on server rejection (Disconnecting state)_c1field from Handshake structrtmps://with commented Twitch URLTest coverage: 72 tests (27 existing rtmp.rs + 45 new rtmp_client.rs), including a full YouTube-like server simulation test that exercises the complete Handshaking → Publishing state machine flow with multi-chunk message reassembly.
Validated: Publishing confirmed working on both YouTube Live and Twitch.
Review & Testing Checklist for Human
decode_message()handles edge cases — interleaved csids (server sending chunks from different csids between continuation chunks of the same message), partial continuation chunks at TCP boundariesMAX_SEND_BUFlimit — verify this is appropriate for typical bitrates and transient network stalls (at 6 Mbps, ~10s of buffering)Notes
data.to_vec()on audio frames (~128 bytes/frame) was intentionally deferred — fixing it would require changingAudioFrame::datato&[u8]orBytes, which is a broader API changetry_readno-op for TLS connections is documented but not fixed — the biased select handles the ACK drain adequately for RTMPSadvance_send_bufusesVec::drain(..n)which is O(remaining_len) — acceptable given frequent flushes keep the buffer small, but worth revisiting if profiling shows contentionLink to Devin session: https://staging.itsdev.in/sessions/b7c48152ed0f4a6b9f1e618282433fb0
Requested by: @streamer45