Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/actor/actor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::actor::{ActorAddress, Letter, SenderType};
use crate::executor::ExecutorCommands;
use crate::message::{Message, ToMessage};
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;
use crossbeam_channel::Receiver;
use log::{trace, warn};

Expand Down Expand Up @@ -31,6 +33,14 @@ pub trait Actor: Send {
/// Receive a message. This is the primary method for handling messages and is called
/// for every message received by the actor.
fn receive(&mut self, ctx: Context, msg: Box<dyn Message>);

/// Hook called after all messages have been received and processed and before the actor is
/// removed fom the executor. This is useful for performing any cleanup that requires the
/// actor to be running.
///
/// Any messages sent to this actor after this method is called will be sent to the dead-letter
/// queue.
fn before_shutdown(&mut self, _ctx: Context) {}
}

/// ActorInit defines a method of construction for an actor that takes an initialization
Expand Down Expand Up @@ -95,6 +105,7 @@ macro_rules! debug_serialize_msg {
pub struct Context<'a> {
pub(crate) address: &'a ActorAddress,
pub(crate) runtime_manager: &'a RuntimeManagerRef,
pub(crate) executor_channel: &'a CommandChannel<ExecutorCommands>,
pub(crate) parent: &'a Option<ActorAddress>,
pub(crate) children: &'a mut Vec<ActorAddress>,
pub(crate) sender: &'a SenderType,
Expand Down Expand Up @@ -190,4 +201,10 @@ impl Context<'_> {
pub fn parent(&self) -> Option<&ActorAddress> {
self.parent.as_ref()
}

pub fn shutdown(&self) {
self.executor_channel
.send(ExecutorCommands::ShutdownActor(self.address.clone()))
.unwrap();
}
}
5 changes: 4 additions & 1 deletion src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

pub(crate) mod thread_executor;

use crate::actor::ActorCell;
use crate::actor::{ActorAddress, ActorCell};
use crate::config::ExecutorType;
use crate::system::RuntimeManagerRef;
use crate::util::CommandChannel;

pub enum ExecutorCommands {
AssignActor(ActorCell),
ShutdownActor(ActorAddress),
Shutdown,
}

Expand All @@ -24,6 +25,8 @@ pub trait ExecutorFactory {
) -> ExecutorHandle;
}

/// Executor represents a runtime responsible for processing messages on actors and handling
/// messages received from the command channel.
pub trait Executor {
fn run(self);
}
Expand Down
132 changes: 104 additions & 28 deletions src/executor/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::collections::HashMap;
use std::thread;
use std::time::Duration;

use crate::actor::{ActorCell, Context, SenderType, Uri};
use crate::actor::{ActorCell, Context, Letter, SenderType, Uri};
use crate::executor::{
CommandChannel, Executor, ExecutorCommands, ExecutorFactory, ExecutorHandle,
};
use crate::message::system::PoisonPill;
use crate::system::RuntimeManagerRef;

pub struct ThreadExecutorFactory {}
Expand All @@ -23,6 +24,24 @@ impl ExecutorFactory for ThreadExecutorFactory {
}
}

// Macro for quickly constructing a context object within the thread executor. The construction
// of the context almost always looks the same, just some slight differences with the sender.
macro_rules! context {
($self:tt, $cell:tt, $sender:path) => {
context!($self, $cell, ($sender))
};
($self:tt, $cell:tt, $sender:expr) => {
Context {
address: &$cell.address,
runtime_manager: &$self.runtime_manager,
executor_channel: &$self.command_channel,
parent: &$cell.parent,
children: &mut $cell.children,
sender: &$sender,
}
};
}

/// thread-local executor responsible for processing messages on actors
struct ThreadExecutor {
// Name of the executor, which is part of the address to actors and used
Expand Down Expand Up @@ -63,55 +82,112 @@ impl ThreadExecutor {
panic!("Actor name {} already exists", address);
}
}

/// Process a system message for a given actor.
///
/// System messages have special handling logic that that involves the executor, so they
/// can't be passed on to the actor in the traditional fashion. Depending on the message,
/// they still may be forwarded to the actor.
fn process_system_message(letter: Letter, cell: &mut ActorCell, context: Context) {
if let Some(_) = letter.payload.as_any().downcast_ref::<PoisonPill>() {
trace!(
"received poison pill for {}. Calling shutdown hook",
&cell.address.uri
);
cell.actor.before_shutdown(context);
// TODO: Signal shutdown to the execute
} else {
// System messages should always be known as only messages defined by the crate
// can be system messages *and* system messages cannot be sent remotely.
panic!("Unknown system message type: {:?}", letter.payload);
}
}

fn shutdown_actor(&mut self, uri: &Uri) {
if let Some(mut cell) = self.actor_cells.remove(&uri) {
trace!("calling before_shutdown for actor {}", &uri);
// TODO: Send the remaining messages in the mailbox to the dead letter queue
// TODO: Figure out how to redirect all Sender<Letter> handles to the dead letter queue
cell.actor
.before_shutdown(context!(self, cell, SenderType::System));
}
}

fn assign_actor(&mut self, mut cell: ActorCell) {
debug!("received actor assignment for {}", &cell.address.uri);
self.assert_unique_address(&cell.address.uri);
trace!("calling before_start for actor {}", &cell.address.uri);
cell.actor
.before_start(context!(self, cell, SenderType::System));
self.actor_cells.insert(cell.address.uri.clone(), cell);
}
}

impl Executor for ThreadExecutor {
fn run(mut self) {
const SLEEP_DURATION_MS: u64 = 1;

loop {
// Handle executor commands before handling any actor messages. Generally it is expected
// to have very few of these per loop.
if !self.command_channel.recv_is_empty() {
match self.command_channel.recv().unwrap() {
ExecutorCommands::AssignActor(mut cell) => {
debug!("received actor assignment for {}", &cell.address.uri);
self.assert_unique_address(&cell.address.uri);
trace!("calling before_start for actor {}", &cell.address.uri);
cell.actor.before_start(Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
parent: &cell.parent,
children: &mut cell.children,
sender: &SenderType::System,
});
self.actor_cells.insert(cell.address.uri.clone(), cell);
ExecutorCommands::AssignActor(cell) => {
self.assign_actor(cell);
}
ExecutorCommands::ShutdownActor(address) => {
self.shutdown_actor(&address.uri);
}
ExecutorCommands::Shutdown => {
info!("received shutdown command");
// Break to exit the main 'loop' in the run function
break;
}
}
}
let mut messages_processed = 0;
// Iterate over the actor-cells and check if there are any non-empty mailboxes.
// If one is found, process a message from it.
for (_, cell) in self.actor_cells.iter_mut() {
// If one is found, process a single message from it. This maintains fairness
// amongst message processing, but may result in large amounts of waste if there
// are a high number of mostly idle actors.
let cells_iter = self.actor_cells.iter_mut();
for (_, cell) in cells_iter {
if !cell.mailbox.is_empty() {
let result = cell.mailbox.try_recv();
if let Ok(letter) = result {
trace!("[{}] processing message: {:?}", &cell.address, &letter);
cell.actor.receive(
Context {
address: &cell.address,
runtime_manager: &self.runtime_manager,
parent: &cell.parent,
children: &mut cell.children,
sender: &letter.sender,
},
letter.payload,
);
messages_processed += 1;
// If the letter is a system message, perform any necessary pre-processing
// of the message before (potentially) passing it on to the actor.
if letter
.payload
.is_system_message(&crate::message::private::Local::Value)
{
trace!(
"[{}] processing system message: {:?}",
&cell.address,
&letter
);
// self.process_system_message(letter, cell);
// TODO: Mark the cell as unable to receive/process new messages (???)
// TODO: Send shutdown signals to all children
// TODO: Wait for exit signals from children
}
// For all non-system messages, pass the message directly to the actor
else {
trace!("[{}] processing message: {:?}", &cell.address, &letter);
cell.actor
.receive(context!(self, cell, letter.sender), letter.payload);
}
}
}
}
trace!("nothing to do, sleeping...");
thread::sleep(Duration::from_millis(SLEEP_DURATION_MS));

// Inject a small sleep in the thread executor if a loop resulted in no work being
// performed. Otherwise the executor will spin at 100% CPU usage.
if (messages_processed == 0) {
trace!("nothing to do, sleeping...");
thread::sleep(Duration::from_millis(SLEEP_DURATION_MS));
}
}

self.runtime_manager.notify_shutdown(self.name);
Expand Down
13 changes: 12 additions & 1 deletion src/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Core message types used by Busan and primitive type wrappers

pub mod common_types;
pub mod system;

pub trait Message: prost::Message {
fn as_any(&self) -> &dyn std::any::Any;
Expand All @@ -20,6 +21,14 @@ pub trait Message: prost::Message {
fn encoded_len(&self) -> usize {
prost::Message::encoded_len(self)
}

/// Returns true if this message is a system message. This method takes a ref
/// to a private Local enum, which makes this callable _only_ from within the
/// busan crate and _not_ implementable outside of it.
#[doc(hidden)]
fn is_system_message(&self, _local: &private::Local) -> bool {
false
}
}

pub trait ToMessage<M: Message> {
Expand All @@ -43,7 +52,9 @@ impl<M: Message> ToMessage<M> for M {
*/
pub(crate) mod private {
#[doc(hidden)]
pub enum Local {}
pub enum Local {
Value,
}
#[doc(hidden)]
pub trait IsLocal {}
impl IsLocal for Local {}
Expand Down
26 changes: 26 additions & 0 deletions src/message/system.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! System messages that have special properties

use prost::DecodeError;
use std::any::Any;

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, prost::Message)]
pub struct PoisonPill {}

impl super::Message for PoisonPill {
fn as_any(&self) -> &dyn Any {
return self;
}

fn encode_to_vec2(&self) -> Vec<u8> {
prost::Message::encode_to_vec(self)
}

fn merge2(&mut self, buf: &[u8]) -> Result<(), DecodeError> {
prost::Message::merge(self, buf)
}

fn is_system_message(&self, _local: &super::private::Local) -> bool {
true
}
}