From 01658214b0ad9def4ff5e87e6b2a49dacc5f0788 Mon Sep 17 00:00:00 2001 From: Sasha Le Date: Sat, 28 Feb 2026 18:03:39 +1000 Subject: [PATCH 01/21] optimization --- bridges/punchd-bridge/gateway/public/js/sw.js | 230 +++++-- .../gateway/public/js/webrtc-upgrade.js | 395 +++++++++-- .../gateway/src/proxy/http-proxy.ts | 151 ++-- .../gateway/src/webrtc/peer-handler.ts | 647 +++++++++++++----- script/tidecloak/start.sh | 2 +- server/lib/tidecloakApi.ts | 31 + server/routes.ts | 22 +- 7 files changed, 1146 insertions(+), 332 deletions(-) diff --git a/bridges/punchd-bridge/gateway/public/js/sw.js b/bridges/punchd-bridge/gateway/public/js/sw.js index c4188f9..2037804 100644 --- a/bridges/punchd-bridge/gateway/public/js/sw.js +++ b/bridges/punchd-bridge/gateway/public/js/sw.js @@ -1,10 +1,10 @@ /** * Service Worker for WebRTC DataChannel HTTP tunneling. * - * Intercepts same-origin sub-resource requests and routes them through - * the page's WebRTC DataChannel when available. Navigation requests - * always use the network (relay) since they load new pages that need - * to establish their own DataChannel. + * Intercepts same-origin requests (including navigations) and routes + * them through the page's WebRTC DataChannel when available. If the + * requesting client has no DC (e.g. new tab), any other tab's active + * DC is used as a proxy. * * The page signals DC readiness via postMessage({ type: "dc_ready" }). * Only clients that have signaled are used for DataChannel routing. @@ -74,9 +74,43 @@ function waitForDc(clientId, timeoutMs) { }); } +/** Find any active DC client for routing (e.g. new tab without its own DC). */ +function findAnyDcClient() { + if (dcClients.size === 0) return Promise.resolve(null); + return self.clients.matchAll({ type: "window" }).then(function (allClients) { + var alive = {}; + allClients.forEach(function (c) { alive[c.id] = true; }); + for (var id of dcClients) { + if (alive[id]) return id; + } + // All DC clients are gone — clean up stale entries + dcClients.clear(); + return null; + }); +} + /** Gateway-internal paths — skip DataChannel, go through relay. */ var GATEWAY_PATHS = /^\/(js\/|auth\/|login|webrtc-config|_idp\/|realms\/|resources\/|portal|health)/; +/** Fetch with retry — retries on network errors and non-ok responses. */ +function fetchWithRetry(request, retries, delay) { + return fetch(request).then(function (resp) { + if (resp.ok || retries <= 0) return resp; + return new Promise(function (resolve) { + setTimeout(function () { + resolve(fetchWithRetry(request, retries - 1, delay)); + }, delay); + }); + }).catch(function (err) { + if (retries <= 0) throw err; + return new Promise(function (resolve, reject) { + setTimeout(function () { + fetchWithRetry(request, retries - 1, delay).then(resolve).catch(reject); + }, delay); + }); + }); +} + function extractPrefix(pathname) { var m = pathname.match(/^\/__b\/[^/]+/); return m ? m[0] : null; @@ -88,33 +122,44 @@ function stripPrefix(pathname) { } self.addEventListener("fetch", function (event) { - // Navigation requests (page loads) always use relay — new pages - // need to establish their own DataChannel - if (event.request.mode === "navigate") return; - var url = new URL(event.request.url); - // Intercept requests to localhost (any port) that target TideCloak - // paths (/realms/*, /resources/*). The SDK/adapter may construct - // absolute URLs using the TideCloak's internal localhost address. + // Intercept requests to localhost (any port). Backends (Jellyfin, etc.) + // may construct absolute localhost URLs in API responses (e.g. image URLs + // like http://localhost:8096/Items/{id}/Images/Primary). These bypass + // the patchScript's URL rewriting and would be blocked by CSP img-src 'self'. // Rewrite them to same-origin so they route through the gateway proxy. if ( url.origin !== self.location.origin && - (url.hostname === "localhost" || url.hostname === "127.0.0.1") && - (url.pathname.startsWith("/realms/") || url.pathname.startsWith("/resources/")) + (url.hostname === "localhost" || url.hostname === "127.0.0.1") ) { - console.log("[SW] Rewriting localhost request:", event.request.url); - var rewrittenUrl = self.location.origin + url.pathname + url.search; event.respondWith( - fetch(new Request(rewrittenUrl, { - method: event.request.method, - headers: event.request.headers, - body: event.request.method !== "GET" && event.request.method !== "HEAD" - ? event.request.body - : undefined, - credentials: "same-origin", - redirect: event.request.redirect, - })) + (async function () { + var targetPath = url.pathname + url.search; + // Add backend prefix from the requesting client's page URL + if (event.clientId && !targetPath.startsWith("/__b/")) { + try { + var reqClient = await self.clients.get(event.clientId); + if (reqClient) { + var clientPrefix = extractPrefix(new URL(reqClient.url).pathname); + if (clientPrefix && !GATEWAY_PATHS.test(url.pathname)) { + targetPath = clientPrefix + targetPath; + } + } + } catch (e) {} + } + var rewrittenUrl = self.location.origin + targetPath; + console.log("[SW] Rewriting localhost request:", event.request.url, "→", rewrittenUrl); + return fetch(new Request(rewrittenUrl, { + method: event.request.method, + headers: event.request.headers, + body: event.request.method !== "GET" && event.request.method !== "HEAD" + ? event.request.body + : undefined, + credentials: "same-origin", + redirect: event.request.redirect, + })); + })() ); return; } @@ -122,38 +167,68 @@ self.addEventListener("fetch", function (event) { if (url.origin !== self.location.origin) return; // Skip gateway-internal paths (strip prefix first for matching) - if (GATEWAY_PATHS.test(stripPrefix(url.pathname))) return; + if (GATEWAY_PATHS.test(stripPrefix(url.pathname))) { + // Retry chunk/bundle file loading on failure — fast SPA navigation + // can saturate connections, causing transient load failures. + if (/\.(chunk|bundle)\.(js|css)/.test(url.pathname)) { + event.respondWith(fetchWithRetry(event.request, 2, 500)); + } + return; + } - // If DC is already active, route through it immediately. - // If not yet ready, wait up to 8s for it — this prevents the burst - // of sub-resource requests from flooding the STUN relay while - // WebRTC is still connecting. Falls back to network on timeout. - if (!event.clientId) return; + // Navigation requests (page loads, reloads) should NEVER wait for DC — + // let them go through relay immediately so the page loads fast. + // DC will be set up in the background and used for subsequent requests. + var isNavigation = event.request.mode === "navigate"; + + // Route through DataChannel if possible. Priority: + // 1. Requesting client has DC → use it immediately + // 2. Any other client has DC → use it (new tabs, redirects) + // 3. Wait briefly for requesting client's DC (only for subresources) + // 4. Fall back to network (relay) + if (event.clientId && dcClients.has(event.clientId)) { + event.respondWith(rewriteAndHandle(event, event.clientId)); + return; + } - if (dcClients.has(event.clientId)) { - event.respondWith(rewriteAndHandle(event)); + if (dcClients.size > 0) { + event.respondWith( + findAnyDcClient().then(function (dcClientId) { + if (dcClientId) return rewriteAndHandle(event, dcClientId); + // No live DC client found — navigations go straight to relay + if (isNavigation) return fetch(event.request); + if (event.clientId) { + return waitForDc(event.clientId, 3000).then(function (ready) { + if (ready) return rewriteAndHandle(event, event.clientId); + return fetch(event.request); + }); + } + return fetch(event.request); + }) + ); return; } + // No DC clients at all — navigations go straight to relay (don't block page load) + if (isNavigation || !event.clientId) return; + + // Subresource request: wait briefly for DC, then fall back to relay event.respondWith( - waitForDc(event.clientId, 8000).then(function (ready) { - if (ready) { - return rewriteAndHandle(event); - } - // DC didn't connect in time — fall back to network (relay) + waitForDc(event.clientId, 3000).then(function (ready) { + if (ready) return rewriteAndHandle(event, event.clientId); return fetch(event.request); }) ); }); -async function rewriteAndHandle(event) { +async function rewriteAndHandle(event, clientId) { var request = event.request; var url = new URL(request.url); - // Prepend /__b/ prefix from requesting client if needed - if (!url.pathname.startsWith("/__b/") && event.clientId) { + // Prepend /__b/ prefix from DC client's page if needed + if (!url.pathname.startsWith("/__b/") && clientId) { try { - var client = await self.clients.get(event.clientId); + var client = await self.clients.get(clientId); if (client) { var prefix = extractPrefix(new URL(client.url).pathname); if (prefix && !GATEWAY_PATHS.test(url.pathname)) { @@ -167,7 +242,7 @@ async function rewriteAndHandle(event) { } } - return handleViaDataChannel(event.clientId, request); + return handleViaDataChannel(clientId, request); } async function handleViaDataChannel(clientId, request) { @@ -194,6 +269,11 @@ async function handleViaDataChannel(clientId, request) { for (var pair of request.headers) { headers[pair[0]] = pair[1]; } + // Strip conditional headers — DC responses bypass the browser's HTTP + // cache, so a 304 from the backend produces a null-body response that + // the browser can't match to a cache entry. Force full 200 responses. + delete headers["if-none-match"]; + delete headers["if-modified-since"]; client.postMessage( { @@ -240,6 +320,18 @@ async function handleViaDataChannel(clientId, request) { } } + // Redirect responses: return a proper redirect so the browser follows it. + // The subsequent request will be intercepted by the SW and routed through DC. + if ([301, 302, 303, 307, 308].indexOf(e.data.statusCode) !== -1) { + var location = responseHeaders.get("location"); + if (location) { + try { + resolve(Response.redirect(new URL(location, request.url).href, e.data.statusCode)); + return; + } catch (err) { /* invalid URL, fall through */ } + } + } + if (e.data.streaming) { // Live streaming response (SSE, NDJSON) — return a ReadableStream // so the browser can consume data progressively. @@ -279,26 +371,42 @@ async function handleViaDataChannel(clientId, request) { return; } - var bodyBytes; - if (e.data.binaryBody instanceof ArrayBuffer) { - bodyBytes = new Uint8Array(e.data.binaryBody); - } else { - bodyBytes = Uint8Array.from(atob(e.data.body || ""), function (c) { - return c.charCodeAt(0); - }); - } + // Null-body status codes (204, 304) must not have a body per spec + var nullBodyStatus = e.data.statusCode === 204 || e.data.statusCode === 304; + + try { + var bodyBytes; + if (nullBodyStatus) { + bodyBytes = null; + } else if (e.data.binaryBody instanceof ArrayBuffer) { + bodyBytes = new Uint8Array(e.data.binaryBody); + } else { + bodyBytes = Uint8Array.from(atob(e.data.body || ""), function (c) { + return c.charCodeAt(0); + }); + } + + // Remove content-length — the browser derives it from bodyBytes. + // A mismatched value (from the backend's original HTTP response) can + // cause truncation or rendering failures in SW-constructed Responses. + responseHeaders.delete("content-length"); + responseHeaders.delete("content-encoding"); + + console.log("[SW] DC response:", e.data.statusCode, + "type:", responseHeaders.get("content-type") || "", + "body:", nullBodyStatus ? 0 : bodyBytes.length, "bytes", + "via:", nullBodyStatus ? "null-body" : e.data.binaryBody ? "binary" : "base64"); - console.log("[SW] DC response:", e.data.statusCode, - "body:", bodyBytes.length, "bytes", - "content-range:", responseHeaders.get("content-range"), - "via:", e.data.binaryBody ? "ArrayBuffer" : "base64"); - - resolve( - new Response(bodyBytes, { - status: e.data.statusCode, - headers: responseHeaders, - }) - ); + resolve( + new Response(bodyBytes, { + status: e.data.statusCode, + headers: responseHeaders, + }) + ); + } catch (buildErr) { + console.error("[SW] Failed to build DC response:", buildErr); + resolve(fetch(fallbackRequest)); + } }; }); } catch (e) { diff --git a/bridges/punchd-bridge/gateway/public/js/webrtc-upgrade.js b/bridges/punchd-bridge/gateway/public/js/webrtc-upgrade.js index a10c813..a03ea05 100644 --- a/bridges/punchd-bridge/gateway/public/js/webrtc-upgrade.js +++ b/bridges/punchd-bridge/gateway/public/js/webrtc-upgrade.js @@ -8,6 +8,11 @@ * * Falls back gracefully — if WebRTC fails, HTTP relay continues working. * Automatically reconnects when the DataChannel or signaling drops. + * + * Supports dual DataChannels for high-throughput scenarios (4K video, gaming): + * - "http-tunnel" (control): JSON control messages, small responses + * - "bulk-data" (bulk): binary streaming chunks, binary WebSocket frames + * Falls back to single-channel mode for older gateways. */ (function () { @@ -24,9 +29,31 @@ const RECONNECT_DELAY = 5000; const MAX_RECONNECT_DELAY = 60000; + // Block other Service Worker registrations (e.g. Jellyfin's serviceworker.js) + // that would steal our scope and prevent DataChannel routing. + // Must be done early, before any other code can register a SW. + const _origSWRegister = navigator.serviceWorker + ? navigator.serviceWorker.register.bind(navigator.serviceWorker) + : null; + if (navigator.serviceWorker) { + navigator.serviceWorker.register = function (scriptURL, options) { + var url = new URL(scriptURL, location.href); + if (url.pathname.endsWith("/sw.js")) { + return _origSWRegister(scriptURL, options); + } + console.log("[WebRTC] Blocking conflicting SW registration:", scriptURL); + return navigator.serviceWorker.ready; + }; + } + + // Binary WebSocket fast-path magic byte (must match gateway's BINARY_WS_MAGIC) + const BINARY_WS_MAGIC = 0x02; + let signalingWs = null; let peerConnection = null; - let dataChannel = null; + let dataChannel = null; // Control channel ("http-tunnel") + let bulkChannel = null; // Bulk data channel ("bulk-data") + let bulkEnabled = false; // True after capability handshake confirms bulk support let clientId = "client-" + Math.random().toString(36).slice(2, 10); let pairedGatewayId = null; let config = null; @@ -36,6 +63,8 @@ let reconnectTimer = null; let swRegistered = false; let dcReadySignaled = false; + let capabilityTimer = null; + let gatewayFeatures = []; // Features confirmed by gateway capabilities response // Pending requests waiting for DataChannel responses const pendingRequests = new Map(); @@ -45,6 +74,10 @@ const streamingPorts = new Map(); // Active WebSocket connections tunneled through DataChannel const dcWebSockets = new Map(); + // Early chunks buffer: binary chunks that arrive on the bulk channel + // before their http_response_start message arrives on the control channel. + // (Dual DataChannels have independent ordering — bulk can deliver faster.) + const earlyChunks = new Map(); async function init() { try { @@ -69,6 +102,13 @@ try { dataChannel.onclose = null; dataChannel.onerror = null; dataChannel.close(); } catch {} dataChannel = null; } + if (bulkChannel) { + try { bulkChannel.onclose = null; bulkChannel.onerror = null; bulkChannel.close(); } catch {} + bulkChannel = null; + } + bulkEnabled = false; + gatewayFeatures = []; + if (capabilityTimer) { clearTimeout(capabilityTimer); capabilityTimer = null; } if (peerConnection) { try { peerConnection.onicecandidate = null; peerConnection.onconnectionstatechange = null; peerConnection.close(); } catch {} peerConnection = null; @@ -81,6 +121,7 @@ } pendingRequests.clear(); chunkedResponses.clear(); + earlyChunks.clear(); // End any in-flight streaming responses so SW promises don't hang for (var [id, port] of streamingPorts) { port.postMessage({ type: "end" }); @@ -237,6 +278,16 @@ } } + function sendCapabilities() { + if (dataChannel && dataChannel.readyState === "open") { + dataChannel.send(JSON.stringify({ + type: "capabilities", + version: 2, + features: ["bulk-channel", "binary-ws"], + })); + } + } + function startWebRTC() { if (!pairedGatewayId) return; @@ -264,22 +315,51 @@ iceServers: iceServers.length > 0 ? iceServers : undefined, }); + // --- Control channel: JSON messages, small responses --- dataChannel = peerConnection.createDataChannel("http-tunnel", { ordered: true, }); dataChannel.binaryType = "arraybuffer"; + // --- Bulk channel: binary streaming chunks, binary WS frames --- + bulkChannel = peerConnection.createDataChannel("bulk-data", { + ordered: true, + }); + bulkChannel.binaryType = "arraybuffer"; + dataChannel.onopen = async () => { - console.log("[WebRTC] DataChannel OPEN — direct connection established!"); + console.log("[WebRTC] Control DataChannel OPEN — direct connection established!"); reconnectAttempts = 0; // Reset backoff on success // Refresh session token before DC requests start (token may have expired since page load) await fetchSessionToken(); - // Refresh token every 4 minutes to stay ahead of 5-minute expiry + // Refresh token every 2 minutes — the server proactively refreshes + // tokens within 2 min of expiry, so this ensures we always have a + // fresh token with ~3+ min remaining lifetime. if (tokenRefreshTimer) clearInterval(tokenRefreshTimer); - tokenRefreshTimer = setInterval(fetchSessionToken, 4 * 60 * 1000); + tokenRefreshTimer = setInterval(fetchSessionToken, 2 * 60 * 1000); installWebSocketShim(); await registerServiceWorker(); + // Send capability handshake to negotiate bulk channel + binary WS. + // The gateway also sends capabilities proactively on channel open, + // so we may receive them before we even send — that's fine. + sendCapabilities(); + // Retry once after 2s if no response yet (message could be lost) + capabilityTimer = setTimeout(function () { + capabilityTimer = null; + if (!bulkEnabled) { + console.log("[WebRTC] No capabilities response yet — retrying..."); + sendCapabilities(); + // Final fallback after another 3s + capabilityTimer = setTimeout(function () { + capabilityTimer = null; + if (!bulkEnabled) { + console.log("[WebRTC] Gateway did not respond to capabilities — single-channel mode"); + } + }, 3000); + } + }, 2000); + // Only signal dc_ready if we have a valid session token — without it, // DC requests would 401 and fall back to relay anyway (wasted round-trip) if (!sessionToken) { @@ -300,6 +380,7 @@ signalDcReady(); }; + // --- Control channel message handler --- dataChannel.onmessage = (event) => { // Binary message — could be a streaming chunk OR a JSON control message // sent as binary (to avoid SCTP PPID confusion when interleaving). @@ -320,22 +401,9 @@ return; } - // Binary streaming chunk: 36-byte requestId prefix + raw bytes - if (buf.length < 36) return; - const requestId = new TextDecoder().decode(buf.subarray(0, 36)); - const entry = chunkedResponses.get(requestId); - if (!entry || !entry.streaming) return; - if (entry.live) { - // Live stream (SSE/NDJSON) — forward chunk to SW immediately - const port = streamingPorts.get(requestId); - if (port) { - const chunkBytes = buf.slice(36).buffer; - port.postMessage({ type: "chunk", data: chunkBytes }, [chunkBytes]); - } - } else { - // Finite response (video, etc) — buffer chunk on page side - entry.chunks.push(buf.slice(36)); - } + // Binary streaming chunk on control channel (single-channel fallback): + // 36-byte requestId prefix + raw bytes + handleBinaryChunk(buf); return; } @@ -348,7 +416,125 @@ } }; + // --- Bulk channel message handler --- + bulkChannel.onmessage = function (event) { + if (typeof event.data !== "string") { + var buf = new Uint8Array(event.data); + if (buf.length === 0) return; + + // Binary WS fast-path: [0x02][36-byte WS UUID][payload] + if (buf[0] === BINARY_WS_MAGIC && buf.length >= 37) { + var wsId = new TextDecoder().decode(buf.subarray(1, 37)); + var payload = buf.slice(37); + var ws = dcWebSockets.get(wsId); + if (ws) ws._fireMessageBinary(payload.buffer); + return; + } + + // JSON control message on bulk channel + if (buf[0] === 0x7B) { + try { + var msg = JSON.parse(new TextDecoder().decode(buf)); + handleDcMessage(msg); + } catch (parseErr) { + console.error("[WebRTC] Failed to parse bulk JSON message:", parseErr.message, "len:", buf.length); + } + return; + } + + // Binary streaming chunk: 36-byte requestId prefix + raw bytes + handleBinaryChunk(buf); + } + }; + + bulkChannel.onopen = function () { + console.log("[WebRTC] Bulk channel OPEN"); + // If gateway already confirmed bulk-channel support, enable now + if (gatewayFeatures.indexOf("bulk-channel") !== -1) { + bulkEnabled = true; + console.log("[WebRTC] Bulk channel enabled — dual-channel mode active"); + } + }; + + bulkChannel.onclose = function () { + console.log("[WebRTC] Bulk channel closed"); + bulkChannel = null; + bulkEnabled = false; + }; + + bulkChannel.onerror = function () { + console.log("[WebRTC] Bulk channel error"); + }; + + /** Handle a binary streaming chunk (shared between control and bulk channels). */ + function handleBinaryChunk(buf) { + if (buf.length < 36) return; + var requestId = new TextDecoder().decode(buf.subarray(0, 36)); + var entry = chunkedResponses.get(requestId); + if (!entry || !entry.streaming) { + // Chunk arrived before http_response_start (cross-channel race) — buffer it + var pending = earlyChunks.get(requestId); + if (!pending) { + pending = []; + earlyChunks.set(requestId, pending); + } + pending.push(buf.slice(36)); + if (pending.length % 100 === 0) { + console.log("[WebRTC] Early chunks buffered:", pending.length, "for", requestId); + } + return; + } + if (entry.live) { + // Live stream (SSE/NDJSON) — forward chunk to SW immediately + var port = streamingPorts.get(requestId); + if (port) { + var chunkBytes = buf.slice(36).buffer; + port.postMessage({ type: "chunk", data: chunkBytes }, [chunkBytes]); + } + } else { + // Finite response (video, etc) — buffer chunk on page side + entry.chunks.push(buf.slice(36)); + if (entry.chunks.length % 100 === 0) { + var totalSoFar = entry.chunks.reduce(function (s, c) { return s + c.length; }, 0); + console.log("[WebRTC] Streaming chunks:", entry.chunks.length, "received for", requestId, "(" + totalSoFar + " bytes)"); + } + } + } + function handleDcMessage(msg) { + if (msg.type === "capabilities") { + // Gateway capability response — store features for deferred activation + if (capabilityTimer) { clearTimeout(capabilityTimer); capabilityTimer = null; } + gatewayFeatures = msg.features || []; + console.log("[WebRTC] Gateway capabilities:", gatewayFeatures.join(", ")); + // Enable bulk channel if it's already open + if (gatewayFeatures.indexOf("bulk-channel") !== -1 && bulkChannel && bulkChannel.readyState === "open") { + bulkEnabled = true; + console.log("[WebRTC] Bulk channel enabled — dual-channel mode active"); + } + return; + } + + if (msg.type === "http_response_ack" && msg.id) { + // Gateway acknowledged a streaming response — extend our timeout + // so the full http_response_start (on the potentially congested bulk + // channel) has time to arrive. + var ackPending = pendingRequests.get(msg.id); + if (ackPending) { + clearTimeout(ackPending.timeout); + ackPending.timeout = setTimeout(function () { + console.warn("[WebRTC] Streaming request timed out after ack:", msg.id); + pendingRequests.delete(msg.id); + streamingPorts.delete(msg.id); + ackPending.port.postMessage({ error: "Timeout" }); + }, 300000); // 5 minutes for large streaming responses + // Tell SW to extend its timeout too + ackPending.port.postMessage({ type: "progress" }); + console.log("[WebRTC] Extended timeout for streaming response:", msg.id); + } + return; + } + if (msg.type === "http_response" && msg.id) { // Single buffered response const pending = pendingRequests.get(msg.id); @@ -359,6 +545,7 @@ } else if (msg.type === "http_response_start" && msg.id) { if (msg.streaming) { var isLive = !!msg.live; + console.log("[WebRTC] Streaming start received:", msg.id, "status:", msg.statusCode, "live:", isLive); if (isLive) { // Live stream (SSE, NDJSON) — resolve immediately with ReadableStream. // Client consumes data progressively as it arrives. @@ -374,7 +561,7 @@ } else { // Buffered streaming (video, large files) — extend timeouts since // the full response must be received before we can deliver it. - // A 50MB 4K segment over DataChannel can take minutes. + // Range capping on the gateway keeps each response small (~5MB). var pending = pendingRequests.get(msg.id); if (pending) { clearTimeout(pending.timeout); @@ -388,7 +575,7 @@ } // live=true: forward chunks to SW via ReadableStream // live=false: buffer chunks page-side, deliver complete Response on end - // (Chrome's media pipeline doesn't handle ReadableStream 206 from SW) + // (Chrome's media pipeline can't consume ReadableStream 206 from SW) chunkedResponses.set(msg.id, { streaming: true, live: isLive, @@ -396,6 +583,26 @@ headers: msg.headers, chunks: isLive ? undefined : [], }); + // Apply any chunks that arrived on the bulk channel before this start message + var early = earlyChunks.get(msg.id); + if (early) { + earlyChunks.delete(msg.id); + var createdEntry = chunkedResponses.get(msg.id); + for (var ei = 0; ei < early.length; ei++) { + if (isLive) { + var livePort = streamingPorts.get(msg.id); + if (livePort) { + var earlyBuf = early[ei].buffer; + livePort.postMessage({ type: "chunk", data: earlyBuf }, [earlyBuf]); + } + } else { + createdEntry.chunks.push(early[ei]); + } + } + if (early.length > 0) { + console.log("[WebRTC] Applied " + early.length + " early chunks for " + msg.id); + } + } } else { // Size-chunked reassembly (large buffered responses) chunkedResponses.set(msg.id, { @@ -437,11 +644,12 @@ } } else if (msg.type === "http_response_end" && msg.id) { const entry = chunkedResponses.get(msg.id); + console.log("[WebRTC] Streaming end received:", msg.id, "entry:", entry ? ("chunks=" + (entry.chunks ? entry.chunks.length : "none") + " live=" + entry.live) : "MISSING"); chunkedResponses.delete(msg.id); + earlyChunks.delete(msg.id); // Clean up any orphaned early chunks if (entry && !entry.live && entry.chunks) { - // Buffered finite response — concatenate chunks and encode as base64. - // Uses the same proven path as small responses (body field). + // Buffered finite response — concatenate chunks and deliver. const totalLength = entry.chunks.reduce((sum, c) => sum + c.length, 0); const merged = new Uint8Array(totalLength); let offset = 0; @@ -449,7 +657,7 @@ merged.set(chunk, offset); offset += chunk.length; } - console.log(`[WebRTC] Buffered response complete: ${msg.id} (${totalLength} bytes)`); + console.log(`[WebRTC] Buffered response complete: ${msg.id} (${totalLength} bytes, ${entry.chunks.length} chunks)`); const pending = pendingRequests.get(msg.id); if (pending) { pendingRequests.delete(msg.id); @@ -458,6 +666,8 @@ headers: entry.headers, binaryBody: merged.buffer, }); + } else { + console.warn("[WebRTC] Buffered response complete but no pending request (timeout already fired?):", msg.id); } } else { // Live stream — close the ReadableStream @@ -552,13 +762,40 @@ } async function registerServiceWorker() { - if (swRegistered || !("serviceWorker" in navigator)) { + if (swRegistered || !("serviceWorker" in navigator) || !_origSWRegister) { return; } try { - await navigator.serviceWorker.register("/js/sw.js", { scope: "/", updateViaCache: "none" }); - console.log("[WebRTC] Service Worker registered"); + // Use the backend prefix scope so our SW controls the page. + // Without this, Jellyfin's serviceworker.js at web/ scope would + // take priority and our SW would never see any fetch events. + var swScope = BACKEND_PREFIX ? BACKEND_PREFIX + "/web/" : "/"; + + // Unregister any conflicting SWs: + // - Jellyfin's own serviceworker.js at the same scope + // - Stale registrations of our sw.js at "/" scope (from before scope migration) + var existingRegs = await navigator.serviceWorker.getRegistrations(); + for (var i = 0; i < existingRegs.length; i++) { + var reg = existingRegs[i]; + var regScopePath = new URL(reg.scope).pathname; + var isOurSw = reg.active && reg.active.scriptURL.endsWith("/sw.js"); + // Unregister conflicting SWs at our target scope + if (regScopePath === swScope || regScopePath.startsWith(BACKEND_PREFIX + "/web")) { + if (!isOurSw) { + console.log("[WebRTC] Unregistering conflicting SW:", reg.scope); + await reg.unregister(); + } + } + // Unregister stale sw.js at root scope when we've migrated to a prefix scope + if (isOurSw && regScopePath === "/" && swScope !== "/") { + console.log("[WebRTC] Unregistering stale root-scope SW:", reg.scope); + await reg.unregister(); + } + } + + await _origSWRegister("/js/sw.js", { scope: swScope, updateViaCache: "none" }); + console.log("[WebRTC] Service Worker registered at scope:", swScope); swRegistered = true; // When a new SW takes control mid-session (e.g., after SW update), @@ -600,7 +837,23 @@ console.log("[WebRTC] Signaled dc_ready to Service Worker"); } + // Serialize fetchSessionToken calls — multiple 401 retries must not + // trigger concurrent refresh requests (causes refresh token rotation races). + var _tokenRefreshPromise = null; + async function fetchSessionToken() { + if (_tokenRefreshPromise) { + return _tokenRefreshPromise; + } + _tokenRefreshPromise = _doFetchSessionToken(); + try { + return await _tokenRefreshPromise; + } finally { + _tokenRefreshPromise = null; + } + } + + async function _doFetchSessionToken() { try { const res = await fetch(BACKEND_PREFIX + "/auth/session-token", { headers: { "X-Requested-With": "XMLHttpRequest" }, @@ -624,12 +877,7 @@ } } - function handleSwFetch(request, responsePort) { - if (!dataChannel || dataChannel.readyState !== "open") { - responsePort.postMessage({ error: "DataChannel not open" }); - return; - } - + function sendDcRequest(request, responsePort, isRetry) { const requestId = crypto.randomUUID(); // Inject session cookie that the SW can't read (HttpOnly). @@ -656,6 +904,7 @@ ); var timeout = setTimeout(() => { + console.warn("[WebRTC] DC request timed out (15s initial):", request.method, request.url, "id:", requestId); pendingRequests.delete(requestId); streamingPorts.delete(requestId); responsePort.postMessage({ error: "Timeout" }); @@ -666,6 +915,19 @@ port: responsePort, resolve: (msg) => { clearTimeout(timeout); + // On 401, refresh token and retry once (token may have expired + // between our 4-minute refresh intervals) + if (msg.statusCode === 401 && !isRetry) { + console.log("[WebRTC] Got 401 — refreshing token and retrying"); + fetchSessionToken().then(function () { + if (sessionToken) { + sendDcRequest(request, responsePort, true); + } else { + responsePort.postMessage({ statusCode: 401, headers: msg.headers, body: msg.body }); + } + }); + return; + } if (msg.streaming) { // Live streaming response (SSE, NDJSON) — keep the port open for chunks streamingPorts.set(requestId, responsePort); @@ -692,6 +954,14 @@ }); } + function handleSwFetch(request, responsePort) { + if (!dataChannel || dataChannel.readyState !== "open") { + responsePort.postMessage({ error: "DataChannel not open" }); + return; + } + sendDcRequest(request, responsePort, false); + } + // --- WebSocket shim: tunnels same-origin WS connections through DataChannel --- function bufToBase64(bytes) { @@ -700,6 +970,17 @@ return btoa(binary); } + /** Send a binary WebSocket frame via the bulk channel fast-path. */ + function sendBinaryWsFrame(wsId, payload) { + // [0x02][36-byte UUID][raw payload] + var frame = new Uint8Array(37 + payload.length); + frame[0] = BINARY_WS_MAGIC; + var encoder = new TextEncoder(); + frame.set(encoder.encode(wsId), 1); + frame.set(payload, 37); + bulkChannel.send(frame); + } + class DCWebSocket { constructor(url, protocols) { this._id = crypto.randomUUID(); @@ -759,18 +1040,36 @@ send(data) { if (this.readyState !== 1) throw new DOMException("WebSocket not open", "InvalidStateError"); if (typeof data === "string") { + // Text messages always go via JSON on control channel dataChannel.send(JSON.stringify({ type: "ws_message", id: this._id, data: data, binary: false })); - } else if (data instanceof ArrayBuffer) { - dataChannel.send(JSON.stringify({ type: "ws_message", id: this._id, data: bufToBase64(new Uint8Array(data)), binary: true })); - } else if (ArrayBuffer.isView(data)) { - dataChannel.send(JSON.stringify({ type: "ws_message", id: this._id, data: bufToBase64(new Uint8Array(data.buffer, data.byteOffset, data.byteLength)), binary: true })); - } else if (data instanceof Blob) { - const wsId = this._id; - const ws = this; - data.arrayBuffer().then(function (buf) { - if (ws.readyState !== 1) return; - dataChannel.send(JSON.stringify({ type: "ws_message", id: wsId, data: bufToBase64(new Uint8Array(buf)), binary: true })); - }); + } else if (bulkEnabled && bulkChannel && bulkChannel.readyState === "open") { + // Binary fast-path: send raw binary on bulk channel (no base64/JSON) + if (data instanceof ArrayBuffer) { + sendBinaryWsFrame(this._id, new Uint8Array(data)); + } else if (ArrayBuffer.isView(data)) { + sendBinaryWsFrame(this._id, new Uint8Array(data.buffer, data.byteOffset, data.byteLength)); + } else if (data instanceof Blob) { + var wsId = this._id; + var ws = this; + data.arrayBuffer().then(function (buf) { + if (ws.readyState !== 1 || !bulkChannel || bulkChannel.readyState !== "open") return; + sendBinaryWsFrame(wsId, new Uint8Array(buf)); + }); + } + } else { + // Fallback: JSON+base64 path (single-channel mode or bulk not ready) + if (data instanceof ArrayBuffer) { + dataChannel.send(JSON.stringify({ type: "ws_message", id: this._id, data: bufToBase64(new Uint8Array(data)), binary: true })); + } else if (ArrayBuffer.isView(data)) { + dataChannel.send(JSON.stringify({ type: "ws_message", id: this._id, data: bufToBase64(new Uint8Array(data.buffer, data.byteOffset, data.byteLength)), binary: true })); + } else if (data instanceof Blob) { + const wsId = this._id; + const ws = this; + data.arrayBuffer().then(function (buf) { + if (ws.readyState !== 1) return; + dataChannel.send(JSON.stringify({ type: "ws_message", id: wsId, data: bufToBase64(new Uint8Array(buf)), binary: true })); + }); + } } } @@ -801,6 +1100,12 @@ this._dispatch("message", new MessageEvent("message", { data: payload })); } + /** Receive binary data directly from bulk channel (no base64 decode needed). */ + _fireMessageBinary(arrayBuffer) { + var payload = this.binaryType === "arraybuffer" ? arrayBuffer : new Blob([arrayBuffer]); + this._dispatch("message", new MessageEvent("message", { data: payload })); + } + _fireClose(code, reason) { if (this.readyState === 3) return; this.readyState = 3; diff --git a/bridges/punchd-bridge/gateway/src/proxy/http-proxy.ts b/bridges/punchd-bridge/gateway/src/proxy/http-proxy.ts index b2c1fcb..516aa51 100644 --- a/bridges/punchd-bridge/gateway/src/proxy/http-proxy.ts +++ b/bridges/punchd-bridge/gateway/src/proxy/http-proxy.ts @@ -469,6 +469,59 @@ export function createProxy(options: ProxyOptions): { // Server-side endpoints (token exchange, refresh) always use internal URL const serverEndpoints: OidcEndpoints = getOidcEndpoints(options.tcConfig, tcInternalUrl); const clientId = options.tcConfig.resource; + + // ── Refresh token dedup cache ───────────────────────────────── + // When the access token expires, multiple concurrent requests (manifest.json, + // DC requests, session-token refresh) may all try to use the same refresh + // token simultaneously. TideCloak rotates refresh tokens on use, so the + // second concurrent refresh fails (old token consumed). Fix: deduplicate + // concurrent refreshes and cache the result briefly. + interface RefreshResult { + accessToken: string; + expiresIn: number; + refreshToken?: string; + refreshExpiresIn?: number; + timestamp: number; + } + let lastRefreshResult: RefreshResult | null = null; + let refreshInFlight: Promise | null = null; + + async function deduplicatedRefresh(refreshToken: string): Promise { + // Reuse a recent result (< 60 seconds) — prevents hammering TideCloak + // when multiple requests trigger refresh simultaneously or in quick succession + if (lastRefreshResult && Date.now() - lastRefreshResult.timestamp < 60_000) { + return lastRefreshResult; + } + // If a refresh is already in flight, wait for it + if (refreshInFlight) { + return refreshInFlight; + } + refreshInFlight = (async () => { + try { + const tokens = await refreshAccessToken( + serverEndpoints, + clientId, + refreshToken + ); + const result: RefreshResult = { + accessToken: tokens.access_token, + expiresIn: tokens.expires_in, + refreshToken: tokens.refresh_token, + refreshExpiresIn: tokens.refresh_expires_in, + timestamp: Date.now(), + }; + lastRefreshResult = result; + return result; + } catch (err) { + console.log("[Gateway] Deduplicated refresh failed:", err); + return null; + } finally { + refreshInFlight = null; + } + })(); + return refreshInFlight; + } + const isTls = !!options.tls; _useSecureCookies = isTls; @@ -561,6 +614,7 @@ export function createProxy(options: ProxyOptions): { stunServer: options.iceServers?.[0] ? `stun:${options.iceServers[0].replace("stun:", "")}` : null, + targetGatewayId: options.gatewayId || undefined, }; if (options.turnServer && options.turnSecret) { // Only serve TURN credentials to authenticated users @@ -705,36 +759,34 @@ export function createProxy(options: ProxyOptions): { accessToken = authHeader.slice(7); } } - if (!accessToken) { - res.writeHead(401, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "No session" })); - return; - } - let payload = await options.auth.verifyToken(accessToken); - // If access token expired, try refreshing with refresh token + let payload = accessToken + ? await options.auth.verifyToken(accessToken) + : null; + + // Always refresh when the session-token endpoint is called. + // This endpoint is only called by the client's periodic refresh + // (every 2 min), so it's not excessive. It ensures the client + // always gets a token with full lifetime and the browser's + // gateway_access cookie is renewed (preventing expiry-based 401s + // on non-DC requests like manifest.json). const setCookies: string[] = []; - if (!payload && cookies["gateway_refresh"]) { - try { - const tokens = await refreshAccessToken( - serverEndpoints, - clientId, - cookies["gateway_refresh"] - ); - payload = await options.auth.verifyToken(tokens.access_token); - if (payload) { - accessToken = tokens.access_token; + if (cookies["gateway_refresh"]) { + const refreshResult = await deduplicatedRefresh(cookies["gateway_refresh"]); + if (refreshResult) { + const refreshedPayload = await options.auth.verifyToken(refreshResult.accessToken); + if (refreshedPayload) { + payload = refreshedPayload; + accessToken = refreshResult.accessToken; setCookies.push( - buildCookieHeader("gateway_access", tokens.access_token, tokens.expires_in) + buildCookieHeader("gateway_access", refreshResult.accessToken, refreshResult.expiresIn) ); - if (tokens.refresh_token) { + if (refreshResult.refreshToken) { setCookies.push( - buildCookieHeader("gateway_refresh", tokens.refresh_token, tokens.refresh_expires_in || 1800, "Strict") + buildCookieHeader("gateway_refresh", refreshResult.refreshToken, refreshResult.refreshExpiresIn || 1800, "Strict") ); } } - } catch (err) { - console.log("[Gateway] Session token refresh failed:", err); } } @@ -982,46 +1034,41 @@ export function createProxy(options: ProxyOptions): { // If access token expired, try refreshing with refresh token if (!payload && cookies["gateway_refresh"]) { - try { - const tokens = await refreshAccessToken( - serverEndpoints, - clientId, - cookies["gateway_refresh"] - ); - - payload = await options.auth.verifyToken(tokens.access_token); - + const refreshResult = await deduplicatedRefresh(cookies["gateway_refresh"]); + if (refreshResult) { + payload = await options.auth.verifyToken(refreshResult.accessToken); if (payload) { - // Set updated cookies on the response - token = tokens.access_token; + token = refreshResult.accessToken; const refreshCookies: string[] = [ buildCookieHeader( "gateway_access", - tokens.access_token, - tokens.expires_in + refreshResult.accessToken, + refreshResult.expiresIn ), ]; - if (tokens.refresh_token) { + if (refreshResult.refreshToken) { refreshCookies.push( buildCookieHeader( "gateway_refresh", - tokens.refresh_token, - tokens.refresh_expires_in || 1800, + refreshResult.refreshToken, + refreshResult.refreshExpiresIn || 1800, "Strict" ) ); } - // Store cookies to set on the proxied response (res as any).__refreshCookies = refreshCookies; } - } catch (err) { - console.log("[Gateway] Token refresh failed:", err); } } // No valid token — redirect browser or 401 for API if (!payload) { stats.rejectedRequests++; + // Diagnostic: log why auth failed for DC requests + if (req.headers["x-dc-request"]) { + const tokenSnippet = token ? `${token.slice(0, 20)}...` : "null"; + console.log(`[Gateway] DC auth failed: url=${url} token=${tokenSnippet} hasRefreshCookie=${!!cookies["gateway_refresh"]}`); + } if (isBrowserRequest(req)) { const fullUrl = backendPrefix + url; @@ -1209,7 +1256,7 @@ export function createProxy(options: ProxyOptions): { // /__b/ prefix prepended automatically. // Gateway-internal paths (/auth/*, /js/*, /realms/*, etc.) are // skipped — they work without the prefix. - // Escape backendPrefix for safe JS string interpolation (prevents XSS if name contains quotes/backslashes) + // Escape backendPrefix for safe JS string interpolation (prevents XSS if name contains quotes/backslashes). const safePrefix = backendPrefix.replace(/\\/g, "\\\\").replace(/"/g, '\\"').replace(/(function(){` + `var P="${safePrefix}";` + @@ -1232,6 +1279,26 @@ export function createProxy(options: ProxyOptions): { `var SA=Element.prototype.setAttribute;Element.prototype.setAttribute=function(a,v){` + `if((a==="src"||a==="href")&&typeof v==="string"&&n(v))v=P+v;` + `return SA.call(this,a,v)};` + + // Fix CSS url() breakage when /__b/ contains apostrophes or spaces: + // strip quotes from url('…') / url("…") and percent-encode chars that + // are invalid in unquoted CSS url() (spaces, quotes, parens, tabs). + // Uses a Proxy on HTMLElement.style for reliable interception. + `function q(v){if(typeof v!=="string"||v.indexOf("url(")===-1)return v;` + + `return v.replace(/url\\(([^)]*)\\)/g,function(m,i){` + + `var u=i.trim();` + + `if(u.length>1&&(u[0]==="'"||u[0]==='"')&&u[u.length-1]===u[0])u=u.slice(1,-1);` + + `return"url("+u.replace(/ /g,"%20").replace(/'/g,"%27").replace(/"/g,"%22").replace(/\\t/g,"%09")+")"` + + `})}` + + `var _sd=Object.getOwnPropertyDescriptor(HTMLElement.prototype,"style");` + + `if(_sd&&_sd.get){var _wm=new WeakMap();Object.defineProperty(HTMLElement.prototype,"style",{` + + `get:function(){var r=_sd.get.call(this),p=_wm.get(r);if(!p){p=new Proxy(r,{` + + `set:function(t,k,v){t[k]=q(v);return true},` + + `get:function(t,k){var v=t[k];if(typeof v!=="function")return v;` + + `if(k==="setProperty")return function(){if(arguments.length>1)arguments[1]=q(arguments[1]);return t.setProperty.apply(t,arguments)};` + + `return v.bind(t)}` + + `});_wm.set(r,p)}return p},` + + `set:_sd.set?function(v){_sd.set.call(this,q(v))}:void 0,` + + `configurable:true})}` + `})()`; if (html.includes("")) { html = html.replace("", `${patchScript}`); diff --git a/bridges/punchd-bridge/gateway/src/webrtc/peer-handler.ts b/bridges/punchd-bridge/gateway/src/webrtc/peer-handler.ts index bd9e73c..4513cf1 100644 --- a/bridges/punchd-bridge/gateway/src/webrtc/peer-handler.ts +++ b/bridges/punchd-bridge/gateway/src/webrtc/peer-handler.ts @@ -6,10 +6,15 @@ * the gateway creates a PeerConnection, establishes a DataChannel, * and tunnels HTTP requests/responses over it — same format * as the WebSocket-based HTTP relay. + * + * Supports dual DataChannels for high-throughput scenarios (4K video, gaming): + * - "http-tunnel" (control): JSON control messages, small responses + * - "bulk-data" (bulk): binary streaming chunks, binary WebSocket frames + * Falls back to single-channel mode for older clients. */ import { createHmac } from "crypto"; -import { PeerConnection, DataChannel } from "node-datachannel"; +import { PeerConnection, DataChannel, setSctpSettings } from "node-datachannel"; import { request as httpRequest } from "http"; import { request as httpsRequest } from "https"; import WebSocket from "ws"; @@ -40,8 +45,140 @@ export interface PeerHandler { const MAX_PEERS = 200; const ALLOWED_METHODS = new Set(["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]); +// Buffer thresholds — separate for control (small JSON) and bulk (streaming data) +const CONTROL_MAX_BUFFER = 512_000; // 512KB for control channel +const BULK_MAX_BUFFER = 4_194_304; // 4MB for bulk channel — keeps pipe full for 4K video + +// Chunk coalescing: batch small HTTP response chunks into larger DC messages +const COALESCE_TARGET = 65_536; // 64KB target coalesced message size +const COALESCE_TIMEOUT = 1; // 1ms max coalescing delay + +// Binary WebSocket fast-path magic byte (avoids JSON+base64 overhead for gaming) +const BINARY_WS_MAGIC = 0x02; + +// Tune SCTP buffers for high-throughput streaming +let sctpConfigured = false; +function ensureSctpSettings(): void { + if (sctpConfigured) return; + sctpConfigured = true; + try { + setSctpSettings({ + sendBufferSize: 8 * 1024 * 1024, // 8MB send buffer + recvBufferSize: 8 * 1024 * 1024, // 8MB receive buffer + maxChunksOnQueue: 65536, // up from default 8192 + initialCongestionWindow: 32, // faster ramp-up + }); + } catch { + // setSctpSettings may fail if PeerConnections already exist + } +} + +/** Per-peer state shared between control and bulk channels. */ +interface PeerState { + wsConnections: Map; + capabilities: Set; + controlDc: DataChannel | null; + bulkDc: DataChannel | null; + controlQueue: Buffer[]; + bulkQueue: Buffer[]; + controlPaused: boolean; + bulkPaused: boolean; + pausedStreams: Set; +} + export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { + ensureSctpSettings(); + const peers = new Map(); + const peerStates = new Map(); + + function getPeerState(clientId: string): PeerState { + let state = peerStates.get(clientId); + if (!state) { + state = { + wsConnections: new Map(), + capabilities: new Set(), + controlDc: null, + bulkDc: null, + controlQueue: [], + bulkQueue: [], + controlPaused: false, + bulkPaused: false, + pausedStreams: new Set(), + }; + peerStates.set(clientId, state); + } + return state; + } + + // --- Shared send-queue helpers with event-driven flow control --- + + function setupFlowControl(dc: DataChannel, queue: Buffer[], maxBuffer: number, getPaused: () => boolean, setPaused: (v: boolean) => void, state: PeerState): void { + dc.setBufferedAmountLowThreshold(maxBuffer / 4); + dc.onBufferedAmountLow(() => { + if (getPaused()) { + setPaused(false); + drainQueue(dc, queue, maxBuffer, getPaused, setPaused, state); + } + }); + } + + function drainQueue(dc: DataChannel, queue: Buffer[], maxBuffer: number, getPaused: () => boolean, setPaused: (v: boolean) => void, state: PeerState): void { + while (queue.length > 0) { + if (!dc.isOpen()) return; + if (dc.bufferedAmount() > maxBuffer) { + setPaused(true); + // Pause all in-flight HTTP response streams + for (const stream of state.pausedStreams) { + stream.pause(); + } + return; + } + try { + const sent = dc.sendMessageBinary(queue[0]); + if (!sent) { + setPaused(true); + return; + } + } catch { + return; + } + queue.shift(); + } + // Queue drained — resume any paused streams + for (const stream of state.pausedStreams) { + stream.resume(); + } + } + + function enqueueControl(state: PeerState, buf: Buffer): void { + const dc = state.controlDc; + if (!dc || !dc.isOpen()) return; + state.controlQueue.push(buf); + if (!state.controlPaused) { + drainQueue(dc, state.controlQueue, CONTROL_MAX_BUFFER, + () => state.controlPaused, (v) => { state.controlPaused = v; }, state); + } + } + + function enqueueBulk(state: PeerState, buf: Buffer): void { + // Use bulk channel if available, otherwise fall back to control + const dc = state.bulkDc && state.bulkDc.isOpen() ? state.bulkDc : state.controlDc; + if (!dc || !dc.isOpen()) return; + + if (dc === state.bulkDc) { + state.bulkQueue.push(buf); + if (!state.bulkPaused) { + drainQueue(dc, state.bulkQueue, BULK_MAX_BUFFER, + () => state.bulkPaused, (v) => { state.bulkPaused = v; }, state); + } + } else { + // Fallback to control channel (single-channel mode) + enqueueControl(state, buf); + } + } + + // --- Channel setup --- function handleSdpOffer(clientId: string, sdp: string): void { // Clean up existing peer if reconnecting @@ -49,6 +186,7 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { if (existing) { existing.close(); peers.delete(clientId); + peerStates.delete(clientId); } // Reject new peers if at capacity (reconnects already cleaned up above) @@ -123,45 +261,136 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { } if (state === "closed" || state === "failed") { peers.delete(clientId); + peerStates.delete(clientId); } }); pc.onDataChannel((dc) => { - console.log(`[WebRTC] DataChannel opened with client: ${clientId} (label: ${dc.getLabel()})`); - const wsConnections = new Map(); + const label = dc.getLabel(); + console.log(`[WebRTC] DataChannel opened with client: ${clientId} (label: ${label})`); + + if (label === "http-tunnel") { + setupControlChannel(dc, clientId); + } else if (label === "bulk-data") { + setupBulkChannel(dc, clientId); + } else { + console.warn(`[WebRTC] Unknown DataChannel label: ${label}, treating as control`); + setupControlChannel(dc, clientId); + } + }); - dc.onMessage((msg) => { - try { - const parsed = JSON.parse(typeof msg === "string" ? msg : msg.toString()); - if (parsed.type === "http_request") { - handleDataChannelRequest(dc, parsed); - } else if (parsed.type === "ws_open") { - handleWsOpen(dc, parsed, wsConnections); - } else if (parsed.type === "ws_message") { - const ws = wsConnections.get(parsed.id); - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send(parsed.binary ? Buffer.from(parsed.data, "base64") : parsed.data); - } - } else if (parsed.type === "ws_close") { - const ws = wsConnections.get(parsed.id); - if (ws) ws.close(parsed.code || 1000, parsed.reason || ""); + pc.setRemoteDescription(sdp, "offer"); + peers.set(clientId, pc); + } + + const GATEWAY_FEATURES = ["bulk-channel", "binary-ws"]; + + function sendCapabilities(state: PeerState): void { + enqueueControl(state, Buffer.from(JSON.stringify({ + type: "capabilities", + version: 2, + features: GATEWAY_FEATURES, + }))); + } + + function setupControlChannel(dc: DataChannel, clientId: string): void { + const state = getPeerState(clientId); + state.controlDc = dc; + + setupFlowControl(dc, state.controlQueue, CONTROL_MAX_BUFFER, + () => state.controlPaused, (v) => { state.controlPaused = v; }, state); + + // Send capabilities proactively as soon as the channel is open — + // don't wait for the client to ask (message could be lost or delayed). + if (dc.isOpen()) { + console.log(`[WebRTC] Sending proactive capabilities to ${clientId}`); + sendCapabilities(state); + } + dc.onOpen(() => { + console.log(`[WebRTC] Control channel fully open for ${clientId}, sending capabilities`); + sendCapabilities(state); + }); + + dc.onMessage((msg) => { + try { + const parsed = JSON.parse(typeof msg === "string" ? msg : Buffer.from(msg as ArrayBuffer).toString()); + if (parsed.type === "http_request") { + handleDataChannelRequest(state, parsed); + } else if (parsed.type === "ws_open") { + handleWsOpen(state, parsed); + } else if (parsed.type === "ws_message") { + const ws = state.wsConnections.get(parsed.id); + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(parsed.binary ? Buffer.from(parsed.data, "base64") : parsed.data); } - } catch { - console.error("[WebRTC] Failed to parse DataChannel message"); + } else if (parsed.type === "ws_close") { + const ws = state.wsConnections.get(parsed.id); + if (ws) ws.close(parsed.code || 1000, parsed.reason || ""); + } else if (parsed.type === "capabilities") { + // Client capability handshake — respond with our supported features + const clientFeatures: string[] = parsed.features || []; + for (const f of clientFeatures) { + if (GATEWAY_FEATURES.includes(f)) state.capabilities.add(f); + } + console.log(`[WebRTC] Client ${clientId} capabilities: ${[...state.capabilities].join(", ")}`); + // Reply (client may have missed the proactive announcement) + sendCapabilities(state); } - }); + } catch { + console.error("[WebRTC] Failed to parse DataChannel message"); + } + }); + + dc.onClosed(() => { + console.log(`[WebRTC] Control channel closed with client: ${clientId}`); + for (const [, ws] of state.wsConnections) { + try { ws.close(); } catch {} + } + state.wsConnections.clear(); + state.controlDc = null; + }); + } + + function setupBulkChannel(dc: DataChannel, clientId: string): void { + const state = getPeerState(clientId); + state.bulkDc = dc; + + setupFlowControl(dc, state.bulkQueue, BULK_MAX_BUFFER, + () => state.bulkPaused, (v) => { state.bulkPaused = v; }, state); + + dc.onMessage((msg) => { + const buf = Buffer.isBuffer(msg) ? msg : Buffer.from(msg as ArrayBuffer); + if (buf.length < 1) return; - dc.onClosed(() => { - console.log(`[WebRTC] DataChannel closed with client: ${clientId}`); - for (const [, ws] of wsConnections) { - try { ws.close(); } catch {} + // Binary WS fast-path: [0x02][36-byte WS UUID][payload] + if (buf[0] === BINARY_WS_MAGIC && buf.length >= 37) { + const wsId = buf.toString("ascii", 1, 37); + const payload = buf.subarray(37); + const ws = state.wsConnections.get(wsId); + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(payload); } - wsConnections.clear(); - }); + return; + } + + // Other binary messages on bulk channel (shouldn't happen but handle gracefully) + try { + const parsed = JSON.parse(buf.toString()); + if (parsed.type === "ws_message") { + const ws = state.wsConnections.get(parsed.id); + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(parsed.binary ? Buffer.from(parsed.data, "base64") : parsed.data); + } + } + } catch { + // Not JSON, ignore + } }); - pc.setRemoteDescription(sdp, "offer"); - peers.set(clientId, pc); + dc.onClosed(() => { + console.log(`[WebRTC] Bulk channel closed with client: ${clientId}`); + state.bulkDc = null; + }); } function handleCandidate(clientId: string, candidate: string, mid: string): void { @@ -180,9 +409,22 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { || (ct.includes("text/plain") && res.headers["transfer-encoding"] === "chunked"); } - // Responses smaller than this are sent as a single DC message; - // larger responses are streamed progressively. - const MAX_SINGLE_MSG = 200_000; + /** Binary content types that should always stream (not buffer + base64) */ + function isBinaryContent(res: import("http").IncomingMessage): boolean { + const ct = (res.headers["content-type"] || "").toLowerCase(); + return ct.startsWith("image/") + || ct.startsWith("video/") + || ct.startsWith("audio/") + || ct.startsWith("font/") + || ct.includes("application/octet-stream") + || ct.includes("application/wasm") + || ct.includes("application/zip") + || ct.includes("application/pdf"); + } + + // Responses smaller than this are sent as a single DC message (base64 on control); + // larger responses are streamed progressively as binary via bulk channel. + const MAX_SINGLE_MSG = 32_000; // 32KB — API JSON fits; images stream /** * Handle an HTTP request received over DataChannel. @@ -191,7 +433,7 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { * are forwarded progressively as data arrives from the backend. */ function handleDataChannelRequest( - dc: DataChannel, + state: PeerState, msg: { id: string; method?: string; url?: string; headers?: Record; body?: string } ): void { const requestId = msg.id; @@ -202,29 +444,25 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { // Validate URL path — must start with / and contain no CRLF (header injection) if (!url.startsWith("/") || /[\r\n]/.test(url)) { - if (dc.isOpen()) { - dc.sendMessageBinary(Buffer.from(JSON.stringify({ - type: "http_response", - id: requestId, - statusCode: 400, - headers: { "content-type": "application/json" }, - body: Buffer.from(JSON.stringify({ error: "Invalid URL" })).toString("base64"), - }))); - } + enqueueControl(state, Buffer.from(JSON.stringify({ + type: "http_response", + id: requestId, + statusCode: 400, + headers: { "content-type": "application/json" }, + body: Buffer.from(JSON.stringify({ error: "Invalid URL" })).toString("base64"), + }))); return; } // Validate HTTP method if (!ALLOWED_METHODS.has(method.toUpperCase())) { - if (dc.isOpen()) { - dc.sendMessageBinary(Buffer.from(JSON.stringify({ - type: "http_response", - id: requestId, - statusCode: 405, - headers: { "content-type": "application/json" }, - body: Buffer.from(JSON.stringify({ error: "Method not allowed" })).toString("base64"), - }))); - } + enqueueControl(state, Buffer.from(JSON.stringify({ + type: "http_response", + id: requestId, + statusCode: 405, + headers: { "content-type": "application/json" }, + body: Buffer.from(JSON.stringify({ error: "Method not allowed" })).toString("base64"), + }))); return; } @@ -233,15 +471,13 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { // Limit decoded body size to 10MB to prevent OOM const MAX_BODY_SIZE = 10 * 1024 * 1024; if (bodyB64 && bodyB64.length > MAX_BODY_SIZE * 1.37) { - if (dc.isOpen()) { - dc.sendMessageBinary(Buffer.from(JSON.stringify({ - type: "http_response", - id: requestId, - statusCode: 413, - headers: { "content-type": "application/json" }, - body: Buffer.from(JSON.stringify({ error: "Request body too large" })).toString("base64"), - }))); - } + enqueueControl(state, Buffer.from(JSON.stringify({ + type: "http_response", + id: requestId, + statusCode: 413, + headers: { "content-type": "application/json" }, + body: Buffer.from(JSON.stringify({ error: "Request body too large" })).toString("base64"), + }))); return; } const bodyBuf = bodyB64 ? Buffer.from(bodyB64, "base64") : undefined; @@ -251,6 +487,30 @@ export function createPeerHandler(options: PeerHandlerOptions): PeerHandler { // Don't forward accept-encoding — we send raw bytes over DC, compression // breaks Content-Range offsets and confuses browser media pipelines. delete (headers as Record)["accept-encoding"]; + // Strip conditional headers — DC responses bypass the browser's HTTP + // cache, so 304 responses produce null-body Responses in the Service + // Worker that the browser can't match to a cache entry. + delete (headers as Record)["if-none-match"]; + delete (headers as Record)["if-modified-since"]; + + // Cap Range request size for DataChannel responses. + // Non-live responses (video) are buffered on the client before delivery + // (Chrome's