-
Notifications
You must be signed in to change notification settings - Fork 8
Host blocks when trying to write/read stream to/from guest #96
Description
Hi, I am trying out the new features in wasip3-prototyping (https://github.com/segeljakt/wasmtime-component-async-test).
Everything works well, except one of the test cases. I have a function on the guest that is supposed to take an input stream, create a task that reads 10 elements from the stream and write a response to each element on an output stream:
pub mod bindings {
use wit_bindgen::generate;
generate!({
world: "guest",
path: "interface.wit",
async: {
exports: [ "pkg:component/intf#test4" ]
}
});
pub struct Component;
export!(Component);
}
impl Guest for bindings::Component {
async fn test4(mut stream: StreamReader<String>) -> StreamReader<String> {
let (mut tx, rx) = bindings::wit_stream::new::<String>();
async_support::spawn(async move {
for i in 0..10 {
match stream.next().await {
Some(Ok(_items)) => {
tx.send(vec!["Response".to_string()]).await;
}
_ => {
tx.close().await.unwrap();
break;
}
}
}
});
rx
}
}I have this code on the host to test the function:
#[tokio::test]
async fn test4() {
let (instance, mut store, intf_export) = init().await;
let export = instance
.get_export(&mut store, Some(&intf_export), "test4")
.unwrap();
let func3: TypedFunc<(HostStream<String>,), (HostStream<String>,)> =
instance.get_typed_func(&mut store, export).unwrap();
let (mut tx, rx) = instance.stream(&mut store).unwrap();
let (result,) = func3.call_async(&mut store, (rx.into(),)).await.unwrap();
let handle1 = tokio::task::spawn(async move {
for i in 0..10 {
println!("{i} Writing: Hello World! (test4)");
let Some(new) = tx
.write(Single(format!("Hello World! {i} (test4)")))
.into_future()
.await
else {
panic!("Error writing stream");
};
tx = new;
}
});
func3.post_return_async(&mut store).await.unwrap();
let mut result: StreamReader<Vec<String>> = result.into_reader(&mut store);
let handle2 = tokio::task::spawn(async move {
for i in 0..10 {
println!("{i} Reading...");
let Ok((new, item)) = result.read().into_future().await else {
panic!("Error reading stream");
};
result = new;
println!("Result: {:?}", item);
}
});
tokio::try_join!(handle1, handle2).unwrap();
}However, when I try to run the code, the spawned tasks on the host get stuck at tx.write(Single(...)).into_future().await and result.read().into_future().await. Is there any obvious mistake in my code? Everything seems to work well when just using futures.
Another slightly unrelated question. Is there an async API for WASI that guests can use to for example read files and sockets? I only found wasi::io::streams, but it appears to be synchronous.