Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion web-streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/web-rs"
license = "MIT OR Apache-2.0"

version = "0.1.2"
version = "0.1.4"
edition = "2021"

categories = ["wasm", "web-programming", "api-bindings"]
Expand All @@ -14,6 +14,12 @@ categories = ["wasm", "web-programming", "api-bindings"]
thiserror = "2.0"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
tokio = { version = "1.45.1", features = ["io-util"], optional = true }
tracing = { version = "0.1", optional = true}


[features]
tokio = ["dep:tokio", "dep:tracing" ]

[dependencies.web-sys]
version = "0.3.77"
Expand Down
97 changes: 94 additions & 3 deletions web-streams/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Reader<T: JsCast> {
inner: ReadableStreamDefaultReader,

// Keep the most recent promise to make `read` cancelable
read: Option<js_sys::Promise>,
read: Option<JsFuture>,

_phantom: PhantomData<T>,
}
Expand All @@ -31,12 +31,14 @@ impl<T: JsCast> Reader<T> {
/// Read the next element from the stream, returning None if the stream is done.
pub async fn read(&mut self) -> Result<Option<T>, Error> {
if self.read.is_none() {
self.read = Some(self.inner.read());
self.read = Some(JsFuture::from(self.inner.read()));
}

let result: ReadableStreamReadResult = JsFuture::from(self.read.as_ref().unwrap().clone()).await?.into();
let result: ReadableStreamReadResult = self.read.as_mut().unwrap().await?.into();
self.read.take(); // Clear the promise on success

//todo why do you use `Reflect` here?
// is get_done and get_value not good enough?
if Reflect::get(&result, &"done".into())?.is_truthy() {
return Ok(None);
}
Expand Down Expand Up @@ -64,3 +66,92 @@ impl<T: JsCast> Drop for Reader<T> {
self.inner.release_lock();
}
}


use wasm_bindgen::JsCast;

#[cfg(feature = "tokio")]
mod tokio_impl {
use std::io::{Result, Error, ErrorKind, ErrorKind::Unsupported};
use super::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use wasm_bindgen::JsCast;
use crate::reader::js_sys::Uint8Array;
use std::future::Future;
use Poll::{Pending, Ready};
use js_sys::Promise;
use ErrorKind::Other;
use tracing::info;

impl AsyncRead for Reader<Uint8Array> {

fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {

//if there is no pending read, we need to create one
if self.read.is_none() {
self.read = Some(JsFuture::from(self.inner.read()));
}

let Some(promise) = self.read.as_mut() else {
return Ready(Err(Error::new(Other, "Unrecoverable error: No pending read found despite just queued")));
};

match Pin::new(promise).poll(cx) {
Pending => Pending,
Ready(Ok(js_val)) => {
//we clone, set and then take here because
//in case of pending it needs to be in read
self.read.take();

//it seems at the moment that the value cannot be typechecked?
let result = js_val.unchecked_into::<ReadableStreamReadResult>();
/*let Ok(result) = js_val.dyn_into::<ReadableStreamReadResult>() else {
return Ready(Err(Error::new(Unsupported, "Unrecoverable error: Expected js type ReadableStreamReadResult")));
};*/
if result.get_done().unwrap_or(false) {
return Ready(Ok(())); // EOF
}

let Ok(array) = result.get_value().dyn_into::<Uint8Array>() else {
return Ready(Err(Error::new(Unsupported, "Unrecoverable error: Expected js type Uint8Array")));
};
let array_len = array.length() as usize;
let len = std::cmp::min(buf.remaining(), array_len);

// Copy what fits
// # Safety: copy_to_uninit does not uninit anything and inits the first `len` bytes.
let dst = unsafe {
&mut buf.unfilled_mut()[0..len]
};
array.slice(0, len as u32).copy_to_uninit(dst);
unsafe { buf.assume_init(len); }
buf.advance(len);

// If there are leftover bytes, we must not drop them
// create a new ReadableStreamReadResult and set self.read
if len < array_len {
let leftover = array.slice(len as u32, array_len as u32);
//let result = ReadableStreamReadResult::new(); i believe we can reuse the existing one
result.set_done(false);
result.set_value(&**leftover);
let promise = Promise::resolve(&**result);
self.read = Some(JsFuture::from(promise));
}

Ready(Ok(()))
}
Ready(Err(_)) => {
self.read.take();
Ready(Err(Error::new(Other, "js read error")))
}
}
}
}
}

225 changes: 224 additions & 1 deletion web-streams/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use wasm_bindgen_futures::{js_sys, JsFuture};
use web_sys::{WritableStream, WritableStreamDefaultWriter};

use crate::{Error, PromiseExt};
Expand Down Expand Up @@ -40,3 +42,224 @@ impl Drop for Writer {
self.inner.release_lock();
}
}

impl<T: JsCast> From<Writer> for TypedWriter<T> {
fn from(value: Writer) -> Self {
let value: ManuallyDrop<Writer> = ManuallyDrop::new(value);

TypedWriter {
inner: value.inner.clone(),
write_promise: None,
_phantom: PhantomData,
}
}
}

impl<T: JsCast> Drop for TypedWriter<T> {
fn drop(&mut self) {
self.inner.release_lock();
}
}

impl<T: JsCast> TryFrom<TypedWriter<T>> for Writer {
type Error = TypedWriter<T>;

fn try_from(value: TypedWriter<T>) -> Result<Self, Self::Error> {
if value.write_promise.is_some() {
Err(value)
} else {
let value: ManuallyDrop<TypedWriter<T>> = ManuallyDrop::new(value);
Ok(Writer {
inner: value.inner.clone(),
})
}

}
}


pub struct TypedWriter<T: JsCast> {
inner: WritableStreamDefaultWriter,
// Keep the most recent promise to make `write` cancelable
write_promise: Option<JsFuture>,

_phantom: PhantomData<T>,
}

impl<T: JsCast> TypedWriter<T> {
pub fn new(stream: &WritableStream) -> Result<Self, Error> {
let inner = stream.get_writer()?.unchecked_into();
Ok(Self {
inner,
write_promise: None,
_phantom: PhantomData,
})
}

pub async fn write(&mut self, v: &T) -> Result<(), Error> {
if let Some(promise) = &mut self.write_promise.take() {
promise.await?;
}
let js_value = JsValue::from(v);
self.write_promise = Some(JsFuture::from(self.inner.write_with_chunk(&js_value)));
if let Some(promise) = &mut self.write_promise {
promise.await?;
self.write_promise = None;
}
Ok(())
}

pub fn close(&mut self) {
/*if let Some(promise) = self.write_promise.take() {
promise.block;
}*/
self.inner.close().ignore();
}

pub fn abort(&mut self, reason: &str) {
/*if let Some(promise) = self.write_promise.take() {
promise.ignore();
}*/
let str = JsValue::from_str(reason);
self.inner.abort_with_reason(&str).ignore();
}

/// Wait for the stream to be closed
pub async fn closed(&self) -> Result<(), Error> {
//todo is it correct that this only requires &self?
/*if let Some(promise) = &self.write_promise {
promise.await?;
}*/

JsFuture::from(self.inner.closed()).await?;
Ok(())
}
}

#[cfg(feature = "tokio")]
mod tokio_impl {
use std::future::Future;
use super::*;
use std::io::{Result, Error, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use js_sys::Uint8Array;
use ErrorKind::{BrokenPipe, Other};
use std::task::Poll::Ready;
use Poll::Pending;
use tracing::info;

impl<T: JsCast + Unpin> TypedWriter<T> {
fn project(self: Pin<&mut Self>) -> (&mut WritableStreamDefaultWriter, &mut Option<JsFuture>) {
// Safety: None of the fields are self-referential or require pinning
let this = self.get_mut();
(&mut this.inner, &mut this.write_promise)
}
}

impl AsyncWrite for TypedWriter<Uint8Array> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
info!("poll_write called with buf{{len={}}}: {:?}", buf.len(), buf);

let Ok(Some(desired_size)) = self.inner.desired_size() else {
return Ready(Err(Error::new(BrokenPipe, "stream is closed, not writable, or abort queued")));
};

let (inner, write_promise) = Self::project(self);
info!("desired size: {}", desired_size);
if desired_size < 1f64 {
// if we return Pending here we must also ensure a waker is provided
return if let Some(promise) = write_promise {
match Pin::new(promise).poll(cx) {
Pending => Pending,
Ready(Ok(_)) => {
*write_promise = None;
Ready(Ok(0))
},
Ready(Err(err)) => {
*write_promise = None;
let js_err_str = err.as_string().unwrap_or_else(|| "unknown error".to_string());
Ready(Err(Error::new(Other, format!("js wait for write error: {}", js_err_str))))
},
}
} else {
Ready(Ok(0)) // No pending write, nothing to flush
};

//return Ready(Err(Error::from(WouldBlock)));
//return Ready(Err(Error::new(WouldBlock, format!("desired size is too small: {}", desired_size))));
}
//let desired_size = desired_size as usize;
if let Some(promise) = write_promise {
if let Ready(Err(err)) = Pin::new(promise).poll(cx) {
*write_promise = None;
let js_err_str = err.as_string().unwrap_or_else(|| "unknown error".to_string());
return Ready(Err(Error::new(Other, format!("js write error: {}", js_err_str))));
}
}

//let len = std::cmp::min(buf.len(), desired_size);
let array = Uint8Array::from(buf);//.slice(0, len as u32);
//todo this looks like a proper issue to me!
let p = JsFuture::from(inner.write_with_chunk(&array));
*write_promise = Some(p); //this promise should only resolve after the current anyway
/*match write_promise {
Some(val) => {
*val = val.then(&Closure::<dyn FnMut(JsValue)>::new(move |_| {
p
}));
},
opt @ None => *opt = Some(p),
}*/
Ready(Ok(buf.len()))
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<()>> {
let (_ , write_promise) = Self::project(self);
if let Some(promise) = write_promise {
match Pin::new(promise).poll(cx) {
Pending => Pending,
Ready(Ok(_)) => {
*write_promise = None;
Ready(Ok(()))
},
Ready(Err(err)) => {
*write_promise = None;
let js_err_str = err.as_string().unwrap_or_else(|| "unknown error".to_string());
Ready(Err(Error::new(Other, format!("js flush error: {}", js_err_str))))
},
}
} else {
Ready(Ok(())) // No pending write, nothing to flush
}
}

fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<()>> {
let (inner, _) = Self::project(self);
inner.close().ignore();
let p = inner.closed();
let mut js_future = JsFuture::from(p);
match Pin::new(&mut js_future).poll(_cx) {
Pending => Pending,
Ready(Ok(_)) => Ready(Ok(())),
Ready(Err(err)) => {
let js_err_str = err.as_string().unwrap_or_else(|| "unknown error".to_string());
Ready(Err(Error::new(Other, format!("js shutdown error: {}", js_err_str))))
},
}
}
}
}