From a04196f3e0e1608863d07e533162d49b677f89e0 Mon Sep 17 00:00:00 2001 From: John Murray Date: Fri, 19 May 2023 17:03:10 -0400 Subject: [PATCH 1/4] shutdown hook - stub --- src/actor/actor.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/actor/actor.rs b/src/actor/actor.rs index 9b21d8d..a366938 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -31,6 +31,14 @@ pub trait Actor: Send { /// Receive a message. This is the primary method for handling messages and is called /// for every message received by the actor. fn receive(&mut self, ctx: Context, msg: Box); + + /// Hook called after all messages have been received and processed and before the actor is + /// removed fom the executor. This is useful for performing any cleanup that requires the + /// actor to be running. + /// + /// Any messages sent to this actor after this method is called will be sent to the dead-letter + /// queue. + fn before_shutdown(&mut self, _ctx: Context) {} } /// ActorInit defines a method of construction for an actor that takes an initialization From d5697d3a963354d57305098a07f71397e52e8351 Mon Sep 17 00:00:00 2001 From: John Murray Date: Mon, 22 May 2023 22:41:32 -0400 Subject: [PATCH 2/4] rough cut at synchronous shutdown support --- src/actor/actor.rs | 9 +++++++++ src/executor/mod.rs | 5 ++++- src/executor/thread_executor.rs | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/actor/actor.rs b/src/actor/actor.rs index a366938..8651dc8 100644 --- a/src/actor/actor.rs +++ b/src/actor/actor.rs @@ -1,6 +1,8 @@ use crate::actor::{ActorAddress, Letter, SenderType}; +use crate::executor::ExecutorCommands; use crate::message::{Message, ToMessage}; use crate::system::RuntimeManagerRef; +use crate::util::CommandChannel; use crossbeam_channel::Receiver; use log::{trace, warn}; @@ -103,6 +105,7 @@ macro_rules! debug_serialize_msg { pub struct Context<'a> { pub(crate) address: &'a ActorAddress, pub(crate) runtime_manager: &'a RuntimeManagerRef, + pub(crate) executor_channel: &'a CommandChannel, pub(crate) parent: &'a Option, pub(crate) children: &'a mut Vec, pub(crate) sender: &'a SenderType, @@ -198,4 +201,10 @@ impl Context<'_> { pub fn parent(&self) -> Option<&ActorAddress> { self.parent.as_ref() } + + pub fn shutdown(&self) { + self.executor_channel + .send(ExecutorCommands::ShutdownActor(self.address.clone())) + .unwrap(); + } } diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 64e1eaa..3998173 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -2,13 +2,14 @@ pub(crate) mod thread_executor; -use crate::actor::ActorCell; +use crate::actor::{ActorAddress, ActorCell}; use crate::config::ExecutorType; use crate::system::RuntimeManagerRef; use crate::util::CommandChannel; pub enum ExecutorCommands { AssignActor(ActorCell), + ShutdownActor(ActorAddress), Shutdown, } @@ -24,6 +25,8 @@ pub trait ExecutorFactory { ) -> ExecutorHandle; } +/// Executor represents a runtime responsible for processing messages on actors and handling +/// messages received from the command channel. pub trait Executor { fn run(self); } diff --git a/src/executor/thread_executor.rs b/src/executor/thread_executor.rs index 94660c8..665e10c 100644 --- a/src/executor/thread_executor.rs +++ b/src/executor/thread_executor.rs @@ -78,12 +78,28 @@ impl Executor for ThreadExecutor { cell.actor.before_start(Context { address: &cell.address, runtime_manager: &self.runtime_manager, + executor_channel: &self.command_channel, parent: &cell.parent, children: &mut cell.children, sender: &SenderType::System, }); self.actor_cells.insert(cell.address.uri.clone(), cell); } + ExecutorCommands::ShutdownActor(address) => { + trace!("calling before_shutdown for actor {}", &address.uri); + // TODO: Send the remaining messages in the mailbox to the dead letter queue + // TODO: Figure out how to redirect all Sender handles to the dead letter queue + if let Some(mut cell) = self.actor_cells.remove(&address.uri) { + cell.actor.before_shutdown(Context { + address: &cell.address, + runtime_manager: &self.runtime_manager, + executor_channel: &self.command_channel, + parent: &cell.parent, + children: &mut cell.children, + sender: &SenderType::System, + }); + } + } ExecutorCommands::Shutdown => { info!("received shutdown command"); break; @@ -101,6 +117,7 @@ impl Executor for ThreadExecutor { Context { address: &cell.address, runtime_manager: &self.runtime_manager, + executor_channel: &self.command_channel, parent: &cell.parent, children: &mut cell.children, sender: &letter.sender, From 12457be41df6ffcbf1e04ff916549bbcfdcfeb4e Mon Sep 17 00:00:00 2001 From: John Murray Date: Mon, 29 May 2023 17:40:47 -0400 Subject: [PATCH 3/4] PoisonPill message --- src/message/mod.rs | 6 ++++++ src/message/system.rs | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 src/message/system.rs diff --git a/src/message/mod.rs b/src/message/mod.rs index e8452cc..be6f280 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,6 +1,7 @@ //! Core message types used by Busan and primitive type wrappers pub mod common_types; +pub mod system; pub trait Message: prost::Message { fn as_any(&self) -> &dyn std::any::Any; @@ -20,6 +21,11 @@ pub trait Message: prost::Message { fn encoded_len(&self) -> usize { prost::Message::encoded_len(self) } + + #[doc(hidden)] + fn is_system_message(&self) -> bool { + false + } } pub trait ToMessage { diff --git a/src/message/system.rs b/src/message/system.rs new file mode 100644 index 0000000..d5824a8 --- /dev/null +++ b/src/message/system.rs @@ -0,0 +1,25 @@ +//! System messages that have special properties + +use prost::DecodeError; +use std::any::Any; + +#[derive(prost::Message)] +pub struct PoisonPill {} + +impl super::Message for PoisonPill { + fn as_any(&self) -> &dyn Any { + return self; + } + + fn encode_to_vec2(&self) -> Vec { + prost::Message::encode_to_vec(self) + } + + fn merge2(&mut self, buf: &[u8]) -> Result<(), DecodeError> { + prost::Message::merge(self, buf) + } + + fn is_system_message(&self) -> bool { + true + } +} From d6140cd749cf6de83b6713a392aa6d95f3524416 Mon Sep 17 00:00:00 2001 From: John Murray Date: Mon, 26 Jun 2023 18:36:30 -0400 Subject: [PATCH 4/4] wip --- src/executor/thread_executor.rs | 145 ++++++++++++++++++++++---------- src/message/mod.rs | 9 +- src/message/system.rs | 5 +- 3 files changed, 112 insertions(+), 47 deletions(-) diff --git a/src/executor/thread_executor.rs b/src/executor/thread_executor.rs index 665e10c..634e74e 100644 --- a/src/executor/thread_executor.rs +++ b/src/executor/thread_executor.rs @@ -3,10 +3,11 @@ use std::collections::HashMap; use std::thread; use std::time::Duration; -use crate::actor::{ActorCell, Context, SenderType, Uri}; +use crate::actor::{ActorCell, Context, Letter, SenderType, Uri}; use crate::executor::{ CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle, }; +use crate::message::system::PoisonPill; use crate::system::RuntimeManagerRef; pub struct ThreadExecutorFactory {} @@ -23,6 +24,24 @@ impl ExecutorFactory for ThreadExecutorFactory { } } +// Macro for quickly constructing a context object within the thread executor. The construction +// of the context almost always looks the same, just some slight differences with the sender. +macro_rules! context { + ($self:tt, $cell:tt, $sender:path) => { + context!($self, $cell, ($sender)) + }; + ($self:tt, $cell:tt, $sender:expr) => { + Context { + address: &$cell.address, + runtime_manager: &$self.runtime_manager, + executor_channel: &$self.command_channel, + parent: &$cell.parent, + children: &mut $cell.children, + sender: &$sender, + } + }; +} + /// thread-local executor responsible for processing messages on actors struct ThreadExecutor { // Name of the executor, which is part of the address to actors and used @@ -63,72 +82,112 @@ impl ThreadExecutor { panic!("Actor name {} already exists", address); } } + + /// Process a system message for a given actor. + /// + /// System messages have special handling logic that that involves the executor, so they + /// can't be passed on to the actor in the traditional fashion. Depending on the message, + /// they still may be forwarded to the actor. + fn process_system_message(letter: Letter, cell: &mut ActorCell, context: Context) { + if let Some(_) = letter.payload.as_any().downcast_ref::() { + trace!( + "received poison pill for {}. Calling shutdown hook", + &cell.address.uri + ); + cell.actor.before_shutdown(context); + // TODO: Signal shutdown to the execute + } else { + // System messages should always be known as only messages defined by the crate + // can be system messages *and* system messages cannot be sent remotely. + panic!("Unknown system message type: {:?}", letter.payload); + } + } + + fn shutdown_actor(&mut self, uri: &Uri) { + if let Some(mut cell) = self.actor_cells.remove(&uri) { + trace!("calling before_shutdown for actor {}", &uri); + // TODO: Send the remaining messages in the mailbox to the dead letter queue + // TODO: Figure out how to redirect all Sender handles to the dead letter queue + cell.actor + .before_shutdown(context!(self, cell, SenderType::System)); + } + } + + fn assign_actor(&mut self, mut cell: ActorCell) { + debug!("received actor assignment for {}", &cell.address.uri); + self.assert_unique_address(&cell.address.uri); + trace!("calling before_start for actor {}", &cell.address.uri); + cell.actor + .before_start(context!(self, cell, SenderType::System)); + self.actor_cells.insert(cell.address.uri.clone(), cell); + } } + impl Executor for ThreadExecutor { fn run(mut self) { const SLEEP_DURATION_MS: u64 = 1; loop { + // Handle executor commands before handling any actor messages. Generally it is expected + // to have very few of these per loop. if !self.command_channel.recv_is_empty() { match self.command_channel.recv().unwrap() { - ExecutorCommands::AssignActor(mut cell) => { - debug!("received actor assignment for {}", &cell.address.uri); - self.assert_unique_address(&cell.address.uri); - trace!("calling before_start for actor {}", &cell.address.uri); - cell.actor.before_start(Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &SenderType::System, - }); - self.actor_cells.insert(cell.address.uri.clone(), cell); + ExecutorCommands::AssignActor(cell) => { + self.assign_actor(cell); } ExecutorCommands::ShutdownActor(address) => { - trace!("calling before_shutdown for actor {}", &address.uri); - // TODO: Send the remaining messages in the mailbox to the dead letter queue - // TODO: Figure out how to redirect all Sender handles to the dead letter queue - if let Some(mut cell) = self.actor_cells.remove(&address.uri) { - cell.actor.before_shutdown(Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &SenderType::System, - }); - } + self.shutdown_actor(&address.uri); } ExecutorCommands::Shutdown => { info!("received shutdown command"); + // Break to exit the main 'loop' in the run function break; } } } + let mut messages_processed = 0; // Iterate over the actor-cells and check if there are any non-empty mailboxes. - // If one is found, process a message from it. - for (_, cell) in self.actor_cells.iter_mut() { + // If one is found, process a single message from it. This maintains fairness + // amongst message processing, but may result in large amounts of waste if there + // are a high number of mostly idle actors. + let cells_iter = self.actor_cells.iter_mut(); + for (_, cell) in cells_iter { if !cell.mailbox.is_empty() { let result = cell.mailbox.try_recv(); if let Ok(letter) = result { - trace!("[{}] processing message: {:?}", &cell.address, &letter); - cell.actor.receive( - Context { - address: &cell.address, - runtime_manager: &self.runtime_manager, - executor_channel: &self.command_channel, - parent: &cell.parent, - children: &mut cell.children, - sender: &letter.sender, - }, - letter.payload, - ); + messages_processed += 1; + // If the letter is a system message, perform any necessary pre-processing + // of the message before (potentially) passing it on to the actor. + if letter + .payload + .is_system_message(&crate::message::private::Local::Value) + { + trace!( + "[{}] processing system message: {:?}", + &cell.address, + &letter + ); + // self.process_system_message(letter, cell); + // TODO: Mark the cell as unable to receive/process new messages (???) + // TODO: Send shutdown signals to all children + // TODO: Wait for exit signals from children + } + // For all non-system messages, pass the message directly to the actor + else { + trace!("[{}] processing message: {:?}", &cell.address, &letter); + cell.actor + .receive(context!(self, cell, letter.sender), letter.payload); + } } } } - trace!("nothing to do, sleeping..."); - thread::sleep(Duration::from_millis(SLEEP_DURATION_MS)); + + // Inject a small sleep in the thread executor if a loop resulted in no work being + // performed. Otherwise the executor will spin at 100% CPU usage. + if (messages_processed == 0) { + trace!("nothing to do, sleeping..."); + thread::sleep(Duration::from_millis(SLEEP_DURATION_MS)); + } } self.runtime_manager.notify_shutdown(self.name); diff --git a/src/message/mod.rs b/src/message/mod.rs index be6f280..1cd9549 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -22,8 +22,11 @@ pub trait Message: prost::Message { prost::Message::encoded_len(self) } + /// Returns true if this message is a system message. This method takes a ref + /// to a private Local enum, which makes this callable _only_ from within the + /// busan crate and _not_ implementable outside of it. #[doc(hidden)] - fn is_system_message(&self) -> bool { + fn is_system_message(&self, _local: &private::Local) -> bool { false } } @@ -49,7 +52,9 @@ impl ToMessage for M { */ pub(crate) mod private { #[doc(hidden)] - pub enum Local {} + pub enum Local { + Value, + } #[doc(hidden)] pub trait IsLocal {} impl IsLocal for Local {} diff --git a/src/message/system.rs b/src/message/system.rs index d5824a8..f6fed1d 100644 --- a/src/message/system.rs +++ b/src/message/system.rs @@ -3,7 +3,8 @@ use prost::DecodeError; use std::any::Any; -#[derive(prost::Message)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, prost::Message)] pub struct PoisonPill {} impl super::Message for PoisonPill { @@ -19,7 +20,7 @@ impl super::Message for PoisonPill { prost::Message::merge(self, buf) } - fn is_system_message(&self) -> bool { + fn is_system_message(&self, _local: &super::private::Local) -> bool { true } }