diff --git a/Cargo.lock b/Cargo.lock index 06c185cb39..28524d6867 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,18 +172,6 @@ dependencies = [ "syn 2.0.90", ] -[[package]] -name = "auditable-serde" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7bf8143dfc3c0258df908843e169b5cc5fcf76c7718bd66135ef4a9cd558c5" -dependencies = [ - "semver", - "serde", - "serde_json", - "topological-sort", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -3307,15 +3295,6 @@ dependencies = [ "id-arena", ] -[[package]] -name = "spdx" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2971cb691ca629f46174f73b1f95356c5617f89b4167f04107167c3dccb8dd89" -dependencies = [ - "smallvec", -] - [[package]] name = "spin" version = "0.9.8" @@ -3709,12 +3688,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "topological-sort" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea68304e134ecd095ac6c3574494fc62b909f416c4fca77e440530221e549d3d" - [[package]] name = "torch-sys" version = "0.17.0" @@ -4127,7 +4100,7 @@ checksum = "6ee99da9c5ba11bd675621338ef6fa52296b76b83305e9b6e5c77d4c286d6d49" [[package]] name = "wasm-compose" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", "heck 0.4.1", @@ -4147,7 +4120,7 @@ dependencies = [ [[package]] name = "wasm-encoder" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "leb128fmt", "wasmparser", @@ -4156,17 +4129,10 @@ dependencies = [ [[package]] name = "wasm-metadata" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", - "auditable-serde", - "flate2", "indexmap 2.7.0", - "serde", - "serde_derive", - "serde_json", - "spdx", - "url", "wasm-encoder", "wasmparser", ] @@ -4174,7 +4140,7 @@ dependencies = [ [[package]] name = "wasm-mutate" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "egg", "log", @@ -4187,7 +4153,7 @@ dependencies = [ [[package]] name = "wasm-smith" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", "arbitrary", @@ -4206,7 +4172,7 @@ dependencies = [ [[package]] name = "wasm-wave" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "indexmap 2.7.0", "logos", @@ -4261,7 +4227,7 @@ dependencies = [ [[package]] name = "wasmparser" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "bitflags 2.6.0", "hashbrown 0.15.2", @@ -4282,7 +4248,7 @@ dependencies = [ [[package]] name = "wasmprinter" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", "termcolor", @@ -4937,7 +4903,7 @@ dependencies = [ [[package]] name = "wast" version = "227.0.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "bumpalo", "leb128fmt", @@ -4949,7 +4915,7 @@ dependencies = [ [[package]] name = "wat" version = "1.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "wast 227.0.1", ] @@ -5317,7 +5283,7 @@ dependencies = [ [[package]] name = "wit-bindgen" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/wit-bindgen?rev=ec56282b#ec56282b62be6b9619df4ea0cb8b79d828b5d070" +source = "git+https://github.com/bytecodealliance/witx-bindgen#40ca9b144f88528119fa1e5307df514a0181a1cd" dependencies = [ "wit-bindgen-rt 0.41.0", "wit-bindgen-rust-macro", @@ -5326,7 +5292,7 @@ dependencies = [ [[package]] name = "wit-bindgen-core" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/wit-bindgen?rev=ec56282b#ec56282b62be6b9619df4ea0cb8b79d828b5d070" +source = "git+https://github.com/bytecodealliance/witx-bindgen#40ca9b144f88528119fa1e5307df514a0181a1cd" dependencies = [ "anyhow", "heck 0.5.0", @@ -5345,7 +5311,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rt" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/wit-bindgen?rev=ec56282b#ec56282b62be6b9619df4ea0cb8b79d828b5d070" +source = "git+https://github.com/bytecodealliance/witx-bindgen#40ca9b144f88528119fa1e5307df514a0181a1cd" dependencies = [ "bitflags 2.6.0", "futures", @@ -5355,7 +5321,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/wit-bindgen?rev=ec56282b#ec56282b62be6b9619df4ea0cb8b79d828b5d070" +source = "git+https://github.com/bytecodealliance/witx-bindgen#40ca9b144f88528119fa1e5307df514a0181a1cd" dependencies = [ "anyhow", "heck 0.5.0", @@ -5370,7 +5336,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" version = "0.41.0" -source = "git+https://github.com/bytecodealliance/wit-bindgen?rev=ec56282b#ec56282b62be6b9619df4ea0cb8b79d828b5d070" +source = "git+https://github.com/bytecodealliance/witx-bindgen#40ca9b144f88528119fa1e5307df514a0181a1cd" dependencies = [ "anyhow", "prettyplease", @@ -5384,7 +5350,7 @@ dependencies = [ [[package]] name = "wit-component" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", "bitflags 2.6.0", @@ -5402,7 +5368,7 @@ dependencies = [ [[package]] name = "wit-parser" version = "0.227.1" -source = "git+https://github.com/bytecodealliance/wasm-tools?rev=43556041#43556041e072465c4a658da3d5b3ee7393211cc9" +source = "git+https://github.com/bytecodealliance/wasm-tools#8bd9442a363777a7ac289605247bb277f088532a" dependencies = [ "anyhow", "id-arena", diff --git a/Cargo.toml b/Cargo.toml index 109a25c444..77969d6b03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -300,24 +300,22 @@ system-interface = { version = "0.27.1", features = ["cap_std_impls"] } io-lifetimes = { version = "2.0.3", default-features = false } io-extras = "0.18.1" rustix = "0.38.43" -# TODO: switch back to release: -wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "ec56282b", default-features = false } -wit-bindgen-rt = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "ec56282b", default-features = false } -wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "ec56282b", default-features = false } +wit-bindgen = { version = "0.41.0", default-features = false } +wit-bindgen-rt = { version = "0.41.0", default-features = false } +wit-bindgen-rust-macro = { version = "0.41.0", default-features = false } # wasm-tools family: -# TODO: switch back to release: -wasmparser = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041", default-features = false, features = ['simd'] } -wat = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wast = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasmprinter = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-encoder = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-smith = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-mutate = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wit-parser = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wit-component = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-compose = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } +wasmparser = { version = "0.227.0", default-features = false, features = ['simd'] } +wat = "1.227.0" +wast = "227.0.0" +wasmprinter = "0.227.0" +wasm-encoder = "0.227.0" +wasm-smith = "0.227.0" +wasm-mutate = "0.227.0" +wit-parser = "0.227.0" +wit-component = "0.227.0" +wasm-wave = "0.227.0" +wasm-compose = "0.227.0" # Non-Bytecode Alliance maintained dependencies: # -------------------------- @@ -572,17 +570,36 @@ inherits = "release" codegen-units = 1 lto = true -# TODO: remove this once we've switched to a wasm-tools release: +# TODO: remove this once we've switched to a wasm-tools/wit-bindgen release: [patch.crates-io] -wasmparser = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wat = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wast = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasmprinter = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-encoder = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-smith = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-mutate = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wit-parser = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wit-component = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-compose = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } -wasm-metadata = { git = "https://github.com/bytecodealliance/wasm-tools", rev = "43556041" } +wasmparser = { git = "https://github.com/bytecodealliance/wasm-tools" } +wat = { git = "https://github.com/bytecodealliance/wasm-tools" } +wast = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasmprinter = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-encoder = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-smith = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-mutate = { git = "https://github.com/bytecodealliance/wasm-tools" } +wit-parser = { git = "https://github.com/bytecodealliance/wasm-tools" } +wit-component = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-compose = { git = "https://github.com/bytecodealliance/wasm-tools" } +wasm-metadata = { git = "https://github.com/bytecodealliance/wasm-tools" } +wit-bindgen = { git = "https://github.com/bytecodealliance/witx-bindgen" } +wit-bindgen-rt = { git = "https://github.com/bytecodealliance/witx-bindgen" } +wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/witx-bindgen" } + +# wasmparser = { path = '../wasm-tools/crates/wasmparser' } +# wat = { path = '../wasm-tools/crates/wat' } +# wast = { path = '../wasm-tools/crates/wast' } +# wasmprinter = { path = '../wasm-tools/crates/wasmprinter' } +# wasm-encoder = { path = '../wasm-tools/crates/wasm-encoder' } +# wasm-smith = { path = '../wasm-tools/crates/wasm-smith' } +# wasm-mutate = { path = '../wasm-tools/crates/wasm-mutate' } +# wit-parser = { path = '../wasm-tools/crates/wit-parser' } +# wit-component = { path = '../wasm-tools/crates/wit-component' } +# wasm-wave = { path = '../wasm-tools/crates/wasm-wave' } +# wasm-compose = { path = '../wasm-tools/crates/wasm-compose' } +# wasm-metadata = { path = '../wasm-tools/crates/wasm-metadata' } +# wit-bindgen = { path = '../wit-bindgen/crates/guest-rust' } +# wit-bindgen-rt = { path = '../wit-bindgen/crates/guest-rust/rt' } +# wit-bindgen-rust-macro = { path = '../wit-bindgen/crates/guest-rust/macro' } diff --git a/crates/cranelift/src/compiler/component.rs b/crates/cranelift/src/compiler/component.rs index e19a0a1679..bccd257933 100644 --- a/crates/cranelift/src/compiler/component.rs +++ b/crates/cranelift/src/compiler/component.rs @@ -140,12 +140,8 @@ impl<'a> TrampolineCompiler<'a> { host::stream_new, TrapSentinel::NegativeOne, ), - Trampoline::StreamRead { - ty, - err_ctx_ty, - options, - } => { - let tys = &[ty.as_u32(), err_ctx_ty.as_u32()]; + Trampoline::StreamRead { ty, options } => { + let tys = &[ty.as_u32()]; if let Some(info) = self.flat_stream_element_info(*ty).cloned() { self.translate_flat_stream_call(tys, options, host::flat_stream_read, &info) } else { @@ -157,12 +153,8 @@ impl<'a> TrampolineCompiler<'a> { ) } } - Trampoline::StreamWrite { - ty, - err_ctx_ty, - options, - } => { - let tys = &[ty.as_u32(), err_ctx_ty.as_u32()]; + Trampoline::StreamWrite { ty, options } => { + let tys = &[ty.as_u32()]; if let Some(info) = self.flat_stream_element_info(*ty).cloned() { self.translate_flat_stream_call(tys, options, host::flat_stream_write, &info) } else { @@ -180,42 +172,32 @@ impl<'a> TrampolineCompiler<'a> { Trampoline::StreamCancelWrite { ty, async_ } => { self.translate_cancel_call(ty.as_u32(), *async_, host::stream_cancel_write) } - Trampoline::StreamCloseReadable { ty, err_ctx_ty } => self - .translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], - None, - host::stream_close_readable, - TrapSentinel::Falsy, - ), - Trampoline::StreamCloseWritable { ty, err_ctx_ty } => self - .translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], - None, - host::stream_close_writable, - TrapSentinel::Falsy, - ), + Trampoline::StreamCloseReadable { ty } => self.translate_future_or_stream_call( + &[ty.as_u32()], + None, + host::stream_close_readable, + TrapSentinel::Falsy, + ), + Trampoline::StreamCloseWritable { ty } => self.translate_future_or_stream_call( + &[ty.as_u32()], + None, + host::stream_close_writable, + TrapSentinel::Falsy, + ), Trampoline::FutureNew { ty } => self.translate_future_or_stream_call( &[ty.as_u32()], None, host::future_new, TrapSentinel::NegativeOne, ), - Trampoline::FutureRead { - ty, - err_ctx_ty, - options, - } => self.translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], + Trampoline::FutureRead { ty, options } => self.translate_future_or_stream_call( + &[ty.as_u32()], Some(&options), host::future_read, TrapSentinel::NegativeOne, ), - Trampoline::FutureWrite { - ty, - err_ctx_ty, - options, - } => self.translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], + Trampoline::FutureWrite { ty, options } => self.translate_future_or_stream_call( + &[ty.as_u32()], Some(options), host::future_write, TrapSentinel::NegativeOne, @@ -226,20 +208,18 @@ impl<'a> TrampolineCompiler<'a> { Trampoline::FutureCancelWrite { ty, async_ } => { self.translate_cancel_call(ty.as_u32(), *async_, host::future_cancel_write) } - Trampoline::FutureCloseReadable { ty, err_ctx_ty } => self - .translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], - None, - host::future_close_readable, - TrapSentinel::Falsy, - ), - Trampoline::FutureCloseWritable { ty, err_ctx_ty } => self - .translate_future_or_stream_call( - &[ty.as_u32(), err_ctx_ty.as_u32()], - None, - host::future_close_writable, - TrapSentinel::Falsy, - ), + Trampoline::FutureCloseReadable { ty } => self.translate_future_or_stream_call( + &[ty.as_u32()], + None, + host::future_close_readable, + TrapSentinel::Falsy, + ), + Trampoline::FutureCloseWritable { ty } => self.translate_future_or_stream_call( + &[ty.as_u32()], + None, + host::future_close_writable, + TrapSentinel::Falsy, + ), Trampoline::ErrorContextNew { ty, options } => self.translate_error_context_call( *ty, options, @@ -256,12 +236,12 @@ impl<'a> TrampolineCompiler<'a> { Trampoline::ErrorContextDrop { ty } => self.translate_error_context_drop_call(*ty), Trampoline::ResourceTransferOwn => { self.translate_host_libcall(host::resource_transfer_own, |me, rets| { - rets[0] = me.raise_if_negative_one(rets[0]); + rets[0] = me.raise_if_negative_one_and_truncate(rets[0]); }) } Trampoline::ResourceTransferBorrow => { self.translate_host_libcall(host::resource_transfer_borrow, |me, rets| { - rets[0] = me.raise_if_negative_one(rets[0]); + rets[0] = me.raise_if_negative_one_and_truncate(rets[0]); }) } Trampoline::ResourceEnterCall => { @@ -281,17 +261,17 @@ impl<'a> TrampolineCompiler<'a> { } => self.translate_async_exit(*callback, *post_return), Trampoline::FutureTransfer => { self.translate_host_libcall(host::future_transfer, |me, rets| { - rets[0] = me.raise_if_negative_one(rets[0]); + rets[0] = me.raise_if_negative_one_and_truncate(rets[0]); }) } Trampoline::StreamTransfer => { self.translate_host_libcall(host::stream_transfer, |me, rets| { - rets[0] = me.raise_if_negative_one(rets[0]); + rets[0] = me.raise_if_negative_one_and_truncate(rets[0]); }) } Trampoline::ErrorContextTransfer => { self.translate_host_libcall(host::error_context_transfer, |me, rets| { - rets[0] = me.raise_if_negative_one(rets[0]); + rets[0] = me.raise_if_negative_one_and_truncate(rets[0]); }) } Trampoline::ContextGet(i) => self.translate_context_get(*i), @@ -379,12 +359,22 @@ impl<'a> TrampolineCompiler<'a> { let call = self.call_libcall(vmctx, get_libcall, args); let result = self.builder.func.dfg.inst_results(call)[0]; + let result_ty = self.builder.func.dfg.value_type(result); + let expected = &self.builder.func.signature.returns; match sentinel { TrapSentinel::NegativeOne => { - let result = self.raise_if_negative_one(result); + assert_eq!(expected.len(), 1); + let result = match (result_ty, expected[0].value_type) { + (ir::types::I64, ir::types::I32) => { + self.raise_if_negative_one_and_truncate(result) + } + (ir::types::I64, ir::types::I64) => self.raise_if_negative_one(result), + other => panic!("unsupported NegativeOne combo {other:?}"), + }; self.abi_store_results(&[result]); } TrapSentinel::Falsy => { + assert_eq!(expected.len(), 0); self.raise_if_host_trapped(result); self.builder.ins().return_(&[]); } @@ -923,7 +913,7 @@ impl<'a> TrampolineCompiler<'a> { ); let call = self.call_libcall(vmctx, host::resource_new32, &host_args); let result = self.builder.func.dfg.inst_results(call)[0]; - let result = self.raise_if_negative_one(result); + let result = self.raise_if_negative_one_and_truncate(result); self.abi_store_results(&[result]); } @@ -952,7 +942,7 @@ impl<'a> TrampolineCompiler<'a> { ); let call = self.call_libcall(vmctx, host::resource_rep32, &host_args); let result = self.builder.func.dfg.inst_results(call)[0]; - let result = self.raise_if_negative_one(result); + let result = self.raise_if_negative_one_and_truncate(result); self.abi_store_results(&[result]); } @@ -1479,11 +1469,16 @@ impl<'a> TrampolineCompiler<'a> { self.raise_if_host_trapped(succeeded); } + fn raise_if_negative_one_and_truncate(&mut self, ret: ir::Value) -> ir::Value { + let ret = self.raise_if_negative_one(ret); + self.builder.ins().ireduce(ir::types::I32, ret) + } + fn raise_if_negative_one(&mut self, ret: ir::Value) -> ir::Value { let minus_one = self.builder.ins().iconst(ir::types::I64, -1); let succeeded = self.builder.ins().icmp(IntCC::NotEqual, ret, minus_one); self.raise_if_host_trapped(succeeded); - self.builder.ins().ireduce(ir::types::I32, ret) + ret } fn call_libcall( diff --git a/crates/environ/src/component.rs b/crates/environ/src/component.rs index e9c9db9660..f939bd5884 100644 --- a/crates/environ/src/component.rs +++ b/crates/environ/src/component.rs @@ -112,35 +112,35 @@ macro_rules! foreach_builtin_component_function { #[cfg(feature = "component-model-async")] future_new(vmctx: vmctx, ty: u32) -> u64; #[cfg(feature = "component-model-async")] - future_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, err_ctx_ty: u32, future: u32, address: u32) -> u64; + future_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, future: u32, address: u32) -> u64; #[cfg(feature = "component-model-async")] - future_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, err_ctx_ty: u32, future: u32, address: u32) -> u64; + future_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, future: u32, address: u32) -> u64; #[cfg(feature = "component-model-async")] future_cancel_write(vmctx: vmctx, ty: u32, async_: u8, writer: u32) -> u64; #[cfg(feature = "component-model-async")] future_cancel_read(vmctx: vmctx, ty: u32, async_: u8, reader: u32) -> u64; #[cfg(feature = "component-model-async")] - future_close_writable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, writer: u32, error: u32) -> bool; + future_close_writable(vmctx: vmctx, ty: u32, writer: u32) -> bool; #[cfg(feature = "component-model-async")] - future_close_readable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, reader: u32, error: u32) -> bool; + future_close_readable(vmctx: vmctx, ty: u32, reader: u32) -> bool; #[cfg(feature = "component-model-async")] stream_new(vmctx: vmctx, ty: u32) -> u64; #[cfg(feature = "component-model-async")] - stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, err_ctx_ty: u32, stream: u32, address: u32, count: u32) -> u64; + stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] - stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, err_ctx_ty: u32, stream: u32, address: u32, count: u32) -> u64; + stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, async_: u8, ty: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] stream_cancel_write(vmctx: vmctx, ty: u32, async_: u8, writer: u32) -> u64; #[cfg(feature = "component-model-async")] stream_cancel_read(vmctx: vmctx, ty: u32, async_: u8, reader: u32) -> u64; #[cfg(feature = "component-model-async")] - stream_close_writable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, writer: u32, error: u32) -> bool; + stream_close_writable(vmctx: vmctx, ty: u32, writer: u32) -> bool; #[cfg(feature = "component-model-async")] - stream_close_readable(vmctx: vmctx, ty: u32, err_ctx_ty: u32, reader: u32, error: u32) -> bool; + stream_close_readable(vmctx: vmctx, ty: u32, reader: u32) -> bool; #[cfg(feature = "component-model-async")] - flat_stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, async_: u8, ty: u32, err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; + flat_stream_write(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, async_: u8, ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] - flat_stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, async_: u8, ty: u32, err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; + flat_stream_read(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, async_: u8, ty: u32, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32) -> u64; #[cfg(feature = "component-model-async")] error_context_new(vmctx: vmctx, memory: ptr_u8, realloc: ptr_u8, string_encoding: u8, ty: u32, debug_msg_address: u32, debug_msg_len: u32) -> u64; #[cfg(feature = "component-model-async")] diff --git a/crates/environ/src/component/dfg.rs b/crates/environ/src/component/dfg.rs index c6f5c9da2b..b58c52e61a 100644 --- a/crates/environ/src/component/dfg.rs +++ b/crates/environ/src/component/dfg.rs @@ -320,12 +320,10 @@ pub enum Trampoline { }, StreamRead { ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, StreamWrite { ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, StreamCancelRead { @@ -338,23 +336,19 @@ pub enum Trampoline { }, StreamCloseReadable { ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, StreamCloseWritable { ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, FutureNew { ty: TypeFutureTableIndex, }, FutureRead { ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, FutureWrite { ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, options: CanonicalOptions, }, FutureCancelRead { @@ -367,11 +361,9 @@ pub enum Trampoline { }, FutureCloseReadable { ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, FutureCloseWritable { ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, ErrorContextNew { ty: TypeComponentLocalErrorContextTableIndex, @@ -831,22 +823,12 @@ impl LinearizeDfg<'_> { instance: *instance, }, Trampoline::StreamNew { ty } => info::Trampoline::StreamNew { ty: *ty }, - Trampoline::StreamRead { - ty, - err_ctx_ty, - options, - } => info::Trampoline::StreamRead { + Trampoline::StreamRead { ty, options } => info::Trampoline::StreamRead { ty: *ty, - err_ctx_ty: *err_ctx_ty, options: self.options(options), }, - Trampoline::StreamWrite { - ty, - err_ctx_ty, - options, - } => info::Trampoline::StreamWrite { + Trampoline::StreamWrite { ty, options } => info::Trampoline::StreamWrite { ty: *ty, - err_ctx_ty: *err_ctx_ty, options: self.options(options), }, Trampoline::StreamCancelRead { ty, async_ } => info::Trampoline::StreamCancelRead { @@ -857,35 +839,19 @@ impl LinearizeDfg<'_> { ty: *ty, async_: *async_, }, - Trampoline::StreamCloseReadable { ty, err_ctx_ty } => { - info::Trampoline::StreamCloseReadable { - ty: *ty, - err_ctx_ty: *err_ctx_ty, - } + Trampoline::StreamCloseReadable { ty } => { + info::Trampoline::StreamCloseReadable { ty: *ty } } - Trampoline::StreamCloseWritable { ty, err_ctx_ty } => { - info::Trampoline::StreamCloseWritable { - ty: *ty, - err_ctx_ty: *err_ctx_ty, - } + Trampoline::StreamCloseWritable { ty } => { + info::Trampoline::StreamCloseWritable { ty: *ty } } Trampoline::FutureNew { ty } => info::Trampoline::FutureNew { ty: *ty }, - Trampoline::FutureRead { - ty, - err_ctx_ty, - options, - } => info::Trampoline::FutureRead { + Trampoline::FutureRead { ty, options } => info::Trampoline::FutureRead { ty: *ty, - err_ctx_ty: *err_ctx_ty, options: self.options(options), }, - Trampoline::FutureWrite { - ty, - err_ctx_ty, - options, - } => info::Trampoline::FutureWrite { + Trampoline::FutureWrite { ty, options } => info::Trampoline::FutureWrite { ty: *ty, - err_ctx_ty: *err_ctx_ty, options: self.options(options), }, Trampoline::FutureCancelRead { ty, async_ } => info::Trampoline::FutureCancelRead { @@ -896,17 +862,11 @@ impl LinearizeDfg<'_> { ty: *ty, async_: *async_, }, - Trampoline::FutureCloseReadable { ty, err_ctx_ty } => { - info::Trampoline::FutureCloseReadable { - ty: *ty, - err_ctx_ty: *err_ctx_ty, - } + Trampoline::FutureCloseReadable { ty } => { + info::Trampoline::FutureCloseReadable { ty: *ty } } - Trampoline::FutureCloseWritable { ty, err_ctx_ty } => { - info::Trampoline::FutureCloseWritable { - ty: *ty, - err_ctx_ty: *err_ctx_ty, - } + Trampoline::FutureCloseWritable { ty } => { + info::Trampoline::FutureCloseWritable { ty: *ty } } Trampoline::ErrorContextNew { ty, options } => info::Trampoline::ErrorContextNew { ty: *ty, diff --git a/crates/environ/src/component/info.rs b/crates/environ/src/component/info.rs index 8b8f382fa2..842440e035 100644 --- a/crates/environ/src/component/info.rs +++ b/crates/environ/src/component/info.rs @@ -764,9 +764,6 @@ pub enum Trampoline { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -777,9 +774,6 @@ pub enum Trampoline { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -810,9 +804,6 @@ pub enum Trampoline { StreamCloseReadable { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, - - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `stream.close-writable` intrinsic to close the writable end of a @@ -820,9 +811,6 @@ pub enum Trampoline { StreamCloseWritable { /// The table index for the specific `stream` type and caller instance. ty: TypeStreamTableIndex, - - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `future.new` intrinsic to create a new `future` handle of the @@ -837,9 +825,6 @@ pub enum Trampoline { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -850,9 +835,6 @@ pub enum Trampoline { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - /// Any options (e.g. string encoding) to use when storing values to /// memory. options: CanonicalOptions, @@ -883,9 +865,6 @@ pub enum Trampoline { FutureCloseReadable { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, - - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `future.close-writable` intrinsic to close the writable end of a @@ -893,9 +872,6 @@ pub enum Trampoline { FutureCloseWritable { /// The table index for the specific `future` type and caller instance. ty: TypeFutureTableIndex, - - /// The table index for the `error-context` type in the caller instance. - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, }, /// A `error-context.new` intrinsic to create a new `error-context` with a diff --git a/crates/environ/src/component/translate/inline.rs b/crates/environ/src/component/translate/inline.rs index 5905e17647..0b6f77f6a0 100644 --- a/crates/environ/src/component/translate/inline.rs +++ b/crates/environ/src/component/translate/inline.rs @@ -792,17 +792,12 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::StreamRead { - ty, - err_ctx_ty, - options, - }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::StreamRead { ty, options })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } StreamWrite { ty, func, options } => { @@ -811,17 +806,12 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::StreamWrite { - ty, - err_ctx_ty, - options, - }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::StreamWrite { ty, options })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } StreamCancelRead { ty, func, async_ } => { @@ -860,11 +850,10 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::StreamCloseReadable { ty, err_ctx_ty }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::StreamCloseReadable { ty })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } StreamCloseWritable { ty, func } => { @@ -873,11 +862,10 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::StreamCloseWritable { ty, err_ctx_ty }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::StreamCloseWritable { ty })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureNew { ty, func } => { @@ -898,17 +886,12 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::FutureRead { - ty, - err_ctx_ty, - options, - }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::FutureRead { ty, options })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureWrite { ty, func, options } => { @@ -917,17 +900,12 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; let options = self.adapter_options(frame, types, options); let options = self.canonical_options(options); - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::FutureWrite { - ty, - err_ctx_ty, - options, - }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::FutureWrite { ty, options })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureCancelRead { ty, func, async_ } => { @@ -966,11 +944,10 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::FutureCloseReadable { ty, err_ctx_ty }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::FutureCloseReadable { ty })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } FutureCloseWritable { ty, func } => { @@ -979,11 +956,10 @@ impl<'a> Inliner<'a> { else { unreachable!() }; - let err_ctx_ty = types.error_context_table_type()?; - let index = self.result.trampolines.push(( - *func, - dfg::Trampoline::FutureCloseWritable { ty, err_ctx_ty }, - )); + let index = self + .result + .trampolines + .push((*func, dfg::Trampoline::FutureCloseWritable { ty })); frame.funcs.push(dfg::CoreDef::Trampoline(index)); } ErrorContextNew { func, options } => { diff --git a/crates/misc/component-async-tests/tests/scenario/error_context.rs b/crates/misc/component-async-tests/tests/scenario/error_context.rs index 94ccdb822c..25c07f2115 100644 --- a/crates/misc/component-async-tests/tests/scenario/error_context.rs +++ b/crates/misc/component-async-tests/tests/scenario/error_context.rs @@ -26,49 +26,3 @@ async fn async_error_context_roundtrip() -> Result<()> { let callee = &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_CALLEE_COMPONENT).await?; test_run(&compose(caller, callee).await?).await } - -// No-op function; we only test this by composing it in `async_error_context_stream_callee` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_error_context_stream_callee() {} - -// No-op function; we only test this by composing it in `async_error_context_stream_caller` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_error_context_stream_caller() {} - -#[tokio::test] -async fn async_stream_end_err() -> Result<()> { - let caller = - &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_STREAM_CALLER_COMPONENT).await?; - let callee = - &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_STREAM_CALLEE_COMPONENT).await?; - test_run(&compose(caller, callee).await?).await -} - -// No-op function; we only test this by composing it in `async_future_end_err` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_error_context_future_callee() {} - -// No-op function; we only test this by composing it in `async_future_end_err` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_error_context_future_caller() {} - -#[tokio::test] -async fn async_future_end_err() -> Result<()> { - let caller = - &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_FUTURE_CALLER_COMPONENT).await?; - let callee = - &fs::read(test_programs_artifacts::ASYNC_ERROR_CONTEXT_FUTURE_CALLEE_COMPONENT).await?; - test_run(&compose(caller, callee).await?).await -} diff --git a/crates/misc/component-async-tests/tests/test_all.rs b/crates/misc/component-async-tests/tests/test_all.rs index 515a273c28..f5291206bd 100644 --- a/crates/misc/component-async-tests/tests/test_all.rs +++ b/crates/misc/component-async-tests/tests/test_all.rs @@ -15,8 +15,6 @@ use scenario::backpressure::{async_backpressure_callee, async_backpressure_calle use scenario::borrowing::{async_borrowing_callee, async_borrowing_caller}; use scenario::error_context::{ async_error_context, async_error_context_callee, async_error_context_caller, - async_error_context_future_callee, async_error_context_future_caller, - async_error_context_stream_callee, async_error_context_stream_caller, }; use scenario::post_return::{async_post_return_callee, async_post_return_caller}; use scenario::proxy::{async_http_echo, async_http_middleware, async_http_middleware_with_chain}; diff --git a/crates/test-programs/src/bin/async_closed_streams.rs b/crates/test-programs/src/bin/async_closed_streams.rs index 53540bb226..931a94ad5e 100644 --- a/crates/test-programs/src/bin/async_closed_streams.rs +++ b/crates/test-programs/src/bin/async_closed_streams.rs @@ -11,18 +11,20 @@ mod bindings { use { bindings::exports::local::local::closed::Guest, - wit_bindgen_rt::async_support::{futures::StreamExt, FutureReader, StreamReader}, + wit_bindgen_rt::async_support::{FutureReader, StreamReader, StreamResult}, }; struct Component; impl Guest for Component { async fn read_stream(mut rx: StreamReader, expected: Vec) { - assert_eq!(rx.next().await.unwrap().unwrap(), expected); + let (result, buf) = rx.read(Vec::with_capacity(expected.len())).await; + assert_eq!(result, StreamResult::Complete(expected.len())); + assert_eq!(buf, expected); } async fn read_future(rx: FutureReader, expected: u8, _rx_ignored: FutureReader) { - assert_eq!(rx.await.unwrap().unwrap(), expected); + assert_eq!(rx.await.unwrap(), expected); } } diff --git a/crates/test-programs/src/bin/async_error_context_future_callee.rs b/crates/test-programs/src/bin/async_error_context_future_callee.rs deleted file mode 100644 index 98a27d951c..0000000000 --- a/crates/test-programs/src/bin/async_error_context_future_callee.rs +++ /dev/null @@ -1,37 +0,0 @@ -mod bindings { - wit_bindgen::generate!({ - path: "../misc/component-async-tests/wit", - world: "error-context-future-callee", - async: { - exports: [ - "local:local/run#run", - "local:local/run-future#produce-then-error", - ], - } - }); - - use super::Component; - export!(Component); -} - -use bindings::wit_future; -use wit_bindgen_rt::async_support::{self, ErrorContext, FutureReader}; - -struct Component; - -impl bindings::exports::local::local::run_future::Guest for Component { - async fn produce_then_error() -> FutureReader<()> { - let (mut tx, rx) = wit_future::new(); - async_support::spawn(async move { - tx.close_with_error(ErrorContext::new("error".into())); - }); - rx - } -} - -impl bindings::exports::local::local::run::Guest for Component { - async fn run() {} -} - -// Unused function; required since this file is built as a `bin`: -fn main() {} diff --git a/crates/test-programs/src/bin/async_error_context_future_caller.rs b/crates/test-programs/src/bin/async_error_context_future_caller.rs deleted file mode 100644 index d209a065a4..0000000000 --- a/crates/test-programs/src/bin/async_error_context_future_caller.rs +++ /dev/null @@ -1,35 +0,0 @@ -mod bindings { - wit_bindgen::generate!({ - path: "../misc/component-async-tests/wit", - world: "error-context-future-caller", - async: { - imports: [ - "local:local/run-future#run-error", - ], - exports: [ - "local:local/run#run", - ], - } - }); - - use super::Component; - export!(Component); -} -use std::future::IntoFuture; - -use bindings::exports::local::local::run::Guest; - -struct Component; - -impl Guest for Component { - async fn run() { - let future = bindings::local::local::run_future::produce_then_error(); - let Some(Err(e)) = future.into_future().await else { - panic!("missing expected error"); - }; - let _ = e.debug_message(); - } -} - -// Unused function; required since this file is built as a `bin`: -fn main() {} diff --git a/crates/test-programs/src/bin/async_error_context_stream_callee.rs b/crates/test-programs/src/bin/async_error_context_stream_callee.rs deleted file mode 100644 index 8d0a60133c..0000000000 --- a/crates/test-programs/src/bin/async_error_context_stream_callee.rs +++ /dev/null @@ -1,40 +0,0 @@ -mod bindings { - wit_bindgen::generate!({ - path: "../misc/component-async-tests/wit", - world: "error-context-stream-callee", - async: { - exports: [ - "local:local/run#run", - "local:local/run-stream#produce-then-error", - ], - } - }); - - use super::Component; - export!(Component); -} -use bindings::wit_stream; -use wit_bindgen_rt::async_support::futures::SinkExt; -use wit_bindgen_rt::async_support::{self, ErrorContext, StreamReader}; - -struct Component; - -impl bindings::exports::local::local::run_stream::Guest for Component { - async fn produce_then_error(times: u32) -> StreamReader<()> { - let (mut tx, rx) = wit_stream::new(); - async_support::spawn(async move { - for _ in 0..times { - let _ = tx.send(vec![()]).await; - } - tx.close_with_error(ErrorContext::new("error".into())); - }); - rx - } -} - -impl bindings::exports::local::local::run::Guest for Component { - async fn run() {} -} - -// Unused function; required since this file is built as a `bin`: -fn main() {} diff --git a/crates/test-programs/src/bin/async_error_context_stream_caller.rs b/crates/test-programs/src/bin/async_error_context_stream_caller.rs deleted file mode 100644 index 39ad74c6d8..0000000000 --- a/crates/test-programs/src/bin/async_error_context_stream_caller.rs +++ /dev/null @@ -1,43 +0,0 @@ -mod bindings { - wit_bindgen::generate!({ - path: "../misc/component-async-tests/wit", - world: "error-context-stream-caller", - async: { - imports: [ - "local:local/run-stream#run-error", - ], - exports: [ - "local:local/run#run", - ], - } - }); - - use super::Component; - export!(Component); -} -use bindings::exports::local::local::run::Guest; -use futures::StreamExt; - -struct Component; - -impl Guest for Component { - async fn run() { - let mut stream = bindings::local::local::run_stream::produce_then_error(2); - let Some(_) = stream.next().await else { - panic!("unexpected send #1"); - }; - let Some(_) = stream.next().await else { - panic!("unexpected send #1"); - }; - let Some(Err(e)) = stream.next().await else { - panic!("missing expected error context"); - }; - let _ = e.debug_message(); - let None = stream.next().await else { - panic!("unexpected object post error end"); - }; - } -} - -// Unused function; required since this file is built as a `bin`: -fn main() {} diff --git a/crates/test-programs/src/bin/async_http_echo.rs b/crates/test-programs/src/bin/async_http_echo.rs index b11d1624ee..cf04c66856 100644 --- a/crates/test-programs/src/bin/async_http_echo.rs +++ b/crates/test-programs/src/bin/async_http_echo.rs @@ -22,8 +22,7 @@ use { wasi::http::types::{Body, ErrorCode, Request, Response}, wit_future, wit_stream, }, - futures::{SinkExt, StreamExt}, - wit_bindgen_rt::async_support, + wit_bindgen_rt::async_support::{self, StreamResult}, }; struct Component; @@ -44,18 +43,27 @@ impl Handler for Component { async_support::spawn(async move { let mut body_rx = body.stream().unwrap(); - while let Some(Ok(chunk)) = body_rx.next().await { - pipe_tx.send(chunk).await.unwrap(); + let mut chunk = Vec::with_capacity(1024); + loop { + let (status, buf) = body_rx.read(chunk).await; + chunk = buf; + match status { + StreamResult::Complete(_) => { + chunk = pipe_tx.write_all(chunk).await; + assert!(chunk.is_empty()); + } + StreamResult::Closed => break, + // FIXME(WebAssembly/component-model#490): this should + // be a panic but will require some spec changes because + // right now this and `Complete(0)` are the same. + StreamResult::Cancelled => {} + } } drop(pipe_tx); - if let Some(trailers) = Body::finish(body) - .await - .transpose() - .expect("stream not closed early w/ error") - { - trailers_tx.write(trailers).await; + if let Some(trailers) = Body::finish(body).await { + trailers_tx.write(trailers).await.unwrap(); } }); diff --git a/crates/test-programs/src/bin/async_http_middleware.rs b/crates/test-programs/src/bin/async_http_middleware.rs index 44a5b50224..a2c0b5f7ff 100644 --- a/crates/test-programs/src/bin/async_http_middleware.rs +++ b/crates/test-programs/src/bin/async_http_middleware.rs @@ -29,9 +29,8 @@ use { write::{DeflateDecoder, DeflateEncoder}, Compression, }, - futures::{SinkExt, StreamExt}, std::{io::Write, mem}, - wit_bindgen_rt::async_support, + wit_bindgen_rt::async_support::{self, StreamResult}, }; struct Component; @@ -78,22 +77,23 @@ impl Handler for Component { let mut decoder = DeflateDecoder::new(Vec::new()); - while let Some(Ok(chunk)) = body_rx.next().await { + let (mut status, mut chunk) = body_rx.read(Vec::with_capacity(64 * 1024)).await; + while let StreamResult::Complete(_) = status { decoder.write_all(&chunk).unwrap(); - pipe_tx.send(mem::take(decoder.get_mut())).await.unwrap(); + let remaining = pipe_tx.write_all(mem::take(decoder.get_mut())).await; + assert!(remaining.is_empty()); + *decoder.get_mut() = remaining; + (status, chunk) = body_rx.read(chunk).await; } - pipe_tx.send(decoder.finish().unwrap()).await.unwrap(); + let remaining = pipe_tx.write_all(decoder.finish().unwrap()).await; + assert!(remaining.is_empty()); drop(pipe_tx); } - if let Some(trailers) = Body::finish(body) - .await - .transpose() - .expect("stream not closed early w/ error") - { - trailers_tx.write(trailers).await; + if let Some(trailers) = Body::finish(body).await { + trailers_tx.write(trailers).await.unwrap(); } }); @@ -136,23 +136,24 @@ impl Handler for Component { let mut body_rx = body.stream().unwrap(); let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast()); + let (mut status, mut chunk) = body_rx.read(Vec::with_capacity(64 * 1024)).await; - while let Some(Ok(chunk)) = body_rx.next().await { + while let StreamResult::Complete(_) = status { encoder.write_all(&chunk).unwrap(); - pipe_tx.send(mem::take(encoder.get_mut())).await.unwrap(); + let remaining = pipe_tx.write_all(mem::take(encoder.get_mut())).await; + assert!(remaining.is_empty()); + *encoder.get_mut() = remaining; + (status, chunk) = body_rx.read(chunk).await; } - pipe_tx.send(encoder.finish().unwrap()).await.unwrap(); + let remaining = pipe_tx.write_all(encoder.finish().unwrap()).await; + assert!(remaining.is_empty()); drop(pipe_tx); } - if let Some(trailers) = Body::finish(body) - .await - .transpose() - .expect("stream not closed early w/ error") - { - trailers_tx.write(trailers).await; + if let Some(trailers) = Body::finish(body).await { + trailers_tx.write(trailers).await.unwrap(); } }); diff --git a/crates/test-programs/src/bin/async_read_resource_stream.rs b/crates/test-programs/src/bin/async_read_resource_stream.rs index a9dd1f61be..59f2447941 100644 --- a/crates/test-programs/src/bin/async_read_resource_stream.rs +++ b/crates/test-programs/src/bin/async_read_resource_stream.rs @@ -13,10 +13,7 @@ mod bindings { export!(Component); } -use { - bindings::{exports::local::local::run::Guest, local::local::resource_stream}, - futures::StreamExt, -}; +use bindings::{exports::local::local::run::Guest, local::local::resource_stream}; struct Component; @@ -25,16 +22,14 @@ impl Guest for Component { let mut count = 7; let mut stream = resource_stream::foo(count); - while let Some(Ok(chunk)) = stream.next().await { - for x in chunk { - if count > 0 { - count -= 1; - } else { - panic!("received too many items"); - } - - x.foo() + while let Some(x) = stream.next().await { + if count > 0 { + count -= 1; + } else { + panic!("received too many items"); } + + x.foo() } if count != 0 { diff --git a/crates/test-programs/src/bin/async_transmit_callee.rs b/crates/test-programs/src/bin/async_transmit_callee.rs index 8824da59d7..928c17f131 100644 --- a/crates/test-programs/src/bin/async_transmit_callee.rs +++ b/crates/test-programs/src/bin/async_transmit_callee.rs @@ -18,7 +18,6 @@ use { exports::local::local::transmit::{Control, Guest}, wit_future, wit_stream, }, - futures::{SinkExt, StreamExt}, std::future::IntoFuture, wit_bindgen_rt::async_support::{self, FutureReader, StreamReader}, }; @@ -44,24 +43,27 @@ impl Guest for Component { let mut caller_future_rx1 = Some(caller_future_rx1); let mut callee_future_tx1 = Some(callee_future_tx1); - while let Some(Ok(messages)) = control_rx.next().await { - for message in messages { - match message { - Control::ReadStream(value) => { - assert_eq!(caller_stream_rx.next().await, Some(Ok(vec![value]))); - } - Control::ReadFuture(value) => { - assert_eq!( - caller_future_rx1.take().unwrap().into_future().await, - Some(Ok(value)) - ); - } - Control::WriteStream(value) => { - callee_stream_tx.send(vec![value]).await.unwrap(); - } - Control::WriteFuture(value) => { - callee_future_tx1.take().unwrap().write(value).await; - } + while let Some(message) = control_rx.next().await { + match message { + Control::ReadStream(value) => { + assert_eq!(caller_stream_rx.next().await, Some(value)); + } + Control::ReadFuture(value) => { + assert_eq!( + caller_future_rx1.take().unwrap().into_future().await, + Some(value) + ); + } + Control::WriteStream(value) => { + assert!(callee_stream_tx.write_one(value).await.is_none()); + } + Control::WriteFuture(value) => { + callee_future_tx1 + .take() + .unwrap() + .write(value) + .await + .unwrap(); } } } diff --git a/crates/test-programs/src/bin/async_transmit_caller.rs b/crates/test-programs/src/bin/async_transmit_caller.rs index bdc75bfde9..bdf054b198 100644 --- a/crates/test-programs/src/bin/async_transmit_caller.rs +++ b/crates/test-programs/src/bin/async_transmit_caller.rs @@ -22,12 +22,13 @@ use { local::local::transmit::{self, Control}, wit_future, wit_stream, }, - futures::{future, FutureExt, SinkExt, StreamExt}, + futures::{future, FutureExt}, std::{ future::{Future, IntoFuture}, pin::pin, task::Poll, }, + wit_bindgen_rt::async_support::FutureWriteCancel, }; struct Component; @@ -48,91 +49,95 @@ impl Guest for Component { .await; // Tell peer to read from its end of the stream and assert that the result matches an expected value. - control_tx - .send(vec![Control::ReadStream("a".into())]) + assert!(control_tx + .write_one(Control::ReadStream("a".into())) .await - .unwrap(); - caller_stream_tx.send(vec!["a".into()]).await.unwrap(); + .is_none()); + assert!(caller_stream_tx.write_one("a".into()).await.is_none()); // Start writing another value, but cancel the write before telling the peer to read. { - let send = caller_stream_tx.send(vec!["b".into()]); + let send = Box::pin(caller_stream_tx.write_one("b".into())); assert!(poll(send).await.is_err()); - caller_stream_tx.cancel(); } // Tell the peer to read an expected value again, which should _not_ match the value provided in the // canceled write above. - control_tx - .send(vec![Control::ReadStream("c".into())]) + assert!(control_tx + .write_one(Control::ReadStream("c".into())) .await - .unwrap(); - caller_stream_tx.send(vec!["c".into()]).await.unwrap(); + .is_none()); + assert!(caller_stream_tx.write_one("c".into()).await.is_none()); // Start writing a value to the future, but cancel the write before telling the peer to read. { - let send = caller_future_tx1.write("x".into()); + let send = Box::pin(caller_future_tx1.write("x".into())); match poll(send).await { Ok(_) => panic!(), - Err(send) => caller_future_tx1 = send.cancel(), + Err(mut send) => { + caller_future_tx1 = match send.as_mut().cancel() { + FutureWriteCancel::AlreadySent => unreachable!(), + FutureWriteCancel::Closed(_) => unreachable!(), + FutureWriteCancel::Cancelled(_, writer) => writer, + } + } } } // Tell the peer to read an expected value again, which should _not_ match the value provided in the // canceled write above. - control_tx - .send(vec![Control::ReadFuture("y".into())]) + assert!(control_tx + .write_one(Control::ReadFuture("y".into())) .await - .unwrap(); - caller_future_tx1.write("y".into()).await; + .is_none()); + caller_future_tx1.write("y".into()).await.unwrap(); // Tell the peer to write a value to its end of the stream, then read from our end and assert the value // matches. - control_tx - .send(vec![Control::WriteStream("a".into())]) + assert!(control_tx + .write_one(Control::WriteStream("a".into())) .await - .unwrap(); - assert_eq!(callee_stream_rx.next().await, Some(Ok(vec!["a".into()]))); + .is_none()); + assert_eq!(callee_stream_rx.next().await, Some("a".into())); // Start reading a value from the stream, but cancel the read before telling the peer to write. { - let next = callee_stream_rx.next(); + let next = Box::pin(callee_stream_rx.read(Vec::with_capacity(1))); assert!(poll(next).await.is_err()); - callee_stream_rx.cancel(); } // Once again, tell the peer to write a value to its end of the stream, then read from our end and assert // the value matches. - control_tx - .send(vec![Control::WriteStream("b".into())]) + assert!(control_tx + .write_one(Control::WriteStream("b".into())) .await - .unwrap(); - assert_eq!(callee_stream_rx.next().await, Some(Ok(vec!["b".into()]))); + .is_none()); + assert_eq!(callee_stream_rx.next().await, Some("b".into())); // Start reading a value from the future, but cancel the read before telling the peer to write. { - let next = callee_future_rx1.into_future(); + let next = Box::pin(callee_future_rx1.into_future()); match poll(next).await { Ok(_) => panic!(), - Err(next) => callee_future_rx1 = next.cancel(), + Err(mut next) => callee_future_rx1 = next.as_mut().cancel().unwrap_err(), } } // Tell the peer to write a value to its end of the future, then read from our end and assert the value // matches. - control_tx - .send(vec![Control::WriteFuture("b".into())]) + assert!(control_tx + .write_one(Control::WriteFuture("b".into())) .await - .unwrap(); - assert_eq!(callee_future_rx1.into_future().await, Some(Ok("b".into()))); + .is_none()); + assert_eq!(callee_future_rx1.into_future().await, Some("b".into())); // Start writing a value to the stream, but drop the stream without telling the peer to read. - let send = caller_stream_tx.send(vec!["d".into()]); + let send = Box::pin(caller_stream_tx.write_one("d".into())); assert!(poll(send).await.is_err()); drop(caller_stream_tx); // Start reading a value from the stream, but drop the stream without telling the peer to write. - let next = callee_stream_rx.next(); + let next = Box::pin(callee_stream_rx.next()); assert!(poll(next).await.is_err()); drop(callee_stream_rx); @@ -144,7 +149,7 @@ impl Guest for Component { // Start reading a value from the future, but drop the read without telling the peer to write. { - let next = callee_future_rx2.into_future(); + let next = Box::pin(callee_future_rx2.into_future()); assert!(poll(next).await.is_err()); } } diff --git a/crates/test-programs/src/bin/async_unit_stream_callee.rs b/crates/test-programs/src/bin/async_unit_stream_callee.rs index ffd9ed380f..27e972d6a4 100644 --- a/crates/test-programs/src/bin/async_unit_stream_callee.rs +++ b/crates/test-programs/src/bin/async_unit_stream_callee.rs @@ -15,7 +15,6 @@ mod bindings { use { bindings::{exports::local::local::unit_stream::Guest, wit_stream}, - futures::SinkExt, wit_bindgen_rt::async_support::{self, StreamReader}, }; @@ -30,9 +29,8 @@ impl Guest for Component { let mut chunk_size = 1; while sent < count { let n = (count - sent).min(chunk_size); - tx.send(vec![(); usize::try_from(n).unwrap()]) - .await - .unwrap(); + let remaining = tx.write_all(vec![(); usize::try_from(n).unwrap()]).await; + assert!(remaining.is_empty()); sent += n; chunk_size *= 2; } diff --git a/crates/test-programs/src/bin/async_unit_stream_caller.rs b/crates/test-programs/src/bin/async_unit_stream_caller.rs index 22027e13d2..0ab5d6c3ef 100644 --- a/crates/test-programs/src/bin/async_unit_stream_caller.rs +++ b/crates/test-programs/src/bin/async_unit_stream_caller.rs @@ -16,22 +16,16 @@ mod bindings { export!(Component); } -use { - bindings::{exports::local::local::run::Guest, local::local::unit_stream}, - futures::StreamExt, -}; +use bindings::{exports::local::local::run::Guest, local::local::unit_stream}; struct Component; impl Guest for Component { async fn run() { let count = 42; - let mut rx = unit_stream::run(count).await; + let rx = unit_stream::run(count).await; - let mut received = 0; - while let Some(Ok(chunk)) = rx.next().await { - received += chunk.len(); - } + let received = rx.collect().await.len(); assert_eq!(count, u32::try_from(received).unwrap()); } diff --git a/crates/test-programs/src/bin/filesystem_0_3_file_read_write.rs b/crates/test-programs/src/bin/filesystem_0_3_file_read_write.rs index dc19a16145..1d7cfe90b1 100644 --- a/crates/test-programs/src/bin/filesystem_0_3_file_read_write.rs +++ b/crates/test-programs/src/bin/filesystem_0_3_file_read_write.rs @@ -1,4 +1,4 @@ -use futures::{join, SinkExt as _, TryStreamExt as _}; +use futures::join; use test_programs::p3::wasi::filesystem::types::{DescriptorFlags, OpenFlags, PathFlags}; use test_programs::p3::{wasi, wit_stream}; @@ -26,14 +26,16 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { file.write_via_stream(data_rx, 5).await.unwrap(); }, async { - data_tx.send(b"Hello, ".to_vec()).await.unwrap(); - data_tx.send(b"World!".to_vec()).await.unwrap(); + let remaining = data_tx.write_all(b"Hello, ".to_vec()).await; + assert!(remaining.is_empty()); + let remaining = data_tx.write_all(b"World!".to_vec()).await; + assert!(remaining.is_empty()); drop(data_tx); }, ); let (data_rx, data_fut) = file.read_via_stream(0); - let contents = data_rx.try_collect::>().await.unwrap().concat(); - data_fut.await.unwrap().unwrap().unwrap(); + let contents = data_rx.collect().await; + data_fut.await.unwrap().unwrap(); assert_eq!( String::from_utf8_lossy(&contents), "\0\0\0\0\0Hello, World!" @@ -41,8 +43,8 @@ impl test_programs::p3::exports::wasi::cli::run::Guest for Component { // Test that file read streams behave like other read streams. let (data_rx, data_fut) = file.read_via_stream(5); - let contents = data_rx.try_collect::>().await.unwrap().concat(); - data_fut.await.unwrap().unwrap().unwrap(); + let contents = data_rx.collect().await; + data_fut.await.unwrap().unwrap(); assert_eq!(String::from_utf8_lossy(&contents), "Hello, World!"); dir.unlink_file_at(filename).unwrap(); diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_bind.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_bind.rs index a725a48b56..1e0774836e 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_bind.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_bind.rs @@ -1,4 +1,4 @@ -use futures::{join, SinkExt as _, StreamExt as _}; +use futures::join; use test_programs::p3::sockets::attempt_random_port; use test_programs::p3::wasi::sockets::types::{ ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, @@ -66,16 +66,15 @@ async fn test_tcp_bind_reuseaddr(ip: IpAddress) { client.connect(connect_addr).await.unwrap(); }, async { - let mut sock = accept.next().await.unwrap().unwrap(); - assert_eq!(sock.len(), 1); - let sock = sock.pop().unwrap(); + let sock = accept.next().await.unwrap(); let (mut data_tx, data_rx) = wit_stream::new(); join!( async { sock.send(data_rx).await.unwrap(); }, async { - data_tx.send(vec![0; 10]).await.unwrap(); + let remaining = data_tx.write_all(vec![0; 10]).await; + assert!(remaining.is_empty()); drop(data_tx); } ); diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_connect.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_connect.rs index cc394724dd..a8be14003e 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_connect.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_connect.rs @@ -1,4 +1,4 @@ -use futures::{join, StreamExt as _}; +use futures::join; use test_programs::p3::wasi::sockets::types::{ ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; @@ -117,7 +117,7 @@ async fn test_tcp_connect_explicit_bind(family: IpAddressFamily) { client.connect(listener_address).await.unwrap(); }, async { - accept.next().await.unwrap().unwrap(); + accept.next().await.unwrap(); } ); } diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_sample_application.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_sample_application.rs index 9a093765e3..f37b92e113 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_sample_application.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_sample_application.rs @@ -1,8 +1,9 @@ -use futures::{join, SinkExt as _, StreamExt as _}; +use futures::join; use test_programs::p3::wasi::sockets::types::{ IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress, TcpSocket, }; use test_programs::p3::wit_stream; +use wit_bindgen_rt::async_support::StreamResult; struct Component; @@ -30,23 +31,26 @@ async fn test_tcp_sample_application(family: IpAddressFamily, bind_address: IpSo client.send(data_rx).await.unwrap(); }, async { - data_tx.send(vec![]).await.unwrap(); - data_tx.send(first_message.into()).await.unwrap(); + let (result, _) = data_tx.write(vec![]).await; + // FIXME(WebAssembly/component-model#490): this should be a + // panic but will require some spec changes because right + // now this and `Complete(0)` are the same. + assert_eq!(result, StreamResult::Cancelled); + let remaining = data_tx.write_all(first_message.into()).await; + assert!(remaining.is_empty()); drop(data_tx); } ); }, async { - let mut sock = accept.next().await.unwrap().unwrap(); - assert_eq!(sock.len(), 1); - let sock = sock.pop().unwrap(); - + let sock = accept.next().await.unwrap(); let (mut data_rx, fut) = sock.receive(); - let data = data_rx.next().await.unwrap().unwrap(); + let (result, data) = data_rx.read(Vec::with_capacity(100)).await; + assert_eq!(result, StreamResult::Complete(first_message.len())); // Check that we sent and received our message! assert_eq!(data, first_message); // Not guaranteed to work but should work in practice. - fut.await.unwrap().unwrap().unwrap() + fut.await.unwrap().unwrap() }, ); @@ -61,21 +65,21 @@ async fn test_tcp_sample_application(family: IpAddressFamily, bind_address: IpSo client.send(data_rx).await.unwrap(); }, async { - data_tx.send(second_message.into()).await.unwrap(); + let remaining = data_tx.write_all(second_message.into()).await; + assert!(remaining.is_empty()); drop(data_tx); } ); }, async { - let mut sock = accept.next().await.unwrap().unwrap(); - assert_eq!(sock.len(), 1); - let sock = sock.pop().unwrap(); + let sock = accept.next().await.unwrap(); let (mut data_rx, fut) = sock.receive(); - let data = data_rx.next().await.unwrap().unwrap(); + let (result, data) = data_rx.read(Vec::with_capacity(100)).await; + assert_eq!(result, StreamResult::Complete(second_message.len())); // Check that we sent and received our message! assert_eq!(data, second_message); // Not guaranteed to work but should work in practice. - fut.await.unwrap().unwrap().unwrap() + fut.await.unwrap().unwrap() } ); } diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_sockopts.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_sockopts.rs index f52fd6e866..b1636fd068 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_sockopts.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_sockopts.rs @@ -1,4 +1,4 @@ -use futures::{join, StreamExt as _}; +use futures::join; use test_programs::p3::wasi::sockets::types::{ ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; @@ -136,11 +136,7 @@ async fn test_tcp_sockopt_inheritance(family: IpAddressFamily) { async { client.connect(bound_addr).await.unwrap(); }, - async { - let mut sock = accept.next().await.unwrap().unwrap(); - assert_eq!(sock.len(), 1); - sock.pop().unwrap() - } + async { accept.next().await.unwrap() } ); // Verify options on accepted socket: @@ -204,11 +200,7 @@ async fn test_tcp_sockopt_after_listen(family: IpAddressFamily) { async { client.connect(bound_addr).await.unwrap(); }, - async { - let mut sock = accept.next().await.unwrap().unwrap(); - assert_eq!(sock.len(), 1); - sock.pop().unwrap() - } + async { accept.next().await.unwrap() } ); // Verify options on accepted socket: diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_states.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_states.rs index b754b9652f..060a25eded 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_states.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_states.rs @@ -1,4 +1,4 @@ -use futures::{join, StreamExt as _}; +use futures::join; use test_programs::p3::wasi::sockets::types::{ ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; @@ -155,7 +155,7 @@ async fn test_tcp_connected_state_invariants(family: IpAddressFamily) { sock.connect(addr_listener).await.unwrap(); }, async { - accept.next().await.unwrap().unwrap(); + accept.next().await.unwrap(); } ); diff --git a/crates/test-programs/src/bin/sockets_0_3_tcp_streams.rs b/crates/test-programs/src/bin/sockets_0_3_tcp_streams.rs index ce6669b4a2..b2e100e575 100644 --- a/crates/test-programs/src/bin/sockets_0_3_tcp_streams.rs +++ b/crates/test-programs/src/bin/sockets_0_3_tcp_streams.rs @@ -1,6 +1,6 @@ use core::future::Future; -use futures::{join, SinkExt as _, StreamExt as _, TryStreamExt as _}; +use futures::join; use test_programs::p3::wasi::sockets::types::{ IpAddress, IpAddressFamily, IpSocketAddress, TcpSocket, }; @@ -23,7 +23,7 @@ async fn test_tcp_input_stream_should_be_closed_by_remote_shutdown(family: IpAdd // Wait for the shutdown signal to reach the client: assert!(client_rx.next().await.is_none()); - assert_eq!(client_fut.await, Some(Ok(Ok(())))); + assert_eq!(client_fut.await, Some(Ok(()))); }) .await; } @@ -40,7 +40,8 @@ async fn test_tcp_input_stream_should_be_closed_by_local_shutdown(family: IpAddr // On Linux, `recv` continues to work even after `shutdown(sock, SHUT_RD)` // has been called. To properly test that this behavior doesn't happen in // WASI, we make sure there's some data to read by the client: - server_tx.send(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await.unwrap(); + let rest = server_tx.write_all(b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.".into()).await; + assert!(rest.is_empty()); drop(server_tx); }, ); @@ -50,15 +51,14 @@ async fn test_tcp_input_stream_should_be_closed_by_local_shutdown(family: IpAddr // Shut down socket locally: drop(client_rx); // Wait for the shutdown signal to reach the client: - assert_eq!(client_fut.await, Some(Ok(Ok(())))); + assert_eq!(client_fut.await, Some(Ok(()))); }).await; } /// StreamWriter should return `StreamError::Closed` after the connection has been locally shut down for sending. async fn test_tcp_output_stream_should_be_closed_by_local_shutdown(family: IpAddressFamily) { setup(family, |_server, client| async move { - let (mut client_tx, client_rx) = wit_stream::new(); - client_tx.close().await.unwrap(); + let (client_tx, client_rx) = wit_stream::new(); join!( async { client.send(client_rx).await.unwrap(); @@ -92,18 +92,19 @@ async fn test_tcp_shutdown_should_not_lose_data(family: IpAddressFamily) { client.send(client_rx).await.unwrap(); }, async { - client_tx.send(outgoing_data.clone()).await.unwrap(); + let ret = client_tx.write_all(outgoing_data.clone()).await; + assert!(ret.is_empty()); drop(client_tx); }, async { // The peer should receive _all_ data: let (server_rx, server_fut) = server.receive(); - let incoming_data = server_rx.try_collect::>().await.unwrap().concat(); + let incoming_data = server_rx.collect().await; assert_eq!( outgoing_data, incoming_data, "Received data should match the sent data" ); - server_fut.await.unwrap().unwrap().unwrap() + server_fut.await.unwrap().unwrap() }, ); }) @@ -144,11 +145,7 @@ async fn setup>( async { client_socket.connect(bound_address).await.unwrap(); }, - async { - let mut accepted_socket = accept.next().await.unwrap().unwrap(); - assert_eq!(accepted_socket.len(), 1); - accepted_socket.pop().unwrap() - }, + async { accept.next().await.unwrap() }, ); body(accepted_socket, client_socket).await; } diff --git a/crates/wasi-preview1-component-adapter/src/lib.rs b/crates/wasi-preview1-component-adapter/src/lib.rs index ad077ca9ed..0f75b713a9 100644 --- a/crates/wasi-preview1-component-adapter/src/lib.rs +++ b/crates/wasi-preview1-component-adapter/src/lib.rs @@ -62,7 +62,12 @@ pub mod bindings { // can't support in these special core-wasm adapters. // Instead, we manually define the bindings for these functions in // terms of raw pointers. - skip: ["run", "get-environment", "poll"], + skip: [ + "run", + "get-environment", + "poll", + "[method]outgoing-datagram-stream.send", + ], generate_all, disable_custom_section_link_helpers: true, }); @@ -79,7 +84,11 @@ pub mod bindings { // can't support in these special core-wasm adapters. // Instead, we manually define the bindings for these functions in // terms of raw pointers. - skip: ["get-environment", "poll"], + skip: [ + "get-environment", + "poll", + "[method]outgoing-datagram-stream.send", + ], generate_all, disable_custom_section_link_helpers: true, }); @@ -102,7 +111,7 @@ pub mod bindings { world: "wasmtime:adapter/adapter", raw_strings, runtime_path: "crate::bindings::wit_bindgen_rt_shim", - skip: ["poll"], + skip: ["poll", "[method]outgoing-datagram-stream.send"], generate_all, disable_custom_section_link_helpers: true, }); diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index f9d9b4604f..6f31e898ba 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -57,7 +57,7 @@ use { }; pub(crate) use futures_and_streams::{ - lower_error_context_to_index, lower_future_to_index, lower_stream_to_index, + lower_error_context_to_index, lower_future_to_index, lower_stream_to_index, ResourcePair, }; pub use futures_and_streams::{ ErrorContext, FutureReader, FutureWriter, HostFuture, HostStream, Single, StreamReader, @@ -2429,7 +2429,6 @@ pub unsafe trait VMComponentAsyncStore { string_encoding: u8, async_: bool, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result; @@ -2443,7 +2442,6 @@ pub unsafe trait VMComponentAsyncStore { string_encoding: u8, async_: bool, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result; @@ -2457,7 +2455,6 @@ pub unsafe trait VMComponentAsyncStore { string_encoding: u8, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -2472,7 +2469,6 @@ pub unsafe trait VMComponentAsyncStore { string_encoding: u8, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -2487,7 +2483,6 @@ pub unsafe trait VMComponentAsyncStore { realloc: *mut VMFuncRef, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -2504,7 +2499,6 @@ pub unsafe trait VMComponentAsyncStore { realloc: *mut VMFuncRef, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -2637,7 +2631,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding: u8, async_: bool, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result { @@ -2648,7 +2641,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding, async_, TableIndex::Future(ty), - err_ctx_ty, None, future, address, @@ -2664,7 +2656,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding: u8, async_: bool, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, future: u32, address: u32, ) -> Result { @@ -2675,7 +2666,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding, async_, TableIndex::Future(ty), - err_ctx_ty, None, future, address, @@ -2691,7 +2681,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding: u8, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -2703,7 +2692,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding, async_, TableIndex::Stream(ty), - err_ctx_ty, None, stream, address, @@ -2719,7 +2707,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding: u8, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, stream: u32, address: u32, count: u32, @@ -2731,7 +2718,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { string_encoding, async_, TableIndex::Stream(ty), - err_ctx_ty, None, stream, address, @@ -2746,7 +2732,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc: *mut VMFuncRef, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -2760,7 +2745,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { StringEncoding::Utf8 as u8, async_, TableIndex::Stream(ty), - err_ctx_ty, Some(FlatAbi { size: payload_size, align: payload_align, @@ -2778,7 +2762,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { realloc: *mut VMFuncRef, async_: bool, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, payload_size: u32, payload_align: u32, stream: u32, @@ -2792,7 +2775,6 @@ unsafe impl VMComponentAsyncStore for StoreInner { StringEncoding::Utf8 as u8, async_, TableIndex::Stream(ty), - err_ctx_ty, Some(FlatAbi { size: payload_size, align: payload_align, diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 694660e87a..465203c7a4 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -14,7 +14,7 @@ use { vm::{component::ComponentInstance, SendSyncPtr, VMFuncRef, VMMemoryDefinition}, AsContextMut, StoreContextMut, ValRaw, }, - anyhow::{anyhow, bail, ensure, Context, Result}, + anyhow::{anyhow, bail, Context, Result}, bytes::Bytes, futures::{ channel::{mpsc, oneshot}, @@ -58,8 +58,8 @@ pub(super) enum TableIndex { enum PostWrite { /// Continue performing writes Continue, - /// Close the channel post-write, possibly with an error - Close(Option), + /// Close the channel post-write + Close, } pub(crate) enum HostReadResult { @@ -181,8 +181,6 @@ fn accept, U>( /// Represents the state of a stream or future handle. #[derive(Debug, Eq, PartialEq)] pub(super) enum StreamFutureState { - /// Both the read and write ends are owned by the same component instance. - Local, /// Only the write end is owned by this component instance. Write, /// Only the read end is owned by this component instance. @@ -210,9 +208,7 @@ enum StreamOrFutureEvent { post_write: PostWrite, tx: oneshot::Sender<()>, }, - CloseWriter { - err_ctx: Option, - }, + CloseWriter, WatchWriter { tx: oneshot::Sender<()>, }, @@ -385,21 +381,6 @@ impl FutureWriter { } } - /// Close this object with and error instead of writing a value. - /// - /// # Arguments - /// - /// * `err_ctx` - The handle of an error context that should be reported with the stream closure - /// - pub fn close_with_error(mut self, err_ctx: ErrorContext) { - send( - &mut self.tx.take().unwrap(), - StreamOrFutureEvent::CloseWriter { - err_ctx: Some(err_ctx), - }, - ); - } - /// Convert this object into a `Promise` which will resolve when the read /// end of this `future` is closed, plus a `Watch` which can be used to /// retrieve the `FutureWriter` again. @@ -431,7 +412,7 @@ impl Ready for FutureWriter { impl Drop for FutureWriter { fn drop(&mut self) { if let Some(mut tx) = self.tx.take() { - send(&mut tx, StreamOrFutureEvent::CloseWriter { err_ctx: None }); + send(&mut tx, StreamOrFutureEvent::CloseWriter); } } } @@ -507,9 +488,6 @@ impl HostFuture { get_mut_by_index_from(state_table, TableIndex::Future(src), index)?; match state { - StreamFutureState::Local => { - *state = StreamFutureState::Write; - } StreamFutureState::Read => { state_table.remove_by_index(index)?; } @@ -741,21 +719,6 @@ impl StreamWriter { } } - /// Close this object with a final error. - /// - /// # Arguments - /// - /// * `err_ctx` - The handle of an error context that should be reported with the stream closure. - /// - pub fn close_with_error(mut self, err_ctx: ErrorContext) { - send( - &mut self.tx.take().unwrap(), - StreamOrFutureEvent::CloseWriter { - err_ctx: Some(err_ctx), - }, - ); - } - /// Convert this object into a `Promise` which will resolve when the read /// end of this `stream` is closed, plus a `Watch` which can be used to /// retrieve the `StreamWriter` again. @@ -787,7 +750,7 @@ impl Ready for StreamWriter { impl Drop for StreamWriter { fn drop(&mut self) { if let Some(mut tx) = self.tx.take() { - send(&mut tx, StreamOrFutureEvent::CloseWriter { err_ctx: None }); + send(&mut tx, StreamOrFutureEvent::CloseWriter); } } } @@ -864,9 +827,6 @@ impl HostStream { get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?; match state { - StreamFutureState::Local => { - *state = StreamFutureState::Write; - } StreamFutureState::Read => { state_table.remove_by_index(index)?; } @@ -1202,7 +1162,7 @@ enum WriteState { accept: Box Result + Send + Sync>, post_write: PostWrite, }, - Closed(Option), + Closed, } /// Read state of a transmit channel @@ -1215,7 +1175,6 @@ enum ReadState { Open, GuestReady { ty: TableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, flat_abi: Option, options: Options, address: usize, @@ -1239,7 +1198,7 @@ enum Writer<'a> { Host { values: Box, }, - End(Option), + End, } struct RawLowerContext<'a> { @@ -1333,16 +1292,9 @@ impl ComponentInstance { instance.host_write(store, rep, values, post_write, tx) } - fn close_writer( - instance: SendSyncPtr, - rep: u32, - err_ctx: Option, - ) -> Result<()> { + fn close_writer(instance: SendSyncPtr, rep: u32) -> Result<()> { let instance = unsafe { &mut *instance.as_ptr() }; - instance.host_close_writer( - rep, - err_ctx.map(|v| TypeComponentGlobalErrorContextTableIndex::from_u32(v.rep)), - ) + instance.host_close_writer(rep) } fn watch_writer( @@ -1352,7 +1304,7 @@ impl ComponentInstance { ) -> Result<()> { let instance = unsafe { &mut *instance.as_ptr() }; let state = instance.get_mut(TableId::::new(rep))?; - if !matches!(&state.write, WriteState::Closed(_)) { + if !matches!(&state.write, WriteState::Closed) { state.writer_watcher = Some(tx); } Ok(()) @@ -1412,9 +1364,7 @@ impl ComponentInstance { post_write, tx, } => write::(instance, rep, values, post_write, tx)?, - StreamOrFutureEvent::CloseWriter { err_ctx } => { - close_writer(instance, rep, err_ctx)? - } + StreamOrFutureEvent::CloseWriter => close_writer(instance, rep)?, StreamOrFutureEvent::WatchWriter { tx } => { watch_writer(instance, rep, tx)? } @@ -1501,10 +1451,15 @@ impl ComponentInstance { &mut self.waitable_tables()[runtime_instance] } - fn guest_new(&mut self, ty: TableIndex) -> Result { - let state = self.new_transmit()?.0.rep(); - self.state_table(ty) - .insert(state, waitable_state(ty, StreamFutureState::Local)) + fn guest_new(&mut self, ty: TableIndex) -> Result { + let (write, read) = self.new_transmit()?; + let write = self + .state_table(ty) + .insert(write.rep(), waitable_state(ty, StreamFutureState::Write))?; + let read = self + .state_table(ty) + .insert(read.rep(), waitable_state(ty, StreamFutureState::Read))?; + Ok(ResourcePair { write, read }) } /// Write to a waitable from the host @@ -1622,8 +1577,8 @@ impl ComponentInstance { } } - if let PostWrite::Close(err_ctx) = post_write { - self.host_close_writer(transmit_rep, err_ctx)?; + if let PostWrite::Close = post_write { + self.host_close_writer(transmit_rep)?; } if let Some(tx) = tx.take() { @@ -1649,8 +1604,8 @@ impl ComponentInstance { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id).with_context(|| rep.to_string())?; - let new_state = if let WriteState::Closed(maybe_err_ctx) = &transmit.write { - WriteState::Closed(*maybe_err_ctx) + let new_state = if let WriteState::Closed = &transmit.write { + WriteState::Closed } else { WriteState::Open }; @@ -1701,18 +1656,10 @@ impl ComponentInstance { _ = tx.send(HostReadResult::Values(values)); count } - Writer::End(maybe_err_ctx) => match maybe_err_ctx { - None => { - _ = tx.send(HostReadResult::EndOfStream(None)); - CLOSED - } - Some(err_ctx) => { - _ = tx.send(HostReadResult::EndOfStream(Some( - ErrorContext::new(err_ctx.as_u32()), - ))); - CLOSED | err_ctx.as_u32() as usize - } - }, + Writer::End => { + _ = tx.send(HostReadResult::EndOfStream(None)); + CLOSED + } }) }), }; @@ -1744,8 +1691,8 @@ impl ComponentInstance { B::from(T::load_list(lift, list)?) })); - let pending = if let PostWrite::Close(err_ctx) = post_write { - self.get_mut(transmit_id)?.write = WriteState::Closed(err_ctx); + let pending = if let PostWrite::Close = post_write { + self.get_mut(transmit_id)?.write = WriteState::Closed; false } else { true @@ -1781,12 +1728,12 @@ impl ComponentInstance { }, )?; - if let PostWrite::Close(err_ctx) = post_write { - self.get_mut(transmit_id)?.write = WriteState::Closed(err_ctx); + if let PostWrite::Close = post_write { + self.get_mut(transmit_id)?.write = WriteState::Closed; } } - WriteState::Closed(_) => {} + WriteState::Closed => {} } Ok(()) @@ -1801,7 +1748,7 @@ impl ComponentInstance { transmit.write = WriteState::Open; } - WriteState::Open | WriteState::Closed(_) => {} + WriteState::Open | WriteState::Closed => {} } let write_id = transmit.write_handle; @@ -1841,13 +1788,7 @@ impl ComponentInstance { /// # Arguments /// /// * `transmit_rep` - A component-global representation of the transmit state for the writer that should be closed - /// * `err_ctx` - An optional component-global representation of an error context to use as the final value of the writer - /// - fn host_close_writer( - &mut self, - transmit_rep: u32, - err_ctx: Option, - ) -> Result<()> { + fn host_close_writer(&mut self, transmit_rep: u32) -> Result<()> { let transmit_id = TableId::::new(transmit_rep); let transmit = self .get_mut(transmit_id) @@ -1858,15 +1799,15 @@ impl ComponentInstance { // Existing queued transmits must be updated with information for the impending writer closure match &mut transmit.write { WriteState::GuestReady { post_write, .. } => { - *post_write = PostWrite::Close(err_ctx); + *post_write = PostWrite::Close; } WriteState::HostReady { post_write, .. } => { - *post_write = PostWrite::Close(err_ctx); + *post_write = PostWrite::Close; } v @ WriteState::Open => { - *v = WriteState::Closed(err_ctx); + *v = WriteState::Closed; } - WriteState::Closed(_) => unreachable!("write state is already closed"), + WriteState::Closed => unreachable!("write state is already closed"), } // If the existing read state is closed, then there's nothing to read @@ -1885,49 +1826,10 @@ impl ComponentInstance { // If the guest was ready to read, then we cannot close the reader (or writer) // we must deliver the event, and update the state associated with the handle to // represent that a read must be performed - ReadState::GuestReady { - ty, - err_ctx_ty, - handle, - .. - } => { + ReadState::GuestReady { ty, handle, .. } => { let read_handle = transmit.read_handle; - // Lift the global err_ctx that we're receiving into an error context - // reference that the reader(caller) will - let reader_state_tbl = self - .error_context_tables() - .get_mut(err_ctx_ty) - .context("retrieving component-local error context during host writer close")?; - - let push_param = match err_ctx { - None => CLOSED, - Some(err_ctx) => { - let rep = err_ctx.as_u32(); - // Get or insert the global error context into this guest's component-local error context tracking - let (local_err_ctx, _) = match reader_state_tbl.get_mut_by_rep(rep) { - Some(r) => { - // If the error already existed, since we're about to read it, increase - // the local component-wide reference count - (*r.1).0 += 1; - r - } - None => { - // If the error context was not already tracked locally, start tracking - reader_state_tbl.insert(rep, LocalErrorContextRefCount(1))?; - reader_state_tbl.get_mut_by_rep(rep).context( - "retrieving inserted local error context during guest read", - )? - } - }; - - // NOTE: we do not have to manage the global error context ref count here, because - // it was preemptively increased, and the guest that is ready to consume this - // will account for the extra global context ref count. - - CLOSED | local_err_ctx as usize - } - }; + let push_param = CLOSED; // Ensure the final read of the guest is queued, with appropriate closure indicator let count = u32::try_from(push_param).unwrap(); @@ -1943,7 +1845,7 @@ impl ComponentInstance { // If the host was ready to read, and the writer end is being closed (host->host write?) // signal to the reader that we've reached the end of the stream ReadState::HostReady { accept } => { - accept(Writer::End(err_ctx))?; + accept(Writer::End)?; } // If the read state is open, then there are no registered readers of the stream/future @@ -1976,8 +1878,8 @@ impl ComponentInstance { // If the write end is already closed, it should stay closed, // otherwise, it should be opened. - let new_state = if let WriteState::Closed(err_ctx) = &transmit.write { - WriteState::Closed(*err_ctx) + let new_state = if let WriteState::Closed = &transmit.write { + WriteState::Closed } else { WriteState::Open }; @@ -1993,7 +1895,7 @@ impl ComponentInstance { } => { let write_handle = transmit.write_handle; - let pending = if let PostWrite::Close(_) = post_write { + let pending = if let PostWrite::Close = post_write { self.delete_transmit(transmit_id)?; false } else { @@ -2022,7 +1924,7 @@ impl ComponentInstance { WriteState::Open => {} - WriteState::Closed(_) => { + WriteState::Closed => { log::trace!("host_close_reader delete {transmit_rep}"); self.delete_transmit(transmit_id)?; } @@ -2187,7 +2089,6 @@ impl ComponentInstance { string_encoding: u8, async_: bool, ty: TableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, flat_abi: Option, handle: u32, address: u32, @@ -2197,9 +2098,6 @@ impl ComponentInstance { bail!("synchronous stream and future writes not yet supported"); } - // TODO: handle errors sent via `{stream|future}.close-readable`: - _ = err_ctx_ty; - let address = usize::try_from(address).unwrap(); let count = usize::try_from(count).unwrap(); let options = unsafe { @@ -2331,7 +2229,6 @@ impl ComponentInstance { string_encoding: u8, async_: bool, ty: TableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, flat_abi: Option, handle: u32, address: u32, @@ -2368,8 +2265,8 @@ impl ComponentInstance { transmit_id.rep() ); let transmit = self.get_mut(transmit_id)?; - let new_state = if let WriteState::Closed(err_ctx) = &transmit.write { - WriteState::Closed(*err_ctx) + let new_state = if let WriteState::Closed = &transmit.write { + WriteState::Closed } else { WriteState::Open }; @@ -2403,8 +2300,8 @@ impl ComponentInstance { rep, )?; - let pending = if let PostWrite::Close(err_ctx) = post_write { - self.get_mut(transmit_id)?.write = WriteState::Closed(err_ctx); + let pending = if let PostWrite::Close = post_write { + self.get_mut(transmit_id)?.write = WriteState::Closed; false } else { true @@ -2441,8 +2338,8 @@ impl ComponentInstance { }, )?; - if let PostWrite::Close(err_ctx) = post_write { - self.get_mut(transmit_id)?.write = WriteState::Closed(err_ctx); + if let PostWrite::Close = post_write { + self.get_mut(transmit_id)?.write = WriteState::Closed; } count @@ -2459,60 +2356,12 @@ impl ComponentInstance { address: usize::try_from(address).unwrap(), count: usize::try_from(count).unwrap(), handle, - err_ctx_ty, }; BLOCKED } - WriteState::Closed(err_ctx) => { - match err_ctx { - // If no error context is provided, closed can be sent - None => CLOSED, - // If an error context was present, we must ensure it's created and bitwise OR w/ CLOSED - Some(err_ctx) => { - // Lower the global error context that was saved into write state into a component-local - // error context handle - let state_tbl = self.error_context_tables().get_mut(err_ctx_ty).context( - "retrieving local error context table during closed read w/ error", - )?; - - // Get or insert the global error context into this guest's component-local error context tracking - let (local_err_ctx, _) = match state_tbl.get_mut_by_rep(err_ctx.as_u32()) { - Some(r) => { - // If the error already existed, since we're about to read it, increase - // the local component-wide reference count - (*r.1).0 += 1; - r - } - None => { - let rep = err_ctx.as_u32(); - // If the error context was not already tracked locally, start tracking - state_tbl.insert(rep, LocalErrorContextRefCount(1))?; - state_tbl.get_mut_by_rep(rep).context( - "retrieving inserted local error context during guest read", - )? - } - }; - - // NOTE: During write closure when the error context was provided, we - // incremented the global count to ensure the error context would not be garbage collected, - // if dropped by the sending component. - // - // Since we did that preemptively, we do not need to increment the global ref count even - // after this increase in local ref count. - // - // If a reader (this reader) *never* comes along, when the relevant stream/future is closed, - // the writer state will indicate that the global count must be amended. - - // We reset the write state to a simple closed now that the error value has been read out, - // in the case that another read is performed - self.get_mut(transmit_id)?.write = WriteState::Closed(None); - - CLOSED | local_err_ctx as usize - } - } - } + WriteState::Closed => CLOSED, }; if result != BLOCKED { @@ -2530,7 +2379,7 @@ impl ComponentInstance { }; log::trace!("guest cancel write {rep} (handle {writer})"); match state { - StreamFutureState::Local | StreamFutureState::Write => { + StreamFutureState::Write => { bail!("stream or future write canceled when no write is pending") } StreamFutureState::Read => { @@ -2552,7 +2401,7 @@ impl ComponentInstance { }; log::trace!("guest cancel read {rep} (handle {reader})"); match state { - StreamFutureState::Local | StreamFutureState::Read => { + StreamFutureState::Read => { bail!("stream or future read canceled when no read is pending") } StreamFutureState::Write => { @@ -2566,13 +2415,7 @@ impl ComponentInstance { self.host_cancel_read(rep) } - fn guest_close_writable( - &mut self, - ty: TableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - writer: u32, - local_err_ctx: u32, - ) -> Result<()> { + fn guest_close_writable(&mut self, ty: TableIndex, writer: u32) -> Result<()> { let (transmit_rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = self.state_table(ty) .remove_by_index(writer) @@ -2581,86 +2424,28 @@ impl ComponentInstance { bail!("invalid stream or future handle"); }; match state { - StreamFutureState::Local | StreamFutureState::Write => {} + StreamFutureState::Write => {} StreamFutureState::Read => { bail!("passed read end to `{{stream|future}}.close-writable`") } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } - // Resolve the error context - let global_err_ctx = match local_err_ctx { - // If no error context was provided, we can pass that along as-is - 0 => None, - - // If a non-zero error context was provided, first ensure it's valid, - // then lift the guest-local (component instance local) error context reference - // to the component-global level. - // - // This ensures that after closing the writer, when the eventual reader appears - // we can lower the component-global error context into a reader-local error context - err_ctx => { - // Look up the local component error context - let state_tbl = self - .error_context_tables() - .get_mut(err_ctx_ty) - .context("retrieving local error context during guest close writable")?; - - // NOTE: the rep below is the component-global error context index - let (rep, _) = state_tbl.get_mut_by_index(local_err_ctx).with_context(|| { - format!("missing component local error context idx [{local_err_ctx}] while closing writable") - })?; - - let global_err_ctx = TypeComponentGlobalErrorContextTableIndex::from_u32(rep); - - // Closing the writer with an error context means that a reader must later - // come along and discover the error context even once the writer goes away. - // - // Here we preemptively increase the ref count to ensure the error context - // won't be removed by the time the reader comes along - let GlobalErrorContextRefCount(global_count) = self - .global_error_context_ref_counts() - .get_mut(&global_err_ctx) - .context( - "retrieving global error context ref count during guest close writable", - )?; - *global_count += 1; - ensure!( - self - .get(TableId::::new(rep)) - .is_ok(), - "missing global error context state [{rep}] for local error context [{err_ctx}] during guest close writable" - ); - Some(global_err_ctx) - } - }; - let transmit_rep = self .get(TableId::::new(transmit_rep))? .state .rep(); - self.host_close_writer(transmit_rep, global_err_ctx) + self.host_close_writer(transmit_rep) } - fn guest_close_readable( - &mut self, - ty: TableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, - reader: u32, - error: u32, - ) -> Result<()> { - if error != 0 { - _ = err_ctx_ty; - todo!(); - } - + fn guest_close_readable(&mut self, ty: TableIndex, reader: u32) -> Result<()> { let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = self.state_table(ty).remove_by_index(reader)? else { bail!("invalid stream or future handle"); }; match state { - StreamFutureState::Local | StreamFutureState::Read => {} + StreamFutureState::Read => {} StreamFutureState::Write => { bail!("passed write end to `{{stream|future}}.close-readable`") } @@ -2870,48 +2655,29 @@ impl ComponentInstance { make_state: impl Fn(U, StreamFutureState) -> WaitableState, ) -> Result { let src_table = &mut self.waitable_tables()[src_instance]; - let (rep, src_state) = src_table.get_mut_by_index(src_idx)?; + let (_rep, src_state) = src_table.get_mut_by_index(src_idx)?; let (src_ty, _) = match_state(src_state)?; if src_ty != src { bail!("invalid future handle"); } - let state = self.get(TableId::::new(rep))?.state; - let state = self.get_mut(state)?; - let read_handle = state.read_handle.rep(); - let write_handle = state.write_handle.rep(); - let src_table = &mut self.waitable_tables()[src_instance]; let (rep, src_state) = src_table.get_mut_by_index(src_idx)?; let (_, src_state) = match_state(src_state)?; match src_state { - StreamFutureState::Local => { - *src_state = StreamFutureState::Write; - let dst_table = &mut self.waitable_tables()[dst_instance]; - assert!(dst_table.get_mut_by_rep(read_handle).is_none()); - dst_table.insert(read_handle, make_state(dst, StreamFutureState::Read)) - } StreamFutureState::Read => { src_table.remove_by_index(src_idx)?; let dst_table = &mut self.waitable_tables()[dst_instance]; - if let Some((dst_idx, dst_state)) = dst_table.get_mut_by_rep(write_handle) { - let (dst_ty, dst_state) = match_state(dst_state).unwrap(); - assert_eq!(dst_ty, dst); - assert_eq!(*dst_state, StreamFutureState::Write); - *dst_state = StreamFutureState::Local; - Ok(dst_idx) - } else { - dst_table.insert(rep, make_state(dst, StreamFutureState::Read)) - } + dst_table.insert(rep, make_state(dst, StreamFutureState::Read)) } StreamFutureState::Write => bail!("cannot transfer write end of stream or future"), StreamFutureState::Busy => bail!("cannot transfer busy stream or future"), } } - pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result { + pub(crate) fn future_new(&mut self, ty: TypeFutureTableIndex) -> Result { self.guest_new(TableIndex::Future(ty)) } @@ -2936,24 +2702,20 @@ impl ComponentInstance { pub(crate) fn future_close_writable( &mut self, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, - error: u32, ) -> Result<()> { - self.guest_close_writable(TableIndex::Future(ty), err_ctx_ty, writer, error) + self.guest_close_writable(TableIndex::Future(ty), writer) } pub(crate) fn future_close_readable( &mut self, ty: TypeFutureTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, reader: u32, - error: u32, ) -> Result<()> { - self.guest_close_readable(TableIndex::Future(ty), err_ctx_ty, reader, error) + self.guest_close_readable(TableIndex::Future(ty), reader) } - pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result { + pub(crate) fn stream_new(&mut self, ty: TypeStreamTableIndex) -> Result { self.guest_new(TableIndex::Stream(ty)) } @@ -2978,21 +2740,17 @@ impl ComponentInstance { pub(crate) fn stream_close_writable( &mut self, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, writer: u32, - error: u32, ) -> Result<()> { - self.guest_close_writable(TableIndex::Stream(ty), err_ctx_ty, writer, error) + self.guest_close_writable(TableIndex::Stream(ty), writer) } pub(crate) fn stream_close_readable( &mut self, ty: TypeStreamTableIndex, - err_ctx_ty: TypeComponentLocalErrorContextTableIndex, reader: u32, - error: u32, ) -> Result<()> { - self.guest_close_readable(TableIndex::Stream(ty), err_ctx_ty, reader, error) + self.guest_close_readable(TableIndex::Stream(ty), reader) } pub(crate) fn future_transfer( @@ -3081,3 +2839,8 @@ impl ComponentInstance { Ok(updated_count) } } + +pub(crate) struct ResourcePair { + pub(crate) write: u32, + pub(crate) read: u32, +} diff --git a/crates/wasmtime/src/runtime/vm/component/libcalls.rs b/crates/wasmtime/src/runtime/vm/component/libcalls.rs index f92a6abe13..bd4405b56d 100644 --- a/crates/wasmtime/src/runtime/vm/component/libcalls.rs +++ b/crates/wasmtime/src/runtime/vm/component/libcalls.rs @@ -1,6 +1,8 @@ //! Implementation of string transcoding required by the component model. use crate::prelude::*; +#[cfg(feature = "component-model-async")] +use crate::runtime::component::concurrent::ResourcePair; use crate::runtime::vm::component::{ComponentInstance, VMComponentContext}; use crate::runtime::vm::{HostResultHasUnwindSentinel, VmSafe}; use core::cell::Cell; @@ -871,7 +873,18 @@ unsafe fn error_context_transfer( } #[cfg(feature = "component-model-async")] -unsafe fn future_new(vmctx: NonNull, ty: u32) -> Result { +unsafe impl HostResultHasUnwindSentinel for ResourcePair { + type Abi = u64; + const SENTINEL: u64 = u64::MAX; + + fn into_abi(self) -> Self::Abi { + assert!(self.read & (1 << 31) == 0); + (u64::from(self.read) << 32) | u64::from(self.write) + } +} + +#[cfg(feature = "component-model-async")] +unsafe fn future_new(vmctx: NonNull, ty: u32) -> Result { ComponentInstance::from_vmctx(vmctx, |instance| { instance.future_new(wasmtime_environ::component::TypeFutureTableIndex::from_u32( ty, @@ -887,7 +900,6 @@ unsafe fn future_write( string_encoding: u8, async_: u8, ty: u32, - err_ctx_ty: u32, future: u32, address: u32, ) -> Result { @@ -899,9 +911,6 @@ unsafe fn future_write( string_encoding, async_ != 0, wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), future, address, ) @@ -916,7 +925,6 @@ unsafe fn future_read( string_encoding: u8, async_: u8, ty: u32, - err_ctx_ty: u32, future: u32, address: u32, ) -> Result { @@ -928,9 +936,6 @@ unsafe fn future_read( string_encoding, async_ != 0, wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), future, address, ) @@ -973,18 +978,12 @@ unsafe fn future_cancel_read( unsafe fn future_close_writable( vmctx: NonNull, ty: u32, - err_ctx_ty: u32, writer: u32, - error: u32, ) -> Result<()> { ComponentInstance::from_vmctx(vmctx, |instance| { instance.future_close_writable( wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), writer, - error, ) }) } @@ -993,24 +992,18 @@ unsafe fn future_close_writable( unsafe fn future_close_readable( vmctx: NonNull, ty: u32, - err_ctx_ty: u32, reader: u32, - error: u32, ) -> Result<()> { ComponentInstance::from_vmctx(vmctx, |instance| { instance.future_close_readable( wasmtime_environ::component::TypeFutureTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), reader, - error, ) }) } #[cfg(feature = "component-model-async")] -unsafe fn stream_new(vmctx: NonNull, ty: u32) -> Result { +unsafe fn stream_new(vmctx: NonNull, ty: u32) -> Result { ComponentInstance::from_vmctx(vmctx, |instance| { instance.stream_new(wasmtime_environ::component::TypeStreamTableIndex::from_u32( ty, @@ -1026,7 +1019,6 @@ unsafe fn stream_write( string_encoding: u8, async_: u8, ty: u32, - err_ctx_ty: u32, stream: u32, address: u32, count: u32, @@ -1039,9 +1031,6 @@ unsafe fn stream_write( string_encoding, async_ != 0, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), stream, address, count, @@ -1057,7 +1046,6 @@ unsafe fn stream_read( string_encoding: u8, async_: u8, ty: u32, - err_ctx_ty: u32, stream: u32, address: u32, count: u32, @@ -1070,9 +1058,6 @@ unsafe fn stream_read( string_encoding, async_ != 0, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), stream, address, count, @@ -1116,18 +1101,12 @@ unsafe fn stream_cancel_read( unsafe fn stream_close_writable( vmctx: NonNull, ty: u32, - err_ctx_ty: u32, writer: u32, - error: u32, ) -> Result<()> { ComponentInstance::from_vmctx(vmctx, |instance| { instance.stream_close_writable( wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), writer, - error, ) }) } @@ -1136,18 +1115,12 @@ unsafe fn stream_close_writable( unsafe fn stream_close_readable( vmctx: NonNull, ty: u32, - err_ctx_ty: u32, reader: u32, - error: u32, ) -> Result<()> { ComponentInstance::from_vmctx(vmctx, |instance| { instance.stream_close_readable( wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), reader, - error, ) }) } @@ -1159,7 +1132,6 @@ unsafe fn flat_stream_write( realloc: *mut u8, async_: u8, ty: u32, - err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, @@ -1175,9 +1147,6 @@ unsafe fn flat_stream_write( realloc.cast::(), async_ != 0, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), payload_size, payload_align, stream, @@ -1194,7 +1163,6 @@ unsafe fn flat_stream_read( realloc: *mut u8, async_: u8, ty: u32, - err_ctx_ty: u32, payload_size: u32, payload_align: u32, stream: u32, @@ -1210,9 +1178,6 @@ unsafe fn flat_stream_read( realloc.cast::(), async_ != 0, wasmtime_environ::component::TypeStreamTableIndex::from_u32(ty), - wasmtime_environ::component::TypeComponentLocalErrorContextTableIndex::from_u32( - err_ctx_ty, - ), payload_size, payload_align, stream, diff --git a/tests/misc_testsuite/component-model-async/future-read.wast b/tests/misc_testsuite/component-model-async/future-read.wast index 4a356685b7..738001ed63 100644 --- a/tests/misc_testsuite/component-model-async/future-read.wast +++ b/tests/misc_testsuite/component-model-async/future-read.wast @@ -35,11 +35,11 @@ (core func $child-run (canon lower (func $child "run"))) (core module $m - (import "" "new" (func $new (result i32))) + (import "" "new" (func $new (result i64))) (import "" "child-run" (func $child-run (param i32))) (func (export "run") - (call $child-run (call $new)) + (call $child-run (i32.wrap_i64 (i64.shr_u (call $new) (i64.const 32)))) ) ) (core instance $i (instantiate $m @@ -87,11 +87,11 @@ (core func $child-run (canon lower (func $child "run"))) (core module $m - (import "" "new" (func $new (result i32))) + (import "" "new" (func $new (result i64))) (import "" "child-run" (func $child-run (param i32))) (func (export "run") - (call $child-run (call $new)) + (call $child-run (i32.wrap_i64 (i64.shr_u (call $new) (i64.const 32)))) ) ) (core instance $i (instantiate $m @@ -143,11 +143,11 @@ (core func $child-run (canon lower (func $child "run"))) (core module $m - (import "" "new" (func $new (result i32))) + (import "" "new" (func $new (result i64))) (import "" "child-run" (func $child-run (param i32))) (func (export "run") - (call $child-run (call $new)) + (call $child-run (i32.wrap_i64 (i64.shr_u (call $new) (i64.const 32)))) ) ) (core instance $i (instantiate $m @@ -190,7 +190,7 @@ (core instance $i (instantiate $m (with "" (instance (export "read" (func $read)) - (export "return" (func $return)) + (export "return" (func $return)) )) )) (func (export "run") (param "x" $future) @@ -203,11 +203,11 @@ (core func $child-run (canon lower (func $child "run"))) (core module $m - (import "" "new" (func $new (result i32))) + (import "" "new" (func $new (result i64))) (import "" "child-run" (func $child-run (param i32))) (func (export "run") - (call $child-run (call $new)) + (call $child-run (i32.wrap_i64 (i64.shr_u (call $new) (i64.const 32)))) ) ) (core instance $i (instantiate $m diff --git a/tests/misc_testsuite/component-model-async/futures.wast b/tests/misc_testsuite/component-model-async/futures.wast index 8ed319c169..3a3eed8a56 100644 --- a/tests/misc_testsuite/component-model-async/futures.wast +++ b/tests/misc_testsuite/component-model-async/futures.wast @@ -4,7 +4,7 @@ ;; future.new (component (core module $m - (import "" "future.new" (func $future-new (result i32))) + (import "" "future.new" (func $future-new (result i64))) ) (type $future-type (future u8)) (core func $future-new (canon future.new $future-type)) @@ -73,7 +73,7 @@ ;; future.close-readable (component (core module $m - (import "" "future.close-readable" (func $future-close-readable (param i32 i32))) + (import "" "future.close-readable" (func $future-close-readable (param i32))) ) (type $future-type (future u8)) (core func $future-close-readable (canon future.close-readable $future-type)) @@ -83,7 +83,7 @@ ;; future.close-writable (component (core module $m - (import "" "future.close-writable" (func $future-close-writable (param i32 i32))) + (import "" "future.close-writable" (func $future-close-writable (param i32))) ) (type $future-type (future u8)) (core func $future-close-writable (canon future.close-writable $future-type)) diff --git a/tests/misc_testsuite/component-model-async/streams.wast b/tests/misc_testsuite/component-model-async/streams.wast index 6a5a8a0c36..c99f87c1fc 100644 --- a/tests/misc_testsuite/component-model-async/streams.wast +++ b/tests/misc_testsuite/component-model-async/streams.wast @@ -4,7 +4,7 @@ ;; stream.new (component (core module $m - (import "" "stream.new" (func $stream-new (result i32))) + (import "" "stream.new" (func $stream-new (result i64))) ) (type $stream-type (stream u8)) (core func $stream-new (canon stream.new $stream-type)) @@ -73,7 +73,7 @@ ;; stream.close-readable (component (core module $m - (import "" "stream.close-readable" (func $stream-close-readable (param i32 i32))) + (import "" "stream.close-readable" (func $stream-close-readable (param i32))) ) (type $stream-type (stream u8)) (core func $stream-close-readable (canon stream.close-readable $stream-type)) @@ -83,7 +83,7 @@ ;; stream.close-writable (component (core module $m - (import "" "stream.close-writable" (func $stream-close-writable (param i32 i32))) + (import "" "stream.close-writable" (func $stream-close-writable (param i32))) ) (type $stream-type (stream u8)) (core func $stream-close-writable (canon stream.close-writable $stream-type))