Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions crates/js-component-bindgen/src/function_bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,31 @@ impl FunctionBindgen<'_> {
let start_current_task_fn = self.intrinsic(Intrinsic::AsyncTask(
AsyncTaskIntrinsic::CreateNewCurrentTask,
));
let global_task_map = self.intrinsic(Intrinsic::AsyncTask(
AsyncTaskIntrinsic::GlobalAsyncCurrentTaskMap,
));
let component_instance_idx = self.canon_opts.instance.as_u32();

// If we're within an async function, wait for all top level previous tasks to finish before running
// to ensure that guests do not try to run two tasks at the same time.
if is_async && self.requires_async_porcelain {
uwriteln!(
self.src,
r#"
// All other tasks must finish before we can start this one
const taskMetas = {global_task_map}.get({component_instance_idx});
if (taskMetas) {{
const taskPromises = taskMetas
.filter(mt => mt.componentIdx === {component_instance_idx})
.map(mt => mt.task)
.filter(t => !t.getParentSubtask())
.map(t => t.completionPromise());
await Promise.all(taskPromises);
}}
"#,
);
}

uwriteln!(
self.src,
r#"
Expand Down Expand Up @@ -2306,7 +2329,7 @@ impl Bindgen for FunctionBindgen<'_> {
"stream element type mismatch"
);

let arg_stream_idx = operands
let arg_stream_end_idx = operands
.first()
.expect("unexpectedly missing stream table idx arg in StreamLift");

Expand Down Expand Up @@ -2390,7 +2413,7 @@ impl Bindgen for FunctionBindgen<'_> {
const {result_var} = {stream_new_from_lift_fn}({{
componentIdx: {component_idx},
streamTableIdx: {stream_table_idx},
streamIdx: {arg_stream_idx},
streamEndIdx: {arg_stream_end_idx},
payloadLiftFn,
payloadTypeSize32: {payload_ty_size_js},
payloadLowerFn,
Expand Down
116 changes: 78 additions & 38 deletions crates/js-component-bindgen/src/intrinsics/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ impl ComponentIntrinsic {
#parkedTasks = new Map();
#suspendedTasksByTaskID = new Map();
#suspendedTaskIDs = [];
#pendingTasks = [];
#errored = null;

#backpressure = 0;
Expand All @@ -167,18 +166,21 @@ impl ComponentIntrinsic {
#handlerMap = new Map();
#nextHandlerID = 0n;

mayLeave = true;

#streams;

#tickLoop = null;
#tickLoopInterval = null;

mayLeave = true;

waitableSets;
waitables;
subtasks;

constructor(args) {{
this.#componentIdx = args.componentIdx;
this.waitableSets = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitable sets` }});
this.waitables = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitables` }});
this.waitables = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] waitable objects` }});
this.subtasks = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] subtasks` }});
this.#streams = new Map();
}};
Expand Down Expand Up @@ -325,8 +327,10 @@ impl ComponentIntrinsic {
}}

// TODO: we might want to check for pre-locked status here
exclusiveLock() {{
this.#locked = true;
exclusiveLock() {{ this.setLocked(true); }}

setLocked(locked) {{
this.#locked = locked;
}}

exclusiveRelease() {{
Expand All @@ -335,7 +339,7 @@ impl ComponentIntrinsic {
componentIdx: this.#componentIdx,
}});

this.#locked = false
this.setLocked(false);
}}

isExclusivelyLocked() {{ return this.#locked === true; }}
Expand Down Expand Up @@ -363,14 +367,14 @@ impl ComponentIntrinsic {
}}
}}

// TODO(threads): readyFn is normally on the thread
suspendTask(args) {{
// TODO(threads): readyFn is normally on the thread
const {{ task, readyFn }} = args;
const taskID = task.id();
{debug_log_fn}('[{class_name}#suspendTask()]', {{ taskID }});

if (this.#getSuspendedTaskMeta(taskID)) {{
throw new Error('task [' + taskID + '] already suspended');
throw new Error(`task [${{taskID}}] already suspended`);
}}

const {{ promise, resolve }} = Promise.withResolvers();
Expand All @@ -385,6 +389,12 @@ impl ComponentIntrinsic {
}},
}});

// NOTE: we perform an explicit tick here for suspended tasks that are
// immediately resumable, to ensure they do not fall too far behind
// in the async event queue
const done = this.tick();
this.runTickLoop();

return promise;
}}

Expand All @@ -395,6 +405,20 @@ impl ComponentIntrinsic {
meta.resume();
}}

async runTickLoop() {{
if (this.#tickLoop !== null) {{ await this.#tickLoop; }}
this.#tickLoop = new Promise(async (resolve) => {{
let done = this.tick();
while (!done) {{
// TODO(fix): reduce latency on loop
await new Promise((resolve) => setTimeout(resolve, 10));
done = this.tick();
}}
this.#tickLoop = null;
resolve();
}});
}}

tick() {{
{debug_log_fn}('[{class_name}#tick()]', {{ suspendedTaskIDs: this.#suspendedTaskIDs }});
const resumableTasks = this.#suspendedTaskIDs.filter(t => t !== null);
Expand All @@ -413,20 +437,17 @@ impl ComponentIntrinsic {
return this.#suspendedTaskIDs.filter(t => t !== null).length === 0;
}}

addPendingTask(task) {{
this.#pendingTasks.push(task);
}}

addStreamEnd(args) {{
{debug_log_fn}('[{class_name}#addStreamEnd()] args', args);
const {{ tableIdx, streamEnd }} = args;

let tbl = this.#streams.get(tableIdx);
if (!tbl) {{
tbl = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }});
tbl = new {rep_table_class}({{ target: `stream table (idx [${{tableIdx}}], component [${{this.#componentIdx}}])` }});
this.#streams.set(tableIdx, tbl);
}}

// TODO(fix): streams are waitables so need to go there
const streamIdx = tbl.insert(streamEnd);
return streamIdx;
}}
Expand All @@ -437,63 +458,82 @@ impl ComponentIntrinsic {
if (tableIdx === undefined) {{ throw new Error("missing table idx while adding stream"); }}
if (elemMeta === undefined) {{ throw new Error("missing element metadata while adding stream"); }}

let tbl = this.#streams.get(tableIdx);
if (!tbl) {{
tbl = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }});
this.#streams.set(tableIdx, tbl);
let localStreamTable = this.#streams.get(tableIdx);
if (!localStreamTable) {{
localStreamTable = new {rep_table_class}({{ target: `component [${{this.#componentIdx}}] streams` }});
this.#streams.set(tableIdx, localStreamTable);
}}

const stream = new {internal_stream_class}({{
tableIdx,
componentIdx: this.#componentIdx,
elemMeta,
localStreamTable,
globalStreamMap: {global_stream_map},
}});
const writeEndIdx = tbl.insert(stream.getWriteEnd());
stream.setWriteEndIdx(writeEndIdx);
const readEndIdx = tbl.insert(stream.getReadEnd());
stream.setReadEndIdx(readEndIdx);

const rep = {global_stream_map}.insert(stream);
stream.setRep(rep);
const writeEndIdx = this.waitables.insert(stream.writeEnd());
stream.setWriteEndWaitableIdx(writeEndIdx);

const readEndIdx = this.waitables.insert(stream.readEnd());
stream.setReadEndWaitableIdx(readEndIdx);

return {{ writeEndIdx, readEndIdx }};
}}

getStreamEnd(args) {{
{debug_log_fn}('[{class_name}#getStreamEnd()] args', args);
const {{ tableIdx, streamIdx }} = args;
if (tableIdx === undefined) {{ throw new Error('missing table idx while retrieveing stream end'); }}
if (streamIdx === undefined) {{ throw new Error('missing stream idx while retrieveing stream end'); }}
const {{ tableIdx, streamEndIdx }} = args;
if (tableIdx === undefined) {{ throw new Error('missing table idx while getting stream end'); }}
if (streamEndIdx === undefined) {{ throw new Error('missing stream idx while getting stream end'); }}

const tbl = this.#streams.get(tableIdx);
if (!tbl) {{
const streamEnd = this.waitables.get(streamEndIdx);
if (!streamEnd) {{
throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while getting stream`);
}}
if (streamEnd.streamTableIdx() !== tableIdx) {{
throw new Error(`stream end table idx [${{streamEnd.streamTableIdx()}}] does not match [${{tableIdx}}]`);
}}

const stream = tbl.get(streamIdx);
return stream;
return streamEnd;
}}

// TODO(fix): local/global stream table checks could be simplified/removed, if we centralize tracking
removeStreamEnd(args) {{
{debug_log_fn}('[{class_name}#removeStreamEnd()] args', args);
const {{ tableIdx, streamIdx }} = args;
const {{ tableIdx, streamEndIdx }} = args;
if (tableIdx === undefined) {{ throw new Error("missing table idx while removing stream end"); }}
if (streamIdx === undefined) {{ throw new Error("missing stream idx while removing stream end"); }}
if (streamEndIdx === undefined) {{ throw new Error("missing stream idx while removing stream end"); }}

const streamEnd = this.waitables.get(streamEndIdx);
if (!streamEnd) {{
throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while getting stream`);
}}
if (streamEnd.streamTableIdx() !== tableIdx) {{
throw new Error(`stream end table idx [${{streamEnd.streamTableIdx()}}] does not match [${{tableIdx}}]`);
}}

const tbl = this.#streams.get(tableIdx);
if (!tbl) {{
throw new Error(`missing stream table [${{tableIdx}}] in component [${{this.#componentIdx}}] while removing stream end`);
}}

const stream = tbl.get(streamIdx);
if (!stream) {{ throw new Error(`component [${{this.#componentIdx}}] missing stream [${{streamIdx}}]`); }}
let removed = tbl.remove(streamEnd.idx());
if (!removed) {{
throw new Error(`failed to remove stream [${{streamEnd.idx()}}] in internal table (table [${{tableIdx}}]), component [${{this.#componentIdx}}] while removing stream end`);
}}

removed = {global_stream_map}.remove(streamEnd.streamRep());
if (!removed) {{
throw new Error(`failed to remove stream [${{streamEnd.rep()}}] in global stream map (component [${{this.#componentIdx}}] while removing stream end`);
}}

const removed = tbl.remove(streamIdx);
removed = this.waitables.remove(streamEndIdx);
if (!removed) {{
throw new Error(`missing stream [${{streamIdx}}] (table [${{tableIdx}}]) in component [${{this.#componentIdx}}] while removing stream end`);
throw new Error(`failed to remove stream [${{streamEndIdx}}] waitable obj in component [${{this.#componentIdx}}] while removing stream end`);
}}

return stream;
return streamEnd;
}}
}}
"#,
Expand Down
10 changes: 5 additions & 5 deletions crates/js-component-bindgen/src/intrinsics/lift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,19 +908,19 @@ impl LiftIntrinsic {
{debug_log_fn}('[{lift_flat_stream_fn}()] args', {{ componentTableIdx, ctx }});
const {{ memory, useDirectParams, params, componentIdx }} = ctx;

const streamIdx = params[0];
if (!streamIdx) {{ throw new Error('missing stream idx'); }}
const streamEndIdx = params[0];
if (!streamEndIdx) {{ throw new Error('missing stream idx'); }}

const cstate = {get_or_create_async_state_fn}(componentIdx);
if (!cstate) {{ throw new Error(`missing async state for component [${{componentIdx}}]`); }}

const streamEnd = cstate.getStreamEnd({{ tableIdx: componentTableIdx, streamIdx }});
const streamEnd = cstate.getStreamEnd({{ tableIdx: componentTableIdx, streamEndIdx }});
if (!streamEnd) {{
throw new Error(`missing stream [${{streamIdx}}] (table [${{componentTableIdx}}]) in component [${{componentIdx}}] during lift`);
throw new Error(`missing stream end [${{streamEndIdx}}] (table [${{componentTableIdx}}]) in component [${{componentIdx}}] during lift`);
}}

const stream = new {external_stream_class}({{
hostStreamRep: streamEnd.getStreamRep(),
hostStreamRep: streamEnd.streamRep(),
isReadable: streamEnd.isReadable(),
isWritable: streamEnd.isWritable(),
writeFn: (v) => {{ return streamEnd.write(v); }},
Expand Down
2 changes: 2 additions & 0 deletions crates/js-component-bindgen/src/intrinsics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,8 @@ impl Intrinsic {
this.target = args?.target;
}}

data() {{ return this.#data; }}

insert(val) {{
{debug_log_fn}('[{rep_table_class}#insert()] args', {{ val, target: this.target }});
const freeIdx = this.#data[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl AsyncFutureIntrinsic {
}}

if (futureEnd.hasPendingEvent()) {{
const {{ code, index, payload }} = futureEnd.getEvent();
const {{ code, payload0: index, payload1 }} = futureEnd.getEvent();
if (code !== eventCode || index != 1) {{
throw new Error('invalid event, does not match expected event code');
}}
Expand Down Expand Up @@ -469,7 +469,7 @@ impl AsyncFutureIntrinsic {
}}
}}

const {{ code, index, payload }} = e.getEvent();
const {{ code, payload0: index, payload1: payload }} = e.getEvent();
if (futureEnd.isCopying()) {{ throw new Error('future end is still in copying state'); }}
if (code !== {async_event_code_enum}) {{ throw new Error('unexpected event code [' + code + '], expected [' + {async_event_code_enum} + ']'); }}
if (index !== 1) {{ throw new Error('unexpected index, should be 1'); }}
Expand Down
Loading
Loading