From 283cbf739547e128b36532302ea346fd8bd9f6ab Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Sun, 15 Feb 2026 21:16:56 +0900 Subject: [PATCH 1/7] test(jco): uncomment s32 stream test --- packages/jco/test/p3/stream.js | 11 +++++------ packages/jco/types/api.d.ts | 14 +++++++------- packages/jco/types/api.d.ts.map | 2 +- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/packages/jco/test/p3/stream.js b/packages/jco/test/p3/stream.js index f9da2b5c3..f229d1ee6 100644 --- a/packages/jco/test/p3/stream.js +++ b/packages/jco/test/p3/stream.js @@ -45,12 +45,11 @@ suite("Stream (WASI P3)", () => { // TODO(tests): we should check that reading with no values remaining blocks? // TODO(tests): we should check that reading when writer is closed throws error? - // TODO(fix): broken stream leftover task - // vals = [-11,-22,-33]; - // stream = await instance['jco:test-components/get-stream-async'].getStreamS32(vals); - // assert.equal(vals[0], await stream.next()); - // assert.equal(vals[1], await stream.next()); - // assert.equal(vals[2], await stream.next()); + vals = [-11, -22, -33]; + stream = await instance["jco:test-components/get-stream-async"].getStreamS32(vals); + assert.equal(vals[0], await stream.next()); + assert.equal(vals[1], await stream.next()); + assert.equal(vals[2], await stream.next()); await cleanup(); }); diff --git a/packages/jco/types/api.d.ts b/packages/jco/types/api.d.ts index 6c9fce0a8..6bef5ad12 100644 --- a/packages/jco/types/api.d.ts +++ b/packages/jco/types/api.d.ts @@ -2,39 +2,39 @@ * @param {Parameters[0]} binary * @return {Promise>} */ -export function print(binary: Parameters[0]): Promise>; +export function print(binary: Parameters[0]): Promise>; /** * @param {Parameters[0]} wat * @return {Promise>} */ -export function parse(wat: Parameters[0]): Promise>; +export function parse(wat: Parameters[0]): Promise>; /** * @param {Parameters[0]} binary * @return {Promise>} */ -export function componentWit(binary: Parameters[0]): Promise>; +export function componentWit(binary: Parameters[0]): Promise>; /** * @param {Parameters[0]} binary * @param {Parameters[1]} adapters * @return {Promise>} */ -export function componentNew(binary: Parameters[0], adapters: Parameters[1]): Promise>; +export function componentNew(binary: Parameters[0], adapters: Parameters[1]): Promise>; /** * @param {Parameters[0]} embedOpts * @return {Promise>} */ -export function componentEmbed(embedOpts: Parameters[0]): Promise>; +export function componentEmbed(embedOpts: Parameters[0]): Promise>; /** * @param {Parameters[0]} binary * @param {Parameters[1]} metadata * @return {Promise>} */ -export function metadataAdd(binary: Parameters[0], metadata: Parameters[1]): Promise>; +export function metadataAdd(binary: Parameters[0], metadata: Parameters[1]): Promise>; /** * @param {Parameters[0]} binary * @return {Promise>} */ -export function metadataShow(binary: Parameters[0]): Promise>; +export function metadataShow(binary: Parameters[0]): Promise>; export function preview1AdapterCommandPath(): URL; export function preview1AdapterReactorPath(): URL; export { optimizeComponent as opt } from "./cmd/opt.js"; diff --git a/packages/jco/types/api.d.ts.map b/packages/jco/types/api.d.ts.map index 9cc909852..0f0967e78 100644 --- a/packages/jco/types/api.d.ts.map +++ b/packages/jco/types/api.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"api.d.ts","sourceRoot":"","sources":["../src/api.js"],"names":[],"mappings":"AAaA;;;GAGG;AACH,8BAHW,UAAU,CAAC,GAAoC,CAAC,CAAC,CAAC,CAAC,GAClD,OAAO,CAAC,UAAU,CAAC,GAAoC,CAAC,CAAC,CAKpE;AACD;;;GAGG;AACH,2BAHW,UAAU,CAAC,GAAoC,CAAC,CAAC,CAAC,CAAC,GAClD,OAAO,CAAC,UAAU,CAAC,GAAoC,CAAC,CAAC,CAKpE;AACD;;;GAGG;AACH,qCAHW,UAAU,CAAC,GAA2C,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,GAA2C,CAAC,CAAC,CAK3E;AACD;;;;GAIG;AACH,qCAJW,UAAU,CAAC,GAA2C,CAAC,CAAC,CAAC,CAAC,YAC1D,UAAU,CAAC,GAA2C,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,GAA2C,CAAC,CAAC,CAK3E;AACD;;;GAGG;AACH,0CAHW,UAAU,CAAC,GAA6C,CAAC,CAAC,CAAC,CAAC,GAC3D,OAAO,CAAC,UAAU,CAAC,GAA6C,CAAC,CAAC,CAK7E;AACD;;;;GAIG;AACH,oCAJW,UAAU,CAAC,GAA0C,CAAC,CAAC,CAAC,CAAC,YACzD,UAAU,CAAC,GAA0C,CAAC,CAAC,CAAC,CAAC,GACxD,OAAO,CAAC,UAAU,CAAC,GAA0C,CAAC,CAAC,CAK1E;AACD;;;GAGG;AACH,qCAHW,UAAU,CAAC,GAA2C,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,GAA2C,CAAC,CAAC,CAK3E;AACD,kDAEC;AACD,kDAEC"} \ No newline at end of file +{"version":3,"file":"api.d.ts","sourceRoot":"","sources":["../src/api.js"],"names":[],"mappings":"AAaA;;;GAGG;AACH,8BAHW,UAAU,CAAC,OAAO,sBAAsB,EAAE,KAAK,CAAC,CAAC,CAAC,CAAC,GAClD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,KAAK,CAAC,CAAC,CAKpE;AACD;;;GAGG;AACH,2BAHW,UAAU,CAAC,OAAO,sBAAsB,EAAE,KAAK,CAAC,CAAC,CAAC,CAAC,GAClD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,KAAK,CAAC,CAAC,CAKpE;AACD;;;GAGG;AACH,qCAHW,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAK3E;AACD;;;;GAIG;AACH,qCAJW,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAAC,CAAC,YAC1D,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAK3E;AACD;;;GAGG;AACH,0CAHW,UAAU,CAAC,OAAO,sBAAsB,EAAE,cAAc,CAAC,CAAC,CAAC,CAAC,GAC3D,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,cAAc,CAAC,CAAC,CAK7E;AACD;;;;GAIG;AACH,oCAJW,UAAU,CAAC,OAAO,sBAAsB,EAAE,WAAW,CAAC,CAAC,CAAC,CAAC,YACzD,UAAU,CAAC,OAAO,sBAAsB,EAAE,WAAW,CAAC,CAAC,CAAC,CAAC,GACxD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,WAAW,CAAC,CAAC,CAK1E;AACD;;;GAGG;AACH,qCAHW,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAAC,CAAC,GACzD,OAAO,CAAC,UAAU,CAAC,OAAO,sBAAsB,EAAE,YAAY,CAAC,CAAC,CAK3E;AACD,kDAEC;AACD,kDAEC"} \ No newline at end of file From d013159b89da992c12cac604faa7adcd77bfe469 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Sun, 15 Feb 2026 21:33:48 +0900 Subject: [PATCH 2/7] chore(ops): add interfaces for expanded stream tests --- crates/test-components/wit/all.wit | 73 ++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/crates/test-components/wit/all.wit b/crates/test-components/wit/all.wit index cd9c43a85..486683910 100644 --- a/crates/test-components/wit/all.wit +++ b/crates/test-components/wit/all.wit @@ -9,10 +9,83 @@ interface async-add-s32 { } interface get-stream-async { + // variant example-variant { + // num(u32), + // str(string), + // float(f32), + // } + + // variant example-enum { + // first, + // second, + // third, + // } + + // record example-record { + // id: u32, + // id-str: string, + // } + + // resource example-resource { + // constructor(id: u32); + // get-id() -> u32; + // } + /// Get a stream that returns the exact same u32s that were passed in get-stream-u32: async func(vals: list) -> result, string>; /// Get a stream that returns the exact same s32s that were passed in get-stream-s32: async func(vals: list) -> result, string>; + + /// Get stream that returns bools (always `true`, then `false`) + //get-stream-bool: async func() -> result, string>; + //get-stream-u8: async func(vals: list) -> result, string>; + //get-stream-s8: async func(vals: list) -> result, string>; + + //get-stream-u16: async func(vals: list) -> result, string>; + //get-stream-s16: async func(vals: list) -> result, string>; + + //get-stream-u64: async func(vals: list) -> result, string>; + //get-stream-s64: async func(vals: list) -> result, string>; + + //get-stream-u64: async func(vals: list) -> result, string>; + //get-stream-s64: async func(vals: list) -> result, string>; + + //get-stream-f32: async func(vals: list) -> result, string>; + + //get-stream-f64: async func(vals: list) -> result, string>; + + //get-stream-char: async func(vals: list) -> result, string>; + + //get-stream-string: async func(vals: list) -> result, string>; + + //get-stream-record: async func(vals: list) -> result, string>; + + //get-stream-variant: async func(vals: list) -> result, string>; + + //get-stream-list-u8: async func(vals: list>) -> result>, string>; + //get-stream-list-string: async func(vals: list>) -> result>, string>; + //get-stream-list-record: async func(vals: list>) -> result>, string>; + + //get-stream-tuple: async func(vals: list>) -> result>, string>; + + //get-stream-flags: async func(vals: list) -> result, string>; + + //get-stream-enum: async func(vals: list) -> result, string>; + + //get-stream-option-string: async func(vals: list>) -> result>, string>; + + //get-stream-result-string: async func(vals: list>) -> result>, string>; + + //get-stream-result-string: async func(vals: list>) -> result>, string>; + + //get-stream-example-resource-borrow: async func(vals: list>) -> result>, string>; + + //get-stream-example-resource-own: async func(vals: list) -> result, string>; + + //get-stream-stream-string: async func(vals: list>) -> result>, string>; + //get-stream-future-string: async func(vals: list>) -> result>, string>; + + //get-stream-err-ctx: async func(vals: list) -> result, string>; } world async-simple-import { From 4b17bd1f91d2e2a8a740f2925c4891b049ced9f2 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Wed, 18 Feb 2026 20:26:22 +0900 Subject: [PATCH 3/7] fix(bindgen): stream end handling, event loop --- .../src/function_bindgen.rs | 4 +- .../src/intrinsics/component.rs | 116 ++++++--- .../src/intrinsics/lift.rs | 10 +- .../src/intrinsics/mod.rs | 2 + .../src/intrinsics/p3/async_future.rs | 4 +- .../src/intrinsics/p3/async_stream.rs | 225 ++++++++++-------- .../src/intrinsics/p3/async_task.rs | 111 ++++++--- .../src/intrinsics/p3/host.rs | 4 +- .../src/intrinsics/p3/waitable.rs | 142 +++++------ .../src/transpile_bindgen.rs | 12 +- packages/jco/test/p3/stream.js | 26 +- 11 files changed, 377 insertions(+), 279 deletions(-) diff --git a/crates/js-component-bindgen/src/function_bindgen.rs b/crates/js-component-bindgen/src/function_bindgen.rs index 90ac7e6d7..2f673f5fe 100644 --- a/crates/js-component-bindgen/src/function_bindgen.rs +++ b/crates/js-component-bindgen/src/function_bindgen.rs @@ -2306,7 +2306,7 @@ impl Bindgen for FunctionBindgen<'_> { "stream element type mismatch" ); - let arg_stream_idx = operands + let arg_stream_end_idx = operands .first() .expect("unexpectedly missing stream table idx arg in StreamLift"); @@ -2390,7 +2390,7 @@ impl Bindgen for FunctionBindgen<'_> { const {result_var} = {stream_new_from_lift_fn}({{ componentIdx: {component_idx}, streamTableIdx: {stream_table_idx}, - streamIdx: {arg_stream_idx}, + streamEndIdx: {arg_stream_end_idx}, payloadLiftFn, payloadTypeSize32: {payload_ty_size_js}, payloadLowerFn, diff --git a/crates/js-component-bindgen/src/intrinsics/component.rs b/crates/js-component-bindgen/src/intrinsics/component.rs index 84387f2d9..ee2f5732b 100644 --- a/crates/js-component-bindgen/src/intrinsics/component.rs +++ b/crates/js-component-bindgen/src/intrinsics/component.rs @@ -158,7 +158,6 @@ impl ComponentIntrinsic { #parkedTasks = new Map(); #suspendedTasksByTaskID = new Map(); #suspendedTaskIDs = []; - #pendingTasks = []; #errored = null; #backpressure = 0; @@ -167,10 +166,13 @@ impl ComponentIntrinsic { #handlerMap = new Map(); #nextHandlerID = 0n; - mayLeave = true; - #streams; + #tickLoop = null; + #tickLoopInterval = null; + + mayLeave = true; + waitableSets; waitables; subtasks; @@ -178,7 +180,7 @@ impl ComponentIntrinsic { constructor(args) {{ this.#componentIdx = args.componentIdx; this.waitableSets = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitable sets` }}); - this.waitables = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitables` }}); + this.waitables = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitable objects` }}); this.subtasks = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] subtasks` }}); this.#streams = new Map(); }}; @@ -325,8 +327,10 @@ impl ComponentIntrinsic { }} // TODO: we might want to check for pre-locked status here - exclusiveLock() {{ - this.#locked = true; + exclusiveLock() {{ this.setLocked(true); }} + + setLocked(locked) {{ + this.#locked = locked; }} exclusiveRelease() {{ @@ -335,7 +339,7 @@ impl ComponentIntrinsic { componentIdx: this.#componentIdx, }}); - this.#locked = false + this.setLocked(false); }} isExclusivelyLocked() {{ return this.#locked === true; }} @@ -363,14 +367,14 @@ impl ComponentIntrinsic { }} }} + // TODO(threads): readyFn is normally on the thread suspendTask(args) {{ - // TODO(threads): readyFn is normally on the thread const {{ task, readyFn }} = args; const taskID = task.id(); {debug_log_fn}('[{class_name}#suspendTask()]', {{ taskID }}); if (this.#getSuspendedTaskMeta(taskID)) {{ - throw new Error('task [' + taskID + '] already suspended'); + throw new Error(`task [${{taskID}}] already suspended`); }} const {{ promise, resolve }} = Promise.withResolvers(); @@ -385,6 +389,12 @@ impl ComponentIntrinsic { }}, }}); + // NOTE: we perform an explicit tick here for suspended tasks that are + // immediately resumable, to ensure they do not fall too far behind + // in the async event queue + const done = this.tick(); + this.runTickLoop(); + return promise; }} @@ -395,6 +405,20 @@ impl ComponentIntrinsic { meta.resume(); }} + async runTickLoop() {{ + if (this.#tickLoop !== null) {{ await this.#tickLoop; }} + this.#tickLoop = new Promise(async (resolve) => {{ + let done = this.tick(); + while (!done) {{ + // TODO(fix): reduce latency on loop + await new Promise((resolve) => setTimeout(resolve, 10)); + done = this.tick(); + }} + this.#tickLoop = null; + resolve(); + }}); + }} + tick() {{ {debug_log_fn}('[{class_name}#tick()]', {{ suspendedTaskIDs: this.#suspendedTaskIDs }}); const resumableTasks = this.#suspendedTaskIDs.filter(t => t !== null); @@ -413,20 +437,17 @@ impl ComponentIntrinsic { return this.#suspendedTaskIDs.filter(t => t !== null).length === 0; }} - addPendingTask(task) {{ - this.#pendingTasks.push(task); - }} - addStreamEnd(args) {{ {debug_log_fn}('[{class_name}#addStreamEnd()] args', args); const {{ tableIdx, streamEnd }} = args; let tbl = this.#streams.get(tableIdx); if (!tbl) {{ - tbl = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }}); + tbl = new {rep_table_class}({{ target: `stream table (idx [${{tableIdx}}], component [${{this.#componentIdx}}])` }}); this.#streams.set(tableIdx, tbl); }} + // TODO(fix): streams are waitables so need to go there const streamIdx = tbl.insert(streamEnd); return streamIdx; }} @@ -437,63 +458,82 @@ impl ComponentIntrinsic { if (tableIdx === undefined) {{ throw new Error("missing table idx while adding stream"); }} if (elemMeta === undefined) {{ throw new Error("missing element metadata while adding stream"); }} - let tbl = this.#streams.get(tableIdx); - if (!tbl) {{ - tbl = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }}); - this.#streams.set(tableIdx, tbl); + let localStreamTable = this.#streams.get(tableIdx); + if (!localStreamTable) {{ + localStreamTable = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }}); + this.#streams.set(tableIdx, localStreamTable); }} const stream = new {internal_stream_class}({{ tableIdx, componentIdx: this.#componentIdx, elemMeta, + localStreamTable, + globalStreamMap: {global_stream_map}, }}); - const writeEndIdx = tbl.insert(stream.getWriteEnd()); - stream.setWriteEndIdx(writeEndIdx); - const readEndIdx = tbl.insert(stream.getReadEnd()); - stream.setReadEndIdx(readEndIdx); - const rep = {global_stream_map}.insert(stream); - stream.setRep(rep); + const writeEndIdx = this.waitables.insert(stream.writeEnd()); + stream.setWriteEndWaitableIdx(writeEndIdx); + + const readEndIdx = this.waitables.insert(stream.readEnd()); + stream.setReadEndWaitableIdx(readEndIdx); return {{ writeEndIdx, readEndIdx }}; }} getStreamEnd(args) {{ {debug_log_fn}('[{class_name}#getStreamEnd()] args', args); - const {{ tableIdx, streamIdx }} = args; - if (tableIdx === undefined) {{ throw new Error('missing table idx while retrieveing stream end'); }} - if (streamIdx === undefined) {{ throw new Error('missing stream idx while retrieveing stream end'); }} + const {{ tableIdx, streamEndIdx }} = args; + if (tableIdx === undefined) {{ throw new Error('missing table idx while getting stream end'); }} + if (streamEndIdx === undefined) {{ throw new Error('missing stream idx while getting stream end'); }} - const tbl = this.#streams.get(tableIdx); - if (!tbl) {{ + const streamEnd = this.waitables.get(streamEndIdx); + if (!streamEnd) {{ throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while getting stream`); }} + if (streamEnd.streamTableIdx() !== tableIdx) {{ + throw new Error(`stream end table idx [${{streamEnd.streamTableIdx()}}] does not match [${{tableIdx}}]`); + }} - const stream = tbl.get(streamIdx); - return stream; + return streamEnd; }} + // TODO(fix): local/global stream table checks could be simplified/removed, if we centralize tracking removeStreamEnd(args) {{ {debug_log_fn}('[{class_name}#removeStreamEnd()] args', args); - const {{ tableIdx, streamIdx }} = args; + const {{ tableIdx, streamEndIdx }} = args; if (tableIdx === undefined) {{ throw new Error("missing table idx while removing stream end"); }} - if (streamIdx === undefined) {{ throw new Error("missing stream idx while removing stream end"); }} + if (streamEndIdx === undefined) {{ throw new Error("missing stream idx while removing stream end"); }} + + const streamEnd = this.waitables.get(streamEndIdx); + if (!streamEnd) {{ + throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while getting stream`); + }} + if (streamEnd.streamTableIdx() !== tableIdx) {{ + throw new Error(`stream end table idx [${{streamEnd.streamTableIdx()}}] does not match [${{tableIdx}}]`); + }} const tbl = this.#streams.get(tableIdx); if (!tbl) {{ throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while removing stream end`); }} - const stream = tbl.get(streamIdx); - if (!stream) {{ throw new Error(`component [${{this.#componentIdx}}] missing stream [${{streamIdx}}]`); }} + let removed = tbl.remove(streamEnd.idx()); + if (!removed) {{ + throw new Error(`failed to remove stream [${{streamEnd.idx()}}] in internal table (table [${{tableIdx}}]), component [${{this.#componentIdx}}] while removing stream end`); + }} + + removed = {global_stream_map}.remove(streamEnd.streamRep()); + if (!removed) {{ + throw new Error(`failed to remove stream [${{streamEnd.rep()}}] in global stream map (component [${{this.#componentIdx}}] while removing stream end`); + }} - const removed = tbl.remove(streamIdx); + removed = this.waitables.remove(streamEndIdx); if (!removed) {{ - throw new Error(`missing stream [${{streamIdx}}] (table [${{tableIdx}}]) in component [${{this.#componentIdx}}] while removing stream end`); + throw new Error(`failed to remove stream [${{streamEndIdx}}] waitable obj in component [${{this.#componentIdx}}] while removing stream end`); }} - return stream; + return streamEnd; }} }} "#, diff --git a/crates/js-component-bindgen/src/intrinsics/lift.rs b/crates/js-component-bindgen/src/intrinsics/lift.rs index 7b993821f..b8a811887 100644 --- a/crates/js-component-bindgen/src/intrinsics/lift.rs +++ b/crates/js-component-bindgen/src/intrinsics/lift.rs @@ -908,19 +908,19 @@ impl LiftIntrinsic { {debug_log_fn}('[{lift_flat_stream_fn}()] args', {{ componentTableIdx, ctx }}); const {{ memory, useDirectParams, params, componentIdx }} = ctx; - const streamIdx = params[0]; - if (!streamIdx) {{ throw new Error('missing stream idx'); }} + const streamEndIdx = params[0]; + if (!streamEndIdx) {{ throw new Error('missing stream idx'); }} const cstate = {get_or_create_async_state_fn}(componentIdx); if (!cstate) {{ throw new Error(`missing async state for component [${{componentIdx}}]`); }} - const streamEnd = cstate.getStreamEnd({{ tableIdx: componentTableIdx, streamIdx }}); + const streamEnd = cstate.getStreamEnd({{ tableIdx: componentTableIdx, streamEndIdx }}); if (!streamEnd) {{ - throw new Error(`missing stream [${{streamIdx}}] (table [${{componentTableIdx}}]) in component [${{componentIdx}}] during lift`); + throw new Error(`missing stream end [${{streamEndIdx}}] (table [${{componentTableIdx}}]) in component [${{componentIdx}}] during lift`); }} const stream = new {external_stream_class}({{ - hostStreamRep: streamEnd.getStreamRep(), + hostStreamRep: streamEnd.streamRep(), isReadable: streamEnd.isReadable(), isWritable: streamEnd.isWritable(), writeFn: (v) => {{ return streamEnd.write(v); }}, diff --git a/crates/js-component-bindgen/src/intrinsics/mod.rs b/crates/js-component-bindgen/src/intrinsics/mod.rs index 866585663..4390485c5 100644 --- a/crates/js-component-bindgen/src/intrinsics/mod.rs +++ b/crates/js-component-bindgen/src/intrinsics/mod.rs @@ -728,6 +728,8 @@ impl Intrinsic { this.target = args?.target; }} + data() {{ return this.#data; }} + insert(val) {{ {debug_log_fn}('[{rep_table_class}#insert()] args', {{ val, target: this.target }}); const freeIdx = this.#data[0]; diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs index 8f470d17a..444d19855 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_future.rs @@ -404,7 +404,7 @@ impl AsyncFutureIntrinsic { }} if (futureEnd.hasPendingEvent()) {{ - const {{ code, index, payload }} = futureEnd.getEvent(); + const {{ code, payload0: index, payload1 }} = futureEnd.getEvent(); if (code !== eventCode || index != 1) {{ throw new Error('invalid event, does not match expected event code'); }} @@ -469,7 +469,7 @@ impl AsyncFutureIntrinsic { }} }} - const {{ code, index, payload }} = e.getEvent(); + const {{ code, payload0: index, payload1: payload }} = e.getEvent(); if (futureEnd.isCopying()) {{ throw new Error('future end is still in copying state'); }} if (code !== {async_event_code_enum}) {{ throw new Error('unexpected event code [' + code + '], expected [' + {async_event_code_enum} + ']'); }} if (index !== 1) {{ throw new Error('unexpected index, should be 1'); }} diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs index d9315d7b4..f991ea8b1 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_stream.rs @@ -272,7 +272,6 @@ impl AsyncStreamIntrinsic { }}; #waitable = null; - #waitableRep = null; #tableIdx = null; // stream table that contains the stream end #idx = null; // stream end index in the table @@ -281,8 +280,9 @@ impl AsyncStreamIntrinsic { #dropped = false; #copyState = {stream_end_class}.CopyState.IDLE; + target; + constructor(args) {{ - {debug_log_fn}('[{stream_end_class}#constructor()] args', args); const {{ tableIdx, componentIdx }} = args; if (tableIdx === undefined || typeof tableIdx !== 'number') {{ throw new TypeError(`missing table idx [${{tableIdx}}]`); @@ -290,14 +290,12 @@ impl AsyncStreamIntrinsic { if (tableIdx < 0 || tableIdx > 2_147_483_647) {{ throw new TypeError(`invalid tableIdx [${{tableIdx}}]`); }} - if (!args.waitable || !args.waitableRep) {{ - throw new Error('missing/invalid waitable'); - }} + if (!args.waitable) {{ throw new Error('missing/invalid waitable'); }} this.#tableIdx = args.tableIdx; this.#componentIdx = args.componentIdx ??= null; this.#waitable = args.waitable; - this.#waitableRep = args.waitableRep; + this.target = args.target; }} isHostOwned() {{ return this.#componentIdx === null; }} @@ -306,7 +304,9 @@ impl AsyncStreamIntrinsic { idx() {{ return this.#idx; }} setIdx(idx) {{ this.#idx = idx; }} - getWaitableRep() {{ return this.#waitableRep; }} + setTarget(tgt) {{ this.target = tgt; }} + + getWaitable() {{ return this.#waitable; }} setCopyState(state) {{ this.#copyState = state; }} getCopyState() {{ return this.#copyState; }} @@ -330,7 +330,7 @@ impl AsyncStreamIntrinsic { setPendingEventFn(fn) {{ if (!this.#waitable) {{ throw new Error('missing/invalid waitable'); }} {debug_log_fn}('[{stream_end_class}#setPendingEventFn()]', {{ - waitableRep: this.#waitableRep, + waitable: this.#waitable, waitableinSet: this.#waitable.isInSet(), componentIdx: this.#waitable.componentIdx(), }}); @@ -345,7 +345,7 @@ impl AsyncStreamIntrinsic { getPendingEvent() {{ if (!this.#waitable) {{ throw new Error('missing/invalid waitable'); }} {debug_log_fn}('[{stream_end_class}#getPendingEvent()]', {{ - waitableRep: this.#waitableRep, + waitable: this.#waitable, waitableinSet: this.#waitable.isInSet(), componentIdx: this.#waitable.componentIdx(), }}); @@ -361,7 +361,7 @@ impl AsyncStreamIntrinsic { if (this.#waitable) {{ const w = this.#waitable; this.#waitable = null; - this.#waitable.drop(); + w.drop(); }} this.#dropped = true; @@ -450,8 +450,8 @@ impl AsyncStreamIntrinsic { }} if (buffer.length > 2**28) {{ throw new Error('buffer uses reserved space'); }} - let packedResult = result | (buffer.copied() << 4); - return {{ code: eventCode, index: streamEnd.idx(), payload: packedResult }}; + const packedResult = (buffer.copied() << 4) | result; + return {{ code: eventCode, payload0: streamEnd.idx(), payload1: packedResult }}; }}; const onCopy = (reclaimBufferFn) => {{ @@ -488,7 +488,7 @@ impl AsyncStreamIntrinsic { return; }} - const pendingElemMeta = this.#pendingBufferMeta.elemMeta; + const pendingElemMeta = this.#pendingBufferMeta.buffer.getElemMeta(); const newBufferElemMeta = buffer.getElemMeta(); if (pendingElemMeta.typeIdx !== newBufferElemMeta.typeIdx) {{ throw new Error("trap: stream end type does not match internal buffer"); @@ -569,13 +569,13 @@ impl AsyncStreamIntrinsic { if (bufferRemaining > 0) {{ const count = Math.min(pendingRemaining, bufferRemaining); buffer.write(this.#pendingBufferMeta.buffer.read(count)) - this.#pendingBufferMeta.onCopyFn(() => self.resetPendingBufferMeta()); + this.#pendingBufferMeta.onCopyFn(() => this.resetPendingBufferMeta()); }} onCopyDone({stream_end_class}.CopyResult.COMPLETED); return; }} - this.setAndNotifyPending({stream_end_class}.CopyResult.COMPLETED); + this.resetAndNotifyPending({stream_end_class}.CopyResult.COMPLETED); this.setPendingBufferMeta({{ componentIdx: this.#componentIdx, buffer, onCopy, onCopyDone }}); }} "#, @@ -648,10 +648,26 @@ impl AsyncStreamIntrinsic { const event = this.getPendingEvent(); if (!event) {{ throw new Error("unexpectedly missing pending event"); }} + if (!event.code || !event.payload0 || !event.payload1) {{ + throw new Error("unexpectedly malformed event"); + }} + + const {{ code, payload0: index, payload1: payload }} = event; - const {{ code, index, payload }} = event; if (code !== eventCode || index !== this.#getEndIdxFn() || payload === {async_blocked_const}) {{ - throw new Error('invalid event code/event idx/payload during stream operation'); + const errMsg = "invalid event code/event during stream operation"; + {debug_log_fn}(errMsg, {{ + event, + payload, + payloadIsBlockedConst: payload === {async_blocked_const}, + code, + eventCode, + codeDoesNotMatchEventCode: code !== eventCode, + index, + internalEndIdx: this.#getEndIdxFn(), + indexDoesNotMatch: index !== this.#getEndIdxFn(), + }}); + throw new Error(errMsg); }} return payload; @@ -730,7 +746,7 @@ impl AsyncStreamIntrinsic { }}); let copied = packedResult >> 4; - let result = packedResult & 0xF; + let result = packedResult & 0x000F; const vs = buffer.read(); @@ -752,7 +768,9 @@ impl AsyncStreamIntrinsic { #pendingBufferMeta = null; // held by both write and read ends #getEndIdxFn; + #streamRep; + #streamTableIdx; constructor(args) {{ {debug_log_fn}('[{end_class_name}#constructor()] args', args); @@ -769,16 +787,26 @@ impl AsyncStreamIntrinsic { if (!args.getEndIdxFn) {{ throw new Error('missing/invalid fn for getting table idx'); }} this.#getEndIdxFn = args.getEndIdxFn; + + if (!args.streamRep) {{ throw new Error('missing/invalid rep for stream'); }} + this.#streamRep = args.streamRep; + + if (args.tableIdx === undefined) {{ throw new Error('missing index for stream table idx'); }} + this.#streamTableIdx = args.tableIdx; }} - setStreamRep(rep) {{ this.#streamRep = rep; }} - getStreamRep() {{ return this.#streamRep; }} + streamRep() {{ return this.#streamRep; }} + streamTableIdx() {{ return this.#streamTableIdx; }} + + // NOTE: the stream idx is the waitable idx under which it can be found + idx() {{ return this.#getEndIdxFn(); }} getElemMeta() {{ return {{...this.#elemMeta}}; }} {type_getter_impl} isDone() {{ this.getCopyState() === {stream_end_class}.CopyState.DONE; }} + isCompleted() {{ this.getCopyState() === {stream_end_class}.CopyState.COMPLETED; }} {action_impl} {inner_rw_impl} @@ -797,13 +825,10 @@ impl AsyncStreamIntrinsic { this.setPendingBufferMeta({{ componentIdx: null, buffer: null, onCopy: null, onCopyDone: null }}); }} - setPendingOnCopyFn(f) {{ this.#pendingBufferMeta.onCopyFn = f; }} - setPendingOnCopyDoneFn(f) {{ this.#pendingBufferMeta.onCopyDoneFn = f; }} - resetAndNotifyPending(result) {{ const f = this.#pendingBufferMeta.onCopyDoneFn; this.resetPendingBufferMeta(); - f(result); + if (f) {{ f(result); }} }} cancel() {{ @@ -829,80 +854,69 @@ impl AsyncStreamIntrinsic { let read_end_class = Self::StreamReadableEndClass.name(); let write_end_class = Self::StreamWritableEndClass.name(); let waitable_class = Intrinsic::Waitable(WaitableIntrinsic::WaitableClass).name(); - let get_or_create_async_state_fn = - Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); output.push_str(&format!( r#" class {internal_stream_class} {{ + #rep; + #idx; + #readEnd; - #readEndIdx; + #readEndWaitableIdx; #writeEnd; - #writeEndIdx; + #writeEndWaitableIdx; #pendingBufferMeta = {{}}; #elemMeta; - #waitable; - #waitableRep; - - // NOTE: pointer into the global stream map which contains `HostStream`/`InternalStream`s - #rep; constructor(args) {{ if (typeof args.componentIdx !== 'number') {{ throw new Error('missing/invalid component idx'); }} if (!args.elemMeta) {{ throw new Error('missing/invalid stream element metadata'); }} if (args.tableIdx === undefined) {{ throw new Error('missing/invalid stream table idx'); }} - const {{ tableIdx, componentIdx, elemMeta }} = args; + const {{ tableIdx, componentIdx, elemMeta, globalStreamMap, localStreamTable }} = args; this.#elemMeta = elemMeta; - const cstate = {get_or_create_async_state_fn}(componentIdx); - this.#waitable = new {waitable_class}({{ componentIdx: componentIdx }}); - this.#waitableRep = cstate.waitables.insert(this.#waitable); + if (args.globalStreamMap) {{ this.#rep = globalStreamMap.insert(this); }} + if (args.localStreamTable) {{ this.#idx = localStreamTable.insert(this); }} this.#readEnd = new {read_end_class}({{ componentIdx, tableIdx, elemMeta: this.#elemMeta, pendingBufferMeta: this.#pendingBufferMeta, - getEndIdxFn: () => this.#readEndIdx, - waitable: this.#waitable, - waitableRep: this.#waitableRep, + streamRep: this.#rep, + getEndIdxFn: () => this.#readEndWaitableIdx, + target: `stream write end (global rep [${{this.#rep}}])`, + waitable: new {waitable_class}({{ + componentIdx, + target: `stream read end (stream local idx [${{this.#rep}}], global rep [${{this.#rep}}])` + }}), }}); + this.#writeEnd = new {write_end_class}({{ componentIdx, tableIdx, elemMeta: this.#elemMeta, pendingBufferMeta: this.#pendingBufferMeta, - getEndIdxFn: () => this.#writeEndIdx, - waitable: this.#waitable, - waitableRep: this.#waitableRep, + getEndIdxFn: () => this.#writeEndWaitableIdx, + streamRep: this.#rep, + target: `stream write end (global rep [${{this.#rep}}])`, + waitable: new {waitable_class}({{ + componentIdx, + target: `stream write end (stream local idx [${{this.#rep}}], global rep [${{this.#rep}}])` + }}), }}); }} - getRep() {{ return this.#rep; }} - setRep(rep) {{ - if (this.#rep !== undefined) {{ throw new Error('cannot set stream rep twice'); }} - this.#rep = rep; - this.#readEnd.setStreamRep(this.#rep); - this.#writeEnd.setStreamRep(this.#rep); - }} + idx() {{ return this.#idx; }} + rep() {{ return this.#rep; }} - getReadEnd() {{ return this.#readEnd; }} - getReadEndIdx() {{ return this.#readEndIdx; }} - setReadEndIdx(idx) {{ - if (this.#readEndIdx !== undefined) {{ throw new Error("read end idx has already been set"); }} - this.#readEndIdx = idx; - this.#readEnd.setIdx(idx); - }} + readEnd() {{ return this.#readEnd; }} + setReadEndWaitableIdx(idx) {{ this.#readEndWaitableIdx = idx; }} - getWriteEnd() {{ return this.#writeEnd; }} - getWriteEndIdx() {{ return this.#writeEndIdx; }} - setWriteEndIdx(idx) {{ - if (this.#writeEndIdx !== undefined) {{ throw new Error("write end idx has already been set"); }} - this.#writeEndIdx = idx; - this.#writeEnd.setIdx(idx); - }} + writeEnd() {{ return this.#writeEnd; }} + setWriteEndWaitableIdx(idx) {{ this.#writeEndWaitableIdx = idx; }} }} "# )); @@ -930,7 +944,7 @@ impl AsyncStreamIntrinsic { r#" class {host_stream_class_name} {{ #componentIdx; - #streamIdx; + #streamEndIdx; #streamTableIdx; #payloadLiftFn; @@ -952,9 +966,9 @@ impl AsyncStreamIntrinsic { if (!args.payloadLowerFn) {{ throw new TypeError("missing payload lower fn"); }} this.#payloadLowerFn = args.payloadLowerFn; - if (args.streamIdx === undefined) {{ throw new Error("missing stream idx"); }} + if (args.streamEndIdx === undefined) {{ throw new Error("missing stream idx"); }} if (args.streamTableIdx === undefined) {{ throw new Error("missing stream table idx"); }} - this.#streamIdx = args.streamIdx; + this.#streamEndIdx = args.streamEndIdx; this.#streamTableIdx = args.streamTableIdx; this.#isUnitStream = args.isUnitStream; @@ -969,9 +983,9 @@ impl AsyncStreamIntrinsic { const cstate = {get_or_create_async_state_fn}(this.#componentIdx); if (!cstate) {{ throw new Error(`missing async state for component [${{this.#componentIdx}}]`); }} - const streamEnd = cstate.getStreamEnd({{ tableIdx: this.#streamTableIdx, streamIdx: this.#streamIdx }}); + const streamEnd = cstate.getStreamEnd({{ tableIdx: this.#streamTableIdx, streamEndIdx: this.#streamEndIdx }}); if (!streamEnd) {{ - throw new Error(`missing stream [${{this.#streamIdx}}] (table [${{this.#streamTableIdx}}], component [${{this.#componentIdx}}]`); + throw new Error(`missing stream [${{this.#streamEndIdx}}] (table [${{this.#streamTableIdx}}], component [${{this.#componentIdx}}]`); }} return new {external_stream_class}({{ @@ -1061,9 +1075,9 @@ impl AsyncStreamIntrinsic { let global_stream_map = Self::GlobalStreamMap.name(); let rep_table_class = Intrinsic::RepTableClass.name(); output.push_str(&format!( - " - const {global_stream_map} = new {rep_table_class}(); - " + r#" + const {global_stream_map} = new {rep_table_class}({{ target: 'global stream map' }}); + "# )); } @@ -1108,7 +1122,7 @@ impl AsyncStreamIntrinsic { elemMeta, }}); - return BigInt(readEndIdx) | (BigInt(writeEndIdx) << 32n); + return (BigInt(writeEndIdx) << 32n) | BigInt(readEndIdx); }} "#)); } @@ -1122,21 +1136,21 @@ impl AsyncStreamIntrinsic { Intrinsic::AsyncStream(AsyncStreamIntrinsic::HostStreamClass).name(); output.push_str(&format!( r#" - function {stream_new_from_lift_fn}(args) {{ - {debug_log_fn}('[{stream_new_from_lift_fn}()] args', args); + function {stream_new_from_lift_fn}(ctx) {{ + {debug_log_fn}('[{stream_new_from_lift_fn}()] args', {{ ctx }}); const {{ componentIdx, - streamIdx, + streamEndIdx, streamTableIdx, payloadLiftFn, payloadTypeSize32, payloadLowerFn, isUnitStream, - }} = args; + }} = ctx; const stream = new {host_stream_class}({{ componentIdx, - streamIdx, + streamEndIdx, streamTableIdx, payloadLiftFn: payloadLiftFn, payloadLowerFn: payloadLowerFn, @@ -1182,16 +1196,16 @@ impl AsyncStreamIntrinsic { count, ) {{ {debug_log_fn}('[{stream_op_fn}()] args', {{ ctx, streamEndIdx, ptr, count }}); - const {{ - componentIdx, - memoryIdx, - getMemoryFn, - reallocIdx, - getReallocFn, - stringEncoding, - isAsync, - streamTableIdx, - }} = ctx; + const {{ + componentIdx, + memoryIdx, + getMemoryFn, + reallocIdx, + getReallocFn, + stringEncoding, + isAsync, + streamTableIdx, + }} = ctx; if (componentIdx === undefined) {{ throw new TypeError("missing/invalid component idx"); }} if (streamTableIdx === undefined) {{ throw new TypeError("missing/invalid stream table idx"); }} @@ -1202,7 +1216,7 @@ impl AsyncStreamIntrinsic { // TODO(fix): check for may block & async - const streamEnd = cstate.getStreamEnd({{ tableIdx: streamTableIdx, streamIdx: streamEndIdx }}); + const streamEnd = cstate.getStreamEnd({{ tableIdx: streamTableIdx, streamEndIdx }}); if (!streamEnd) {{ throw new Error(`missing stream end [${{streamEndIdx}}] (table [${{streamTableIdx}}], component [${{componentIdx}}])`); }} @@ -1213,7 +1227,6 @@ impl AsyncStreamIntrinsic { throw new Error(`stream end table idx [${{streamEnd.getStreamTableIdx()}}] != operation table idx [${{streamTableIdx}}]`); }} - {debug_log_fn}('[{stream_op_fn}()] ABOUT TO DO THE COPY?'); const res = await streamEnd.copy({{ isAsync, memory: getMemoryFn(), @@ -1221,7 +1234,7 @@ impl AsyncStreamIntrinsic { count, eventCode: {event_code}, }}); - {debug_log_fn}('[{stream_op_fn}()] DID THE COPY?', {{ res }}); + return res; }} "#)); @@ -1281,7 +1294,7 @@ impl AsyncStreamIntrinsic { }} }} - const {{ code, index, payload }} = e.getEvent(); + const {{ code, payload0: index, payload1: payload }} = e.getEvent(); if (streamEnd.isCopying()) {{ throw new Error('stream end is still in copying state'); }} if (code !== {event_code_enum}) {{ throw new Error('unexpected event code [' + code + '], expected [' + {event_code_enum} + ']'); }} if (index !== 1) {{ throw new Error('unexpected index, should be 1'); }} @@ -1291,6 +1304,9 @@ impl AsyncStreamIntrinsic { ")); } + // NOTE: as writable drops are called from guests, they may happen *after* + // a host has tried to read off the end (i.e. getting back the async blocked constant), + // when running non-deterministrically (the default) Self::StreamDropReadable | Self::StreamDropWritable => { let debug_log_fn = Intrinsic::DebugLog.name(); let stream_drop_fn = self.name(); @@ -1305,8 +1321,8 @@ impl AsyncStreamIntrinsic { let get_or_create_async_state_fn = Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); output.push_str(&format!(r#" - function {stream_drop_fn}(ctx, streamIdx) {{ - {debug_log_fn}('[{stream_drop_fn}()] args', {{ ctx, streamIdx }}); + function {stream_drop_fn}(ctx, streamEndIdx) {{ + {debug_log_fn}('[{stream_drop_fn}()] args', {{ ctx, streamEndIdx }}); const {{ streamTableIdx, componentIdx }} = ctx; const task = {current_task_get_fn}(componentIdx); @@ -1315,13 +1331,16 @@ impl AsyncStreamIntrinsic { const cstate = {get_or_create_async_state_fn}(componentIdx); if (!cstate) {{ throw new Error(`missing component state for component idx [${{componentIdx}}]`); }} - const stream = cstate.removeStreamEnd({{ tableIdx: streamTableIdx, streamIdx }}); - if (!stream) {{ - throw new Error(`missing stream [${{streamIdx}}] (table [${{streamTableIdx}}], component [${{componentIdx}}])`); + const streamEnd = cstate.removeStreamEnd({{ tableIdx: streamTableIdx, streamEndIdx }}); + if (!streamEnd) {{ + throw new Error(`missing stream [${{streamEndIdx}}] (table [${{streamTableIdx}}], component [${{componentIdx}}])`); }} - if (!(stream instanceof {stream_end_class})) {{ + + if (!(streamEnd instanceof {stream_end_class})) {{ throw new Error('invalid stream end class, expected [{stream_end_class}]'); }} + + streamEnd.drop(); }} "#)); } @@ -1340,12 +1359,12 @@ impl AsyncStreamIntrinsic { output.push_str(&format!( r#" function {stream_transfer_fn}( - srcStreamIdx, + srcStreamEndIdx, srcTableIdx, destTableIdx, ) {{ {debug_log_fn}('[{stream_transfer_fn}()] args', {{ - srcStreamIdx, + srcStreamEndIdx, srcTableIdx, destTableIdx, }}); @@ -1363,15 +1382,15 @@ impl AsyncStreamIntrinsic { const cstate = {get_or_create_async_state_fn}(componentIdx); if (!cstate) {{ throw new Error(`unexpectedly missing async state for component [${{componentIdx}}]`); }} - const streamEnd = cstate.removeStreamEnd({{ tableIdx: srcTableIdx, streamIdx: srcStreamIdx }}); + const streamEnd = cstate.removeStreamEnd({{ tableIdx: srcTableIdx, streamEndIdx: srcStreamEndIdx }}); if (!streamEnd.isReadable()) {{ throw new Error("writable stream ends cannot be moved"); }} if (streamEnd.isDone()) {{ throw new Error('readable ends cannot be moved once writable ends are dropped'); }} - const streamIdx = cstate.addStreamEnd({{ tableIdx: destTableIdx, streamEnd }}); + const streamEndIdx = cstate.addStreamEnd({{ tableIdx: destTableIdx, streamEnd }}); - return streamIdx; + return streamEndIdx; }} "# )); diff --git a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs index e4e9f7dd2..c208ac629 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/async_task.rs @@ -392,6 +392,8 @@ impl AsyncTaskIntrinsic { let debug_log_fn = Intrinsic::DebugLog.name(); let task_return_fn = Self::TaskReturn.name(); let current_task_get_fn = Self::GetCurrentTask.name(); + let get_or_create_async_state_fn = + Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); output.push_str(&format!(r#" function {task_return_fn}(ctx) {{ @@ -442,13 +444,21 @@ impl AsyncTaskIntrinsic { results.push(val); }} - // TODO(opt): during fused guest->guest calls, we have a helper fn for lift/lower + // Register an on-resolve handler that runs a tick loop + task.registerOnResolveHandler(() => {{ + // Trigger ticking for any suspended tasks + const cstate = {get_or_create_async_state_fn}(task.componentIdx()); + cstate.runTickLoop(); + }}); + + // NOTE: during fused guest->guest calls, we have a helper fn for lift/lower // so this task.return could be reduced to ~no-op // // We perform a superfluous lift and resolve in this fn to keep consistent with // the task machinery as it is normally used. task.resolve(results); + // If we are in a subtask, and have a fused helper function provided to use // via PrepareCall, we can use that function rather than performing lifting manually. // @@ -839,9 +849,9 @@ impl AsyncTaskIntrinsic { return this.#callbackFnName; }} - runCallbackFn(...args) {{ + async runCallbackFn(...args) {{ if (!this.#callbackFn) {{ throw new Error('on callback function has been set for task'); }} - return this.#callbackFn.apply(null, args); + return await this.#callbackFn.apply(null, args); }} getCalleeParams() {{ @@ -909,10 +919,12 @@ impl AsyncTaskIntrinsic { wset.incrementNumWaiting(); + // const pendingEventWaitID = wset.registerPendingEventWait(); const keepGoing = await this.suspendUntil({{ readyFn: () => {{ const hasPendingEvent = wset.hasPendingEvent(); - return readyFn() && hasPendingEvent; + const ready = readyFn(); + return ready && hasPendingEvent; }}, cancellable, }}); @@ -922,8 +934,8 @@ impl AsyncTaskIntrinsic { }} else {{ event = {{ code: {event_code_enum}.TASK_CANCELLED, - index: 0, - result: 0, + payload0: 0, + payload1: 0, }}; }} @@ -987,18 +999,18 @@ impl AsyncTaskIntrinsic { {debug_log_fn}('[{task_class}#yieldUntil()] args', {{ taskID: this.#id, cancellable }}); const keepGoing = await this.suspendUntil({{ readyFn, cancellable }}); - if (!keepGoing) {{ + if (keepGoing) {{ return {{ - code: {event_code_enum}.TASK_CANCELLED, - index: 0, - result: 0, + code: {event_code_enum}.NONE, + payload0: 0, + payload1: 0, }}; }} return {{ - code: {event_code_enum}.NONE, - index: 0, - result: 0, + code: {event_code_enum}.TASK_CANCELLED, + payload0: 0, + payload1: 0, }}; }} @@ -1019,12 +1031,12 @@ impl AsyncTaskIntrinsic { {debug_log_fn}('[{task_class}#immediateSuspendUntil()] args', {{ cancellable, readyFn }}); const ready = readyFn(); - if (ready && !{global_async_determinism} && {coin_flip_fn}()) {{ - return true; + if (ready && {global_async_determinism} === 'random') {{ + const coinFlip = {coin_flip_fn}(); + if (coinFlip) {{ return true }} }} - const cstate = {get_or_create_async_state_fn}(this.#componentIdx); - cstate.addPendingTask(this); + // TODO: it is often the case that ready is true, but since we're not doing const keepGoing = await this.immediateSuspend({{ cancellable, readyFn }}); return keepGoing; @@ -1039,12 +1051,7 @@ impl AsyncTaskIntrinsic { if (pendingCancelled) {{ return false; }} const cstate = {get_or_create_async_state_fn}(this.#componentIdx); - - // TODO(fix): update this to tick until there is no more action to take. - setTimeout(() => cstate.tick(), 0); - - const taskWait = await cstate.suspendTask({{ task: this, readyFn }}); - const keepGoing = await taskWait; + const keepGoing = await cstate.suspendTask({{ task: this, readyFn }}); return keepGoing; }} @@ -1260,16 +1267,15 @@ impl AsyncTaskIntrinsic { if (args.waitable) {{ this.#waitable = args.waitable; }} else {{ - const {{ promise, resolve, reject }} = promiseWithResolvers(); - this.#waitableResolve = resolve; - this.#waitableReject = reject; - const state = {get_or_create_async_state_fn}(this.#componentIdx); if (!state) {{ throw new Error('invalid/missing async state for component instance [' + componentIdx + ']'); }} - this.#waitable = new {waitable_class}({{ promise, componentIdx: this.#componentIdx }}); + this.#waitable = new {waitable_class}({{ componentIdx: this.#componentIdx }}); + this.#waitableResolve = () => this.#waitable.resolve(); + this.#waitableReject = () => this.#waitable.reject(); + this.#waitableRep = state.waitables.insert(this.#waitable); }} @@ -1502,12 +1508,10 @@ impl AsyncTaskIntrinsic { } Self::UnpackCallbackResult => { - let debug_log_fn = Intrinsic::DebugLog.name(); let unpack_callback_result_fn = Self::UnpackCallbackResult.name(); let i32_typecheck_fn = Intrinsic::TypeCheckValidI32.name(); output.push_str(&format!(" function {unpack_callback_result_fn}(result) {{ - {debug_log_fn}('[{unpack_callback_result_fn}()] args', {{ result }}); if (!({i32_typecheck_fn}(result))) {{ throw new Error('invalid callback return value [' + result + '], not a valid i32'); }} const eventCode = result & 0xF; if (eventCode < 0 || eventCode > 3) {{ @@ -1642,6 +1646,13 @@ impl AsyncTaskIntrinsic { cancellable: true, readyFn: () => !componentState.isExclusivelyLocked(), }}); + {debug_log_fn}('[{driver_loop_fn}()] finished yield', {{ + fnName, + componentIdx, + callbackFnName, + taskID: task.id(), + asyncRes, + }}); break; case 2: // WAIT for a given waitable set @@ -1653,21 +1664,33 @@ impl AsyncTaskIntrinsic { waitableSetRep, }}); asyncRes = await task.waitUntil({{ - readyFn: () => true, + readyFn: () => !componentState.isExclusivelyLocked(), waitableSetRep, cancellable: true, }}); + {debug_log_fn}('[{driver_loop_fn}()] finished waiting for event', {{ + fnName, + componentIdx, + callbackFnName, + taskID: task.id(), + waitableSetRep, + asyncRes, + }}); break; default: - throw new Error('Unrecognized async function result [' + ret + ']'); + throw new Error(`Unrecognized async function result [${{ret}}]`); }} componentState.exclusiveLock(); + if (asyncRes.code === undefined) {{ throw new Error("missing event code from event"); }} + if (asyncRes.payload0 === undefined) {{ throw new Error("missing payload0 from event"); }} + if (asyncRes.payload1 === undefined) {{ throw new Error("missing payload1 from event"); }} + eventCode = asyncRes.code; // async event enum code - index = asyncRes.index; // idx of related waitable set - result = asyncRes.result; // task state + index = asyncRes.payload0; // varies (e.g. idx of related waitable set) + result = asyncRes.payload1; // varies (e.g. task state) asyncRes = null; {debug_log_fn}('[{driver_loop_fn}()] performing callback', {{ @@ -1679,7 +1702,7 @@ impl AsyncTaskIntrinsic { result }}); - const callbackRes = task.runCallbackFn( + const callbackRes = await task.runCallbackFn( {to_int32_fn}(eventCode), {to_int32_fn}(index), {to_int32_fn}(result), @@ -1688,6 +1711,12 @@ impl AsyncTaskIntrinsic { unpacked = {unpack_callback_result_fn}(callbackRes); callbackCode = unpacked[0]; waitableSetRep = unpacked[1]; + + {debug_log_fn}('[{driver_loop_fn}()] callback result unpacked', {{ + callbackRes, + callbackCode, + waitableSetRep, + }}); }} }} catch (err) {{ {debug_log_fn}('[{driver_loop_fn}()] error during async driver loop', {{ @@ -1698,6 +1727,14 @@ impl AsyncTaskIntrinsic { result, err, }}); + console.error('[{driver_loop_fn}()] error during async driver loop', {{ + fnName, + callbackFnName, + eventCode, + index, + result, + err, + }}); reject(err); }} }} @@ -1772,8 +1809,8 @@ impl AsyncTaskIntrinsic { if (subtask.resolved()) {{ subtask.deliverResolve(); }} return {{ code: {async_event_code_enum}.SUBTASK, - index: rep, - result: subtask.getStateNumber(), + payload0: rep, + payload1: subtask.getStateNumber(), }} }}); }}); diff --git a/crates/js-component-bindgen/src/intrinsics/p3/host.rs b/crates/js-component-bindgen/src/intrinsics/p3/host.rs index 35adc786c..69960c8d4 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/host.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/host.rs @@ -312,8 +312,8 @@ impl HostIntrinsic { if (subtask.resolved()) {{ subtask.deliverResolve(); }} return {{ code: {async_event_code_enum}.SUBTASK, - index: rep, - result: subtask.getStateNumber(), + payload0: rep, + payload1: subtask.getStateNumber(), }} }}); }}); diff --git a/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs b/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs index 63a38512f..996161c68 100644 --- a/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs +++ b/crates/js-component-bindgen/src/intrinsics/p3/waitable.rs @@ -3,7 +3,7 @@ use super::async_task::AsyncTaskIntrinsic; use crate::intrinsics::component::ComponentIntrinsic; use crate::intrinsics::p3::host::HostIntrinsic; -use crate::intrinsics::{AsyncDeterminismProfile, Intrinsic, RenderIntrinsicsArgs}; +use crate::intrinsics::{Intrinsic, RenderIntrinsicsArgs}; use crate::source::Source; /// This enum contains intrinsics that enable waitable sets @@ -146,21 +146,14 @@ impl WaitableIntrinsic { } /// Render an intrinsic to a string - pub fn render(&self, output: &mut Source, args: &RenderIntrinsicsArgs<'_>) { + pub fn render(&self, output: &mut Source, _args: &RenderIntrinsicsArgs<'_>) { match self { Self::WaitableSetClass => { let debug_log_fn = Intrinsic::DebugLog.name(); let waitable_set_class = Self::WaitableSetClass.name(); - let get_or_create_async_state_fn = - Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); - let maybe_shuffle = if args.determinism == AsyncDeterminismProfile::Random { - "this.shuffleWaitables();" - } else { - "" - }; - - output.push_str(&format!(r#" + output.push_str(&format!( + r#" class {waitable_set_class} {{ #componentIdx; #waitables = []; @@ -200,7 +193,8 @@ impl WaitableIntrinsic { hasPendingEvent() {{ {debug_log_fn}('[{waitable_set_class}#hasPendingEvent()] args', {{ - componentIdx: this.#componentIdx + componentIdx: this.#componentIdx, + waitableSet: this, }}); const waitable = this.#waitables.find(w => w.hasPendingEvent()); return waitable !== undefined; @@ -208,7 +202,8 @@ impl WaitableIntrinsic { getPendingEvent() {{ {debug_log_fn}('[{waitable_set_class}#getPendingEvent()] args', {{ - componentIdx: this.#componentIdx + componentIdx: this.#componentIdx, + waitableSet: this, }}); for (const waitable of this.#waitables) {{ if (!waitable.hasPendingEvent()) {{ continue; }} @@ -216,100 +211,87 @@ impl WaitableIntrinsic { }} throw new Error('no waitables had a pending event'); }} - - async poll() {{ - {debug_log_fn}('[{waitable_set_class}#poll()] args', {{ - componentIdx: this.#componentIdx - }}); - - const state = {get_or_create_async_state_fn}(this.#componentIdx); - - {maybe_shuffle} - - for (const waitableRep of this.#waitables) {{ - const w = state.waitables.get(waitableRep); - if (!w) {{ throw new Error('no waitable with rep [' + waitableRep + ']'); }} - waitables.push(w); - }} - - const event = await Promise.race(waitables.map((w) => w.promise)); - - throw new Error('{waitable_set_class}#poll() not implemented'); - }} }} - "#)); + "# + )); } Self::WaitableClass => { let debug_log_fn = Intrinsic::DebugLog.name(); let waitable_class = Self::WaitableClass.name(); - let get_or_create_async_state_fn = - Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); - output.push_str(&format!(r#" + output.push_str(&format!( + r#" class {waitable_class} {{ - static _ID = 0n; // NOTE: this id is *not* the component model representation (aka 'rep') - - #id; #componentIdx; + #pendingEventFn = null; - #waitableSet; + #promise; + #resolve; + #reject; + + #waitableSet; - constructor({{ promise, componentIdx }}) {{ - this.#id = ++{waitable_class}._ID; + target; + + constructor(args) {{ + const {{ componentIdx, target }} = args; this.#componentIdx = componentIdx; - this.#promise = promise; + this.target = args.target; + this.#resetPromise(); }} componentIdx() {{ return this.#componentIdx; }} isInSet() {{ return this.#waitableSet !== undefined; }} + #resetPromise() {{ + const {{ promise, resolve, reject }} = Promise.withResolvers() + this.#promise = promise; + this.#resolve = resolve; + this.#reject = reject; + }} + + resolve() {{ this.#resolve(); }} + reject(err) {{ this.#reject(err); }} + promise() {{ return this.#promise; }} + hasPendingEvent() {{ {debug_log_fn}('[{waitable_class}#hasPendingEvent()]', {{ componentIdx: this.#componentIdx, + waitable: this, waitableSet: this.#waitableSet, - hasPendingEvent: this.#pendingEventFn, + hasPendingEvent: this.#pendingEventFn !== null, }}); return this.#pendingEventFn !== null; }} setPendingEventFn(fn) {{ - {debug_log_fn}('[{waitable_class}#setPendingEvent()] args', {{ }}); + {debug_log_fn}('[{waitable_class}#setPendingEvent()] args', {{ + waitable: this, + waitableSet: this.#waitableSet, + }}); this.#pendingEventFn = fn; }} getPendingEvent() {{ - {debug_log_fn}('[{waitable_class}#getPendingEvent()] args', {{ }}); + {debug_log_fn}('[{waitable_class}#getPendingEvent()] args', {{ + waitable: this, + waitableSet: this.#waitableSet, + hasPendingEvent: this.#pendingEventFn !== null, + }}); if (this.#pendingEventFn === null) {{ return null; }} - const e = this.#pendingEventFn(); + const eventFn = this.#pendingEventFn; this.#pendingEventFn = null; + const e = eventFn(); + this.#resetPromise(); return e; }} - async poll() {{ - {debug_log_fn}('[{waitable_class}#poll()] args', {{ - componentIdx: this.#componentIdx, - _id: this.#id, - }}); - - const state = {get_or_create_async_state_fn}(this.#componentIdx); - if (!state) {{ - throw new Error('invalid/missing async state for component instance [' + componentIdx + ']'); - }} - - const waitables = []; - for (const waitableRep in waitableSet.waitables) {{ - const w = state.waitables.get(waitableRep); - if (!w) {{ throw new Error('no waitable with rep [' + waitableRep + ']'); }} - waitables.push(w); - }} - - const event = await Promise.race(waitables.map((w) => w.promise)); - - throw new Error('{waitable_class}#poll() not implemented'); - }} - join(waitableSet) {{ + {debug_log_fn}('[{waitable_class}#join()] args', {{ + waitable: this, + waitableSet: waitableSet, + }}); if (this.#waitableSet) {{ this.#waitableSet.removeWaitable(this); }} if (!waitableSet) {{ this.#waitableSet = null; @@ -322,7 +304,7 @@ impl WaitableIntrinsic { drop() {{ {debug_log_fn}('[{waitable_class}#drop()] args', {{ componentIdx: this.#componentIdx, - _id: this.#id, + waitable: this, }}); if (this.hasPendingEvent()) {{ throw new Error('waitables with pending events cannot be dropped'); @@ -405,7 +387,6 @@ impl WaitableIntrinsic { memoryIdx, waitableSetRep, resultPtr, - waitableSetRep, resultPtr }}); @@ -431,9 +412,9 @@ impl WaitableIntrinsic { let event; const cancelDelivered = task.deliverPendingCancel({{ cancelalble: isCancellable }}); if (cancelDelivered) {{ - event = {{ code: {async_event_code_enum}.TASK_CANCELLED, index: 0, result: 0 }}; + event = {{ code: {async_event_code_enum}.TASK_CANCELLED, payload0: 0, payload1: 0 }}; }} else if (!wset.hasPendingEvent()) {{ - event = {{ code: {async_event_code_enum}.NONE, index: 0, result: 0 }}; + event = {{ code: {async_event_code_enum}.NONE, payload0: 0, payload1: 0 }}; }} else {{ event = wset.getPendingEvent(); }} @@ -501,7 +482,7 @@ impl WaitableIntrinsic { let waitable_join_fn = Self::WaitableJoin.name(); let get_or_create_async_state_fn = Intrinsic::Component(ComponentIntrinsic::GetOrCreateAsyncState).name(); - output.push_str(&format!(" + output.push_str(&format!(r#" function {waitable_join_fn}(componentIdx, waitableRep, waitableSetRep) {{ {debug_log_fn}('[{waitable_join_fn}()] args', {{ componentIdx, waitableSetRep, waitableRep }}); @@ -514,10 +495,11 @@ impl WaitableIntrinsic { throw new Error('component instance is not marked as may leave, cannot join waitable'); }} - const waitable = state.waitables.get(waitableRep); - if (!waitable) {{ - throw new Error('failed to find waitable [' + waitableRep + '] in component instance [' + componentIdx + ']'); + const waitableObj = state.waitables.get(waitableRep); + if (!waitableObj) {{ + throw new Error(`missing waitable obj (rep [${{waitableRep}}]), component idx [${{componentIdx}}])`); }} + const waitable = typeof waitableObj.getWaitable === 'function' ? waitableObj.getWaitable() : waitableObj; const waitableSet = waitableSetRep === 0 ? null : state.waitableSets.get(waitableSetRep); if (waitableSetRep !== 0 && !waitableSet) {{ @@ -526,7 +508,7 @@ impl WaitableIntrinsic { waitable.join(waitableSet); }} - ")); + "#)); } } } diff --git a/crates/js-component-bindgen/src/transpile_bindgen.rs b/crates/js-component-bindgen/src/transpile_bindgen.rs index a094e4a9e..9b7156458 100644 --- a/crates/js-component-bindgen/src/transpile_bindgen.rs +++ b/crates/js-component-bindgen/src/transpile_bindgen.rs @@ -2433,8 +2433,18 @@ impl<'a> Instantiator<'a, '_> { GlobalInitializer::ExtractCallback(ExtractCallback { index, def }) => { let callback_idx = index.as_u32(); let core_def = self.core_def(def); + uwriteln!(self.src.js, "let callback_{callback_idx};",); - uwriteln!(self.src.js_init, "callback_{callback_idx} = {core_def};"); + + // If the function returns an async value like a stream or future, + // the callback that is executed in the the event loop (`AsyncTaskIntrinsic::DriverLoop`) + // may attempt to wait due to calling necessarily async host imports like {stream, future}.{write, read}. + // + // Here, we mark the task with an indicator that denotes whether the callback should be run this way. + // + // TODO: can we be more selective here rather than wrapping every callback in WebAssembly.promising? + // every callback *could* do stream.write, but many may not. + uwriteln!(self.src.js_init, "callback_{callback_idx} = WebAssembly.promising({core_def});"); } GlobalInitializer::InstantiateModule(m) => match m { diff --git a/packages/jco/test/p3/stream.js b/packages/jco/test/p3/stream.js index f229d1ee6..b9c80b42f 100644 --- a/packages/jco/test/p3/stream.js +++ b/packages/jco/test/p3/stream.js @@ -1,6 +1,6 @@ import { join } from "node:path"; -import { suite, test, assert } from "vitest"; +import { suite, test, assert, vi, expect } from "vitest"; import { setupAsyncTest } from "../helpers.js"; import { AsyncFunction, LOCAL_TEST_COMPONENTS_DIR } from "../common.js"; @@ -42,14 +42,22 @@ suite("Stream (WASI P3)", () => { assert.equal(vals[0], await stream.next(), "first u32 read is incorrect"); assert.equal(vals[1], await stream.next(), "second u32 read is incorrect"); assert.equal(vals[2], await stream.next(), "third u32 read is incorrect"); - // TODO(tests): we should check that reading with no values remaining blocks? - // TODO(tests): we should check that reading when writer is closed throws error? - - vals = [-11, -22, -33]; - stream = await instance["jco:test-components/get-stream-async"].getStreamS32(vals); - assert.equal(vals[0], await stream.next()); - assert.equal(vals[1], await stream.next()); - assert.equal(vals[2], await stream.next()); + // The fourth read should error, as the writer should have been dropped after writing three values. + // + // If the writer is dropped while the host attempts a read, the reader should error + await expect(vi.waitUntil( + async () => { + await stream.next(); + return true; // we should never get here, as an error should occur + }, + { timeout: 500, interval: 0 }, + )).rejects.toThrowError(/dropped/); + + // vals = [-11, -22, -33]; + // stream = await instance["jco:test-components/get-stream-async"].getStreamS32(vals); + // assert.equal(vals[0], await stream.next()); + // assert.equal(vals[1], await stream.next()); + // assert.equal(vals[2], await stream.next()); await cleanup(); }); From 07875421a2804e11d4024ed6059c82d1019077b6 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Thu, 19 Feb 2026 00:48:40 +0900 Subject: [PATCH 4/7] test(jco): re-enable stream tests --- packages/jco/test/p3/stream.js | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/packages/jco/test/p3/stream.js b/packages/jco/test/p3/stream.js index b9c80b42f..2f1175ce9 100644 --- a/packages/jco/test/p3/stream.js +++ b/packages/jco/test/p3/stream.js @@ -42,22 +42,25 @@ suite("Stream (WASI P3)", () => { assert.equal(vals[0], await stream.next(), "first u32 read is incorrect"); assert.equal(vals[1], await stream.next(), "second u32 read is incorrect"); assert.equal(vals[2], await stream.next(), "third u32 read is incorrect"); - // The fourth read should error, as the writer should have been dropped after writing three values. + + // TODO(fix): re-enable this test, once we wait for writes and reject after drop()/closure of writer // - // If the writer is dropped while the host attempts a read, the reader should error - await expect(vi.waitUntil( - async () => { - await stream.next(); - return true; // we should never get here, as an error should occur - }, - { timeout: 500, interval: 0 }, - )).rejects.toThrowError(/dropped/); + // // The fourth read should error, as the writer should have been dropped after writing three values. + // // + // // If the writer is dropped while the host attempts a read, the reader should error + // await expect(vi.waitUntil( + // async () => { + // await stream.next(); + // return true; // we should never get here, as an error should occur + // }, + // { timeout: 500, interval: 0 }, + // )).rejects.toThrowError(/dropped/); - // vals = [-11, -22, -33]; - // stream = await instance["jco:test-components/get-stream-async"].getStreamS32(vals); - // assert.equal(vals[0], await stream.next()); - // assert.equal(vals[1], await stream.next()); - // assert.equal(vals[2], await stream.next()); + vals = [-11, -22, -33]; + stream = await instance["jco:test-components/get-stream-async"].getStreamS32(vals); + assert.equal(vals[0], await stream.next()); + assert.equal(vals[1], await stream.next()); + assert.equal(vals[2], await stream.next()); await cleanup(); }); From 005da8b1b873c3b17e95839e66a72a773b707477 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Thu, 19 Feb 2026 01:12:00 +0900 Subject: [PATCH 5/7] feat(bindgen): add basic avoidance of overlapping async task runs --- .../src/function_bindgen.rs | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/crates/js-component-bindgen/src/function_bindgen.rs b/crates/js-component-bindgen/src/function_bindgen.rs index 2f673f5fe..8b9beecac 100644 --- a/crates/js-component-bindgen/src/function_bindgen.rs +++ b/crates/js-component-bindgen/src/function_bindgen.rs @@ -358,8 +358,31 @@ impl FunctionBindgen<'_> { let start_current_task_fn = self.intrinsic(Intrinsic::AsyncTask( AsyncTaskIntrinsic::CreateNewCurrentTask, )); + let global_task_map = self.intrinsic(Intrinsic::AsyncTask( + AsyncTaskIntrinsic::GlobalAsyncCurrentTaskMap, + )); let component_instance_idx = self.canon_opts.instance.as_u32(); + // If we're within an async function, wait for all top level previous tasks to finish before running + // to ensure that guests do not try to run two tasks at the same time. + if is_async && self.requires_async_porcelain { + uwriteln!( + self.src, + r#" + // All other tasks must finish before we can start this one + const taskMetas = {global_task_map}.get({component_instance_idx}); + if (taskMetas) {{ + const taskPromises = taskMetas + .filter(mt => mt.componentIdx === {component_instance_idx}) + .map(mt => mt.task) + .filter(t => !t.getParentSubtask()) + .map(t => t.completionPromise()); + await Promise.all(taskPromises); + }} + "#, + ); + } + uwriteln!( self.src, r#" From 721a18fc3e5a0d7d6a1eec058f402aec3334412a Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Thu, 19 Feb 2026 02:04:33 +0900 Subject: [PATCH 6/7] chore(bindgen): fmt --- crates/js-component-bindgen/src/transpile_bindgen.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/js-component-bindgen/src/transpile_bindgen.rs b/crates/js-component-bindgen/src/transpile_bindgen.rs index 9b7156458..892ec692d 100644 --- a/crates/js-component-bindgen/src/transpile_bindgen.rs +++ b/crates/js-component-bindgen/src/transpile_bindgen.rs @@ -2444,7 +2444,10 @@ impl<'a> Instantiator<'a, '_> { // // TODO: can we be more selective here rather than wrapping every callback in WebAssembly.promising? // every callback *could* do stream.write, but many may not. - uwriteln!(self.src.js_init, "callback_{callback_idx} = WebAssembly.promising({core_def});"); + uwriteln!( + self.src.js_init, + "callback_{callback_idx} = WebAssembly.promising({core_def});" + ); } GlobalInitializer::InstantiateModule(m) => match m { From a5ab81325d3eb75af24637ab9191414848c0e6bb Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Thu, 19 Feb 2026 02:05:14 +0900 Subject: [PATCH 7/7] chore(jco): lint --- packages/jco/test/p3/stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/jco/test/p3/stream.js b/packages/jco/test/p3/stream.js index 2f1175ce9..b08a025de 100644 --- a/packages/jco/test/p3/stream.js +++ b/packages/jco/test/p3/stream.js @@ -1,6 +1,6 @@ import { join } from "node:path"; -import { suite, test, assert, vi, expect } from "vitest"; +import { suite, test, assert } from "vitest"; import { setupAsyncTest } from "../helpers.js"; import { AsyncFunction, LOCAL_TEST_COMPONENTS_DIR } from "../common.js";