Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions src/hyperapp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;

// macro_export puts it in the root,
// so we re-export here so you can use as either
// hyperware_process_lib::run_async
// or
// hyperware_process_lib::hyperapp::run_async
pub use crate::run_async;

thread_local! {
static SPAWN_QUEUE: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());


pub static APP_CONTEXT: RefCell<AppContext> = RefCell::new(AppContext {
hidden_state: None,
executor: Executor::new(),
Expand Down Expand Up @@ -140,27 +136,46 @@ pub struct Executor {
tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
}

pub fn spawn(fut: impl Future<Output = ()> + 'static) {
SPAWN_QUEUE.with(|queue| {
queue.borrow_mut().push(Box::pin(fut));
})
}

impl Executor {
pub fn new() -> Self {
Self { tasks: Vec::new() }
}

pub fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
self.tasks.push(Box::pin(fut));
}

pub fn poll_all_tasks(&mut self) {
let mut ctx = Context::from_waker(noop_waker_ref());
let mut completed = Vec::new();
loop {
// Drain any newly spawned tasks into our task list
SPAWN_QUEUE.with(|queue| {
self.tasks.append(&mut queue.borrow_mut());
});

// Poll all tasks, collecting completed ones
let mut completed = Vec::new();
let mut ctx = Context::from_waker(noop_waker_ref());

for i in 0..self.tasks.len() {
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
completed.push(i);
}
}

for i in 0..self.tasks.len() {
if let Poll::Ready(()) = self.tasks[i].as_mut().poll(&mut ctx) {
completed.push(i);
// Remove completed tasks immediately to prevent re-polling
for idx in completed.into_iter().rev() {
let _ = self.tasks.remove(idx);
}
}

for idx in completed.into_iter().rev() {
let _ = self.tasks.remove(idx);
// Check if there are new tasks spawned during polling
let has_new_tasks = SPAWN_QUEUE.with(|queue| !queue.borrow().is_empty());

// Continue if new tasks were spawned, otherwise we're done
if !has_new_tasks {
break;
}
}
}
}
Expand Down Expand Up @@ -282,17 +297,6 @@ where
return Err(AppSendError::SendError(e));
}

#[macro_export]
macro_rules! run_async {
($($code:tt)*) => {
hyperware_process_lib::hyperapp::APP_CONTEXT.with(|ctx| {
ctx.borrow_mut().executor.spawn(async move {
$($code)*
})
})
};
}

// Enum defining the state persistance behaviour
#[derive(Clone)]
pub enum SaveOptions {
Expand Down