Skip to content

Commit 8a12b00

Browse files
staging-devin-ai-integration[bot]streamer45streamkit-devin
authored
chore: update MoQ dependencies to latest versions (#53)
* chore: update moq * chore: update MoQ dependencies to latest versions Co-Authored-By: Claudio Costa <devin@streamkit.dev> * fix: use explicit Container::default() for clippy pedantic Co-Authored-By: Claudio Costa <devin@streamkit.dev> * fix: inject priority into catalog JSON for JS client compatibility Co-Authored-By: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com> * style: format catalog_to_json function Co-Authored-By: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com> --------- Co-authored-by: streamer45 <cstcld91@gmail.com> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Staging-Devin AI <166158716+staging-devin-ai-integration[bot]@users.noreply.github.com>
1 parent 0a45bc7 commit 8a12b00

File tree

11 files changed

+717
-787
lines changed

11 files changed

+717
-787
lines changed

Cargo.lock

Lines changed: 448 additions & 512 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/skit/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ jemalloc_pprof = { version = "0.8", features = ["symbolize"], optional = true }
115115
dhat = { version = "0.3", optional = true }
116116

117117
# MoQ support (optional)
118-
moq-native = { version = "0.11.0", optional = true }
118+
moq-native = { version = "0.12.1", optional = true }
119119
async-trait = { workspace = true }
120120

121121
# For glob pattern matching in permissions
@@ -130,7 +130,7 @@ getrandom = "0.3"
130130
aws-lc-rs = "1"
131131

132132
# For MoQ auth path matching (optional, with moq feature)
133-
moq-lite = { version = "0.11.0", optional = true }
133+
moq-lite = { version = "0.13.0", optional = true }
134134
blake2 = "0.10.6"
135135

136136
[features]

apps/skit/src/moq_gateway.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl MoqGateway {
8484
#[allow(clippy::cognitive_complexity)]
8585
pub async fn accept_connection(
8686
&self,
87-
session: moq_native::web_transport_quinn::Session,
87+
request: moq_native::Request,
8888
path: String,
8989
auth: Option<Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>>,
9090
) -> Result<(), String> {
@@ -123,8 +123,8 @@ impl MoqGateway {
123123
if let Some(connection_tx) = connection_tx {
124124
let (response_tx, response_rx) = oneshot::channel();
125125

126-
// Type-erase the WebTransport session
127-
let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(session);
126+
// Type-erase the moq-native Request
127+
let session_boxed: streamkit_core::moq_gateway::WebTransportSession = Box::new(request);
128128

129129
let conn =
130130
MoqConnection { path: path.clone(), session: session_boxed, response_tx, auth };

apps/skit/src/server.rs

Lines changed: 114 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -3214,17 +3214,20 @@ fn start_moq_webtransport_acceptor(
32143214
key_path = %config.server.key_path,
32153215
"Using provided TLS certificates for MoQ WebTransport"
32163216
);
3217-
ServerTlsConfig {
3218-
cert: vec![std::path::PathBuf::from(&config.server.cert_path)],
3219-
key: vec![std::path::PathBuf::from(&config.server.key_path)],
3220-
generate: vec![],
3221-
}
3217+
let mut tls = ServerTlsConfig::default();
3218+
tls.cert = vec![std::path::PathBuf::from(&config.server.cert_path)];
3219+
tls.key = vec![std::path::PathBuf::from(&config.server.key_path)];
3220+
tls
32223221
} else {
32233222
info!("Auto-generating self-signed certificate for MoQ WebTransport (14-day validity for local development)");
3224-
ServerTlsConfig { cert: vec![], key: vec![], generate: vec!["localhost".to_string()] }
3223+
let mut tls = ServerTlsConfig::default();
3224+
tls.generate = vec!["localhost".to_string()];
3225+
tls
32253226
};
32263227

3227-
let moq_config = MoqServerConfig { bind: Some(addr), tls };
3228+
let mut moq_config = MoqServerConfig::default();
3229+
moq_config.bind = Some(addr);
3230+
moq_config.tls = tls;
32283231

32293232
info!(
32303233
address = %addr,
@@ -3256,142 +3259,41 @@ fn start_moq_webtransport_acceptor(
32563259
let auth_state = Arc::clone(&auth_state);
32573260

32583261
tokio::spawn(async move {
3259-
match request {
3260-
moq_native::Request::WebTransport(wt_request) => {
3261-
let url = wt_request.url();
3262-
let path = url.path().to_string();
3263-
3264-
// SECURITY: Never log the full URL (may contain jwt)
3265-
debug!(path = %path, "Received WebTransport connection request");
3266-
3267-
// Validate MoQ auth if enabled
3268-
let moq_auth = if auth_state.is_enabled() {
3269-
// Extract jwt from query params
3270-
let jwt = url
3271-
.query_pairs()
3272-
.find(|(k, _)| k == "jwt")
3273-
.map(|(_, v)| v.to_string());
3274-
3275-
let Some(jwt) = jwt else {
3276-
warn!(path = %path, "MoQ auth failed: missing jwt parameter");
3277-
let _ = wt_request
3278-
.close(axum::http::StatusCode::UNAUTHORIZED)
3279-
.await;
3280-
return;
3281-
};
3282-
3283-
// Validate JWT
3284-
let claims = match auth_state.validate_moq_token(&jwt) {
3285-
Ok(c) => c,
3286-
Err(e) => {
3287-
warn!(path = %path, error = %e, "MoQ JWT validation failed");
3288-
let _ = wt_request
3289-
.close(axum::http::StatusCode::UNAUTHORIZED)
3290-
.await;
3291-
return;
3292-
},
3293-
};
3294-
3295-
// Check audience
3296-
if claims.aud != crate::auth::AUD_MOQ {
3297-
warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience");
3298-
let _ = wt_request
3299-
.close(axum::http::StatusCode::UNAUTHORIZED)
3300-
.await;
3301-
return;
3302-
}
3303-
3304-
let token_hash = crate::auth::hash_token(&jwt);
3305-
3306-
// Enforce "tokens we mint" policy (parity with HTTP API auth).
3307-
let metadata_store = auth_state.token_metadata_store().cloned();
3308-
let Some(metadata_store) = metadata_store else {
3309-
warn!(path = %path, "MoQ auth failed: token metadata store not available");
3310-
let _ = wt_request
3311-
.close(axum::http::StatusCode::SERVICE_UNAVAILABLE)
3312-
.await;
3313-
return;
3314-
};
3315-
3316-
let meta = match metadata_store.get(&claims.jti).await {
3317-
Ok(Some(meta)) => meta,
3318-
Ok(None) => {
3319-
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)");
3320-
let _ = wt_request
3321-
.close(axum::http::StatusCode::UNAUTHORIZED)
3322-
.await;
3323-
return;
3324-
},
3325-
Err(e) => {
3326-
warn!(path = %path, error = %e, "MoQ auth failed: metadata store error");
3327-
let _ = wt_request
3328-
.close(axum::http::StatusCode::SERVICE_UNAVAILABLE)
3329-
.await;
3330-
return;
3331-
},
3332-
};
3333-
3334-
// Extra robustness: ensure the presented token matches the stored hash.
3335-
if meta.token_hash != token_hash {
3336-
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch");
3337-
let _ = wt_request
3338-
.close(axum::http::StatusCode::UNAUTHORIZED)
3339-
.await;
3340-
return;
3341-
}
3342-
3343-
if meta.revoked {
3344-
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked");
3345-
let _ = wt_request
3346-
.close(axum::http::StatusCode::UNAUTHORIZED)
3347-
.await;
3348-
return;
3349-
}
3350-
3351-
// Check revocation
3352-
if auth_state.is_revoked(&token_hash) {
3353-
warn!(path = %path, "MoQ auth failed: token revoked");
3354-
let _ = wt_request
3355-
.close(axum::http::StatusCode::UNAUTHORIZED)
3356-
.await;
3357-
return;
3358-
}
3359-
3360-
// Verify root matches path and reduce permissions
3361-
match crate::auth::verify_moq_token(&claims, &path) {
3362-
Ok(ctx) => Some(Arc::new(ctx)
3363-
as Arc<
3364-
dyn streamkit_core::moq_gateway::MoqAuthChecker,
3365-
>),
3366-
Err(e) => {
3367-
warn!(path = %path, error = %e, "MoQ path verification failed");
3368-
let _ = wt_request
3369-
.close(axum::http::StatusCode::UNAUTHORIZED)
3370-
.await;
3371-
return;
3372-
},
3373-
}
3374-
} else {
3375-
None
3376-
};
3377-
3378-
match wt_request.ok().await {
3379-
Ok(session) => {
3380-
if let Err(e) = gateway
3381-
.accept_connection(session, path.clone(), moq_auth)
3382-
.await
3383-
{
3384-
warn!(path = %path, error = %e, "Failed to route WebTransport connection");
3385-
}
3386-
},
3387-
Err(e) => {
3388-
warn!(path = %path, error = %e, "Failed to accept WebTransport session");
3389-
},
3390-
}
3391-
},
3392-
moq_native::Request::Quic(_quic_request) => {
3393-
debug!("Received raw QUIC connection (not WebTransport), ignoring");
3394-
},
3262+
// Extract URL data before consuming the request.
3263+
// request.url() borrows, so we copy what we need first.
3264+
let (path, jwt_param) = {
3265+
let Some(url) = request.url() else {
3266+
debug!("Received MoQ connection without URL (raw QUIC), ignoring");
3267+
return;
3268+
};
3269+
let path = url.path().to_string();
3270+
let jwt_param = url
3271+
.query_pairs()
3272+
.find(|(k, _)| k == "jwt")
3273+
.map(|(_, v)| v.to_string());
3274+
(path, jwt_param)
3275+
};
3276+
3277+
// SECURITY: Never log the full URL (may contain jwt)
3278+
debug!(path = %path, "Received MoQ connection request");
3279+
3280+
// Validate MoQ auth if enabled
3281+
let moq_auth = if auth_state.is_enabled() {
3282+
match validate_moq_auth(&auth_state, &path, jwt_param).await {
3283+
Ok(ctx) => Some(ctx),
3284+
Err(status) => {
3285+
let _ = request.reject(status).await;
3286+
return;
3287+
},
3288+
}
3289+
} else {
3290+
None
3291+
};
3292+
3293+
if let Err(e) =
3294+
gateway.accept_connection(request, path.clone(), moq_auth).await
3295+
{
3296+
warn!(path = %path, error = %e, "Failed to route MoQ connection");
33953297
}
33963298
});
33973299
}
@@ -3407,6 +3309,75 @@ fn start_moq_webtransport_acceptor(
34073309
Ok(())
34083310
}
34093311

3312+
/// Validates MoQ auth for an incoming connection, returning the auth context on success
3313+
/// or the HTTP status code to reject with on failure.
3314+
#[cfg(feature = "moq")]
3315+
async fn validate_moq_auth(
3316+
auth_state: &crate::auth::AuthState,
3317+
path: &str,
3318+
jwt_param: Option<String>,
3319+
) -> Result<Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>, axum::http::StatusCode> {
3320+
let Some(jwt) = jwt_param else {
3321+
warn!(path = %path, "MoQ auth failed: missing jwt parameter");
3322+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3323+
};
3324+
3325+
// Validate JWT
3326+
let claims = auth_state.validate_moq_token(&jwt).map_err(|e| {
3327+
warn!(path = %path, error = %e, "MoQ JWT validation failed");
3328+
axum::http::StatusCode::UNAUTHORIZED
3329+
})?;
3330+
3331+
// Check audience
3332+
if claims.aud != crate::auth::AUD_MOQ {
3333+
warn!(path = %path, expected = crate::auth::AUD_MOQ, actual = %claims.aud, "MoQ auth failed: wrong audience");
3334+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3335+
}
3336+
3337+
let token_hash = crate::auth::hash_token(&jwt);
3338+
3339+
// Enforce "tokens we mint" policy (parity with HTTP API auth).
3340+
let metadata_store = auth_state.token_metadata_store().cloned().ok_or_else(|| {
3341+
warn!(path = %path, "MoQ auth failed: token metadata store not available");
3342+
axum::http::StatusCode::SERVICE_UNAVAILABLE
3343+
})?;
3344+
3345+
let meta = metadata_store.get(&claims.jti).await.map_err(|e| {
3346+
warn!(path = %path, error = %e, "MoQ auth failed: metadata store error");
3347+
axum::http::StatusCode::SERVICE_UNAVAILABLE
3348+
})?;
3349+
3350+
let Some(meta) = meta else {
3351+
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token not recognized (not minted by this server)");
3352+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3353+
};
3354+
3355+
// Extra robustness: ensure the presented token matches the stored hash.
3356+
if meta.token_hash != token_hash {
3357+
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token hash mismatch");
3358+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3359+
}
3360+
3361+
if meta.revoked {
3362+
warn!(path = %path, jti = %claims.jti, "MoQ auth failed: token revoked");
3363+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3364+
}
3365+
3366+
// Check revocation
3367+
if auth_state.is_revoked(&token_hash) {
3368+
warn!(path = %path, "MoQ auth failed: token revoked");
3369+
return Err(axum::http::StatusCode::UNAUTHORIZED);
3370+
}
3371+
3372+
// Verify root matches path and reduce permissions
3373+
crate::auth::verify_moq_token(&claims, path)
3374+
.map_err(|e| {
3375+
warn!(path = %path, error = %e, "MoQ path verification failed");
3376+
axum::http::StatusCode::UNAUTHORIZED
3377+
})
3378+
.map(|ctx| Arc::new(ctx) as Arc<dyn streamkit_core::moq_gateway::MoqAuthChecker>)
3379+
}
3380+
34103381
/// Starts the HTTP/HTTPS server and optional MoQ WebTransport acceptor.
34113382
///
34123383
/// # Errors

crates/nodes/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ url = { version = "2.5.8", optional = true, features = ["serde"] }
4646
rquickjs = { version = "0.11", features = ["array-buffer", "futures", "loader", "parallel"], optional = true }
4747
wildmatch = { version = "2.6", optional = true }
4848

49-
moq-transport = { version = "0.12.1", optional = true }
50-
moq-native = { version = "0.11.0", optional = true }
51-
moq-lite = { version = "0.11.0", optional = true }
52-
hang = { version = "0.10.0", optional = true }
49+
moq-transport = { version = "0.12.2", optional = true }
50+
moq-native = { version = "0.12.1", optional = true }
51+
moq-lite = { version = "0.13.0", optional = true }
52+
hang = { version = "0.13.0", optional = true }
5353

5454
# For local dev, debugging moq stuff
5555
# moq-transport = { version = "0.11.0", optional = true }

crates/nodes/src/transport/moq/mod.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ use streamkit_core::{
3131

3232
static SHARED_INSECURE_CLIENT: OnceLock<Result<moq_native::Client, String>> = OnceLock::new();
3333

34+
/// Returns a cached `moq_native::Client` with TLS verification disabled.
35+
///
36+
/// In moq-native 0.12, publish/consume origins are set on the `Client` via builder methods
37+
/// (`with_publish` / `with_consume`) before calling `connect()`. The cached client has
38+
/// neither set, so callers must clone and configure it for each connection.
3439
fn shared_insecure_client() -> Result<moq_native::Client, StreamKitError> {
3540
let client = SHARED_INSECURE_CLIENT.get_or_init(|| {
3641
let mut client_config = moq_native::ClientConfig::default();
@@ -46,6 +51,27 @@ fn shared_insecure_client() -> Result<moq_native::Client, StreamKitError> {
4651
}
4752
}
4853

54+
/// Serialize a catalog to JSON with `priority` fields injected into `video` and `audio`.
55+
///
56+
/// The published `@moq/hang` JS client (0.1.2) still requires `priority` in the catalog
57+
/// schema, but the Rust `hang` 0.13.0 crate removed it from the structs.
58+
/// The upstream JS source has already dropped the requirement, but a new npm release
59+
/// hasn't been published yet. This shim keeps the two sides compatible.
60+
pub(super) fn catalog_to_json(catalog: &hang::catalog::Catalog) -> Result<String, StreamKitError> {
61+
let mut value = serde_json::to_value(catalog)
62+
.map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))?;
63+
64+
if let Some(video) = value.get_mut("video").and_then(|v| v.as_object_mut()) {
65+
video.entry("priority").or_insert(serde_json::json!(60));
66+
}
67+
if let Some(audio) = value.get_mut("audio").and_then(|v| v.as_object_mut()) {
68+
audio.entry("priority").or_insert(serde_json::json!(80));
69+
}
70+
71+
serde_json::to_string(&value)
72+
.map_err(|e| StreamKitError::Runtime(format!("Failed to serialize catalog: {e}")))
73+
}
74+
4975
pub(super) fn redact_url_str_for_logs(raw: &str) -> String {
5076
raw.parse::<Url>().map_or_else(
5177
|_| raw.split(['?', '#']).next().unwrap_or(raw).to_string(),

0 commit comments

Comments
 (0)