diff --git a/src/core/drive/streaming_body_renderer.js b/src/core/drive/streaming_body_renderer.js new file mode 100644 index 000000000..1b2532f9f --- /dev/null +++ b/src/core/drive/streaming_body_renderer.js @@ -0,0 +1,79 @@ +import { activateScriptElement, dispatch, parseHTMLDocument } from "../../util" + +/** + * Progressively renders HTML chunks from a streamed response into a target container. + * Used when the server sends X-Turbo-Stream-Body: true and chunks delimited by . + */ +export class StreamingBodyRenderer { + constructor(fetchResponse, visit) { + this.fetchResponse = fetchResponse + this.visit = visit + } + + async render() { + const target = this.resolveTarget() + if (!target) return + + let isFirstChunk = true + + for await (const chunk of this.fetchResponse.streamBodyChunks()) { + const fragment = this.parseChunk(chunk) + + if (isFirstChunk) { + this.injectFirstChunk(target, fragment) + isFirstChunk = false + } else { + this.appendChunk(target, fragment) + } + } + } + + resolveTarget() { + const selector = this.fetchResponse.streamTarget + const element = document.querySelector(selector) + return element || document.body + } + + parseChunk(html) { + const trimmed = html.trim() + const isFullDocument = trimmed.startsWith("${trimmed}`) + } + + injectFirstChunk(target, doc) { + if (doc.head?.childNodes?.length) { + for (const node of [...doc.head.childNodes]) { + document.head.appendChild(document.adoptNode(node)) + } + } + target.replaceChildren() + if (doc.body) { + while (doc.body.firstChild) { + target.appendChild(document.adoptNode(doc.body.firstChild)) + } + } + } + + appendChunk(target, doc) { + const container = doc.body || doc.documentElement + this.appendFragment(target, container) + } + + appendFragment(target, container) { + const event = dispatch("turbo:before-stream-render", { + target: document.documentElement, + cancelable: true, + detail: { target, fragment: container } + }) + if (event.defaultPrevented) return + + while (container.firstChild) { + const node = document.adoptNode(container.firstChild) + if (node.tagName === "SCRIPT") { + target.appendChild(activateScriptElement(node)) + } else { + target.appendChild(node) + } + } + } +} diff --git a/src/core/drive/visit.js b/src/core/drive/visit.js index 092b73b01..81c0be0e7 100644 --- a/src/core/drive/visit.js +++ b/src/core/drive/visit.js @@ -3,6 +3,7 @@ import { getAnchor } from "../url" import { PageSnapshot } from "./page_snapshot" import { getHistoryMethodForAction, uuid } from "../../util" import { StreamMessage } from "../streams/stream_message" +import { StreamingBodyRenderer } from "./streaming_body_renderer" import { ViewTransitioner } from "./view_transitioner" const defaultOptions = { @@ -202,7 +203,13 @@ export class Visit { loadResponse() { if (this.response) { - const { statusCode, responseHTML } = this.response + const { statusCode, responseHTML, streamBody, fetchResponse } = this.response + + if (streamBody && fetchResponse) { + this.loadStreamingResponse(fetchResponse) + return + } + this.render(async () => { if (this.shouldCacheSnapshot) this.cacheSnapshot() if (this.view.renderPromise) await this.view.renderPromise @@ -222,6 +229,20 @@ export class Visit { } } + async loadStreamingResponse(fetchResponse) { + this.render(async () => { + if (this.view.renderPromise) await this.view.renderPromise + + this.changeHistory() + + const renderer = new StreamingBodyRenderer(fetchResponse, this) + await renderer.render() + + this.adapter.visitRendered(this) + this.complete() + }) + } + getCachedSnapshot() { const snapshot = this.view.getCachedSnapshotForLocation(this.location) || this.getPreloadedSnapshot() @@ -291,8 +312,15 @@ export class Visit { requestPreventedHandlingResponse(_request, _response) {} async requestSucceededWithResponse(request, response) { - const responseHTML = await response.responseHTML const { redirected, statusCode } = response + + if (response.isStreamBody && response.isHTML) { + this.redirectedToLocation = response.redirected ? response.location : undefined + this.recordResponse({ statusCode, redirected, streamBody: true, fetchResponse: response }) + return + } + + const responseHTML = await response.responseHTML if (responseHTML == undefined) { this.recordResponse({ statusCode: SystemStatusCode.contentTypeMismatch, diff --git a/src/http/fetch_response.js b/src/http/fetch_response.js index 3628d2e8f..f19670a16 100644 --- a/src/http/fetch_response.js +++ b/src/http/fetch_response.js @@ -1,5 +1,7 @@ import { expandURL } from "../core/url" +const TURBO_CHUNK_DELIMITER = "" + export class FetchResponse { constructor(response) { this.response = response @@ -41,6 +43,21 @@ export class FetchResponse { return this.header("Content-Type") } + /** + * When true, the server requests progressive rendering. Turbo will stream the response + * body instead of buffering it, and inject chunks delimited by . + */ + get isStreamBody() { + return this.header("X-Turbo-Stream-Body") === "true" + } + + /** + * Target selector for streaming chunks. Defaults to "main" or "body" if not specified. + */ + get streamTarget() { + return this.header("X-Turbo-Stream-Target") || "main" + } + get responseText() { return this.response.clone().text() } @@ -56,4 +73,36 @@ export class FetchResponse { header(name) { return this.response.headers.get(name) } + + /** + * Async generator that yields complete HTML chunks between delimiters. + * Only use when isStreamBody is true — do not call responseHTML after this. + */ + async *streamBodyChunks() { + if (!this.response.body) return + + const reader = this.response.body.getReader() + const decoder = new TextDecoder() + let buffer = "" + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + while (buffer.includes(TURBO_CHUNK_DELIMITER)) { + const [chunk, rest] = buffer.split(TURBO_CHUNK_DELIMITER, 2) + buffer = rest || "" + const trimmed = chunk.trim() + if (trimmed) yield trimmed + } + } + + if (buffer.trim()) yield buffer.trim() + } finally { + reader.releaseLock() + } + } } diff --git a/src/tests/fixtures/drive.html b/src/tests/fixtures/drive.html index 7499a06b0..fe3fb84d9 100644 --- a/src/tests/fixtures/drive.html +++ b/src/tests/fixtures/drive.html @@ -17,5 +17,9 @@

Drive

Drive disabled link
+ +
+ Streaming body link +
diff --git a/src/tests/functional/drive_tests.js b/src/tests/functional/drive_tests.js index 2e6aa180d..7982a1dcf 100644 --- a/src/tests/functional/drive_tests.js +++ b/src/tests/functional/drive_tests.js @@ -30,3 +30,15 @@ test("drive enabled by default; click link inside data-turbo='false'", async ({ await expect(page).toHaveURL(withPathname(path)) expect(await visitAction(page)).toEqual("load") }) + +test("progressively renders streamed response when X-Turbo-Stream-Body is true (fixes #1517)", async ({ + page +}) => { + await page.click("#drive_stream_body") + + await expect(page).toHaveURL(withPathname("/__turbo/stream-body")) + await expect(page.locator("h1")).toHaveText("Streaming Page") + await expect(page.locator("#chunk-1")).toHaveText("First chunk loaded") + await expect(page.locator("#chunk-2")).toHaveText("Second chunk loaded") + await expect(page.locator("#chunk-3")).toHaveText("Third chunk loaded") +}) diff --git a/src/tests/server.mjs b/src/tests/server.mjs index 1978ca601..565bab540 100644 --- a/src/tests/server.mjs +++ b/src/tests/server.mjs @@ -142,6 +142,24 @@ router.get("/stream-response", (request, response) => { } }) +router.get("/stream-body", (request, response) => { + response.set({ + "Content-Type": "text/html; charset=utf-8", + "X-Turbo-Stream-Body": "true" + }) + + const chunk1 = "

Streaming Page

First chunk loaded
" + const chunk2 = "
Second chunk loaded
" + const chunk3 = "
Third chunk loaded
" + + response.write(chunk1) + response.write("") + response.write(chunk2) + response.write("") + response.write(chunk3) + response.end() +}) + router.put("/messages/:id", (request, response) => { const { content, type } = request.body const { id } = request.params