Skip to content

Commit

Permalink
First version of the system spawn builder pattern (#41)
Browse files Browse the repository at this point in the history
* First version of the system spawn builder pattern

* Rename 'run_on_main' to 'block_on'

* Add documentation for the 'SpawnBuilder' API

* 'block_on' was not quite the right name, use 'run' instead

* Rename 'run' to 'run_and_block'
  • Loading branch information
bschwind authored Feb 26, 2021
1 parent 88670a6 commit cdbc5e8
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 42 deletions.
2 changes: 1 addition & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn make_chain(num_actors: usize) -> (System, Addr<ChainLink>, Addr<ChainLink>) {

fn run_chain((mut system, start, end): (System, Addr<ChainLink>, Addr<ChainLink>)) {
start.send(true).unwrap();
system.run_on_main(ChainLink { next: None }, end).unwrap();
system.prepare(ChainLink { next: None }).with_addr(end).run_and_block().unwrap();
}

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
10 changes: 5 additions & 5 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ fn main() -> Result<(), Error> {

// Start creating actors. Because actors "point forward", start with the last one.
// Set larger message channel capacity for Output actor for some cushion.
let output_addr = system.spawn_with_capacity(Output, 60)?;
let output_addr = system.prepare(Output).with_capacity(60).spawn()?;

// Create Mixer address explicitly in order to break the circular dependency loop.
let mixer_addr = Addr::<Mixer>::default();
Expand All @@ -237,10 +237,10 @@ fn main() -> Result<(), Error> {
let delay_addr = system.spawn(Delay::new(damper_addr.recipient()))?;

// We can finally spawn the Mixer. Feeds into Output and Delay effect.
system.spawn_fn_with_addr(
move || Mixer::new(output_addr.recipient(), delay_addr.recipient()),
mixer_addr.clone(),
)?;
system
.prepare_fn(move || Mixer::new(output_addr.recipient(), delay_addr.recipient()))
.with_addr(mixer_addr.clone())
.spawn()?;

// Input feeds into Mixer.
let input_addr = system.spawn(Input { next: mixer_addr.recipient() })?;
Expand Down
2 changes: 1 addition & 1 deletion examples/media_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ fn main() -> Result<(), Error> {
// The display actor may spawn an OS window which in some cases must run
// on the main application thread.
let display_actor = VideoDisplayActor::new();
system.run_on_main(display_actor, display_addr)?;
system.prepare(display_actor).with_addr(display_addr).run_and_block()?;

Ok(())
}
97 changes: 62 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,51 @@ pub struct Context<A: Actor + ?Sized> {
pub myself: Addr<A>,
}

/// A builder for specifying how to spawn an [`Actor`].
/// You can specify your own [`Addr`] for the Actor,
/// the capacity of the Actor's inbox, and you can specify
/// whether to spawn the Actor into its own thread or block
/// on the current calling thread.
#[must_use = "You must call .spawn() or .block_on() to run this actor"]
pub struct SpawnBuilder<'a, A: Actor> {
system: &'a mut System,
capacity: Option<usize>,
addr: Option<Addr<A>>,
factory: Box<dyn FnOnce() -> A + Send + 'static>,
}

impl<'a, A: 'static + Actor> SpawnBuilder<'a, A> {
/// Specify a capacity for the actor's receiving channel.
pub fn with_capacity(self, capacity: usize) -> Self {
Self { capacity: Some(capacity), ..self }
}

/// Specify an existing [`Addr`] to use with this Actor.
pub fn with_addr(self, addr: Addr<A>) -> Self {
Self { addr: Some(addr), ..self }
}

/// Spawn this Actor into a new thread managed by the [`System`].
pub fn spawn(self) -> Result<Addr<A>, ActorError> {
let factory = self.factory;
let capacity = self.capacity.unwrap_or(MAX_CHANNEL_BLOAT);
let addr = self.addr.unwrap_or_else(|| Addr::with_capacity(capacity));

self.system.spawn_fn_with_addr(factory, addr.clone()).map(move |_| addr)
}

/// Run this Actor on the current calling thread. This is a
/// blocking call. This function will exit when the Actor
/// has stopped.
pub fn run_and_block(self) -> Result<(), ActorError> {
let factory = self.factory;
let capacity = self.capacity.unwrap_or(MAX_CHANNEL_BLOAT);
let addr = self.addr.unwrap_or_else(|| Addr::with_capacity(capacity));

self.system.block_on(factory(), addr)
}
}

impl System {
/// Creates a new System with a given name.
pub fn new(name: &str) -> Self {
Expand All @@ -159,59 +204,41 @@ impl System {
}
}

/// Spawn a normal [`Actor`] in the system.
pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
/// Prepare an actor to be spawned. Returns a [`SpawnBuilder`]
/// which can be used to customize the spawning of the actor.
pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilder<A>
where
A: Actor + Send + 'static,
{
self.spawn_fn(move || actor)
SpawnBuilder { system: self, capacity: None, addr: None, factory: Box::new(move || actor) }
}

/// Spawn a normal [`Actor`] in the system, with non-default capacity for its input channel.
pub fn spawn_with_capacity<A>(
&mut self,
actor: A,
capacity: usize,
) -> Result<Addr<A>, ActorError>
/// Similar to `prepare`, but an actor factory is passed instead
/// of an [`Actor`] itself. This is used when an actor needs to be
/// created on its own thread instead of the calling thread.
/// Returns a [`SpawnBuilder`] which can be used to customize the
/// spawning of the actor.
pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilder<A>
where
A: Actor + Send + 'static,
{
self.spawn_fn_with_capacity(move || actor, capacity)
}

/// Spawn a normal Actor in the system, using a factory that produces an [`Actor`].
///
/// This method is useful if your actor does not implement [`Send`], since it can create
/// the struct directly within the thread.
pub fn spawn_fn<F, A>(&mut self, factory: F) -> Result<Addr<A>, ActorError>
where
F: FnOnce() -> A + Send + 'static,
A: Actor + 'static,
{
self.spawn_fn_with_capacity(factory, MAX_CHANNEL_BLOAT)
SpawnBuilder { system: self, capacity: None, addr: None, factory: Box::new(factory) }
}

/// Spawn a normal Actor in the system, using a factory that produces an [`Actor`],
/// with non-default capacity for its input channel. See [`System::spawn_fn()`].
pub fn spawn_fn_with_capacity<F, A>(
&mut self,
factory: F,
capacity: usize,
) -> Result<Addr<A>, ActorError>
/// Spawn a normal [`Actor`] in the system, returning its address when successful.
pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
where
F: FnOnce() -> A + Send + 'static,
A: Actor + 'static,
A: Actor + Send + 'static,
{
let addr = Addr::<A>::with_capacity(capacity);
self.spawn_fn_with_addr(factory, addr.clone())?;
Ok(addr)
self.prepare(actor).spawn()
}

/// Spawn a normal Actor in the system, using a factory that produces an [`Actor`],
/// and an address that will be assigned to the Actor.
///
/// This method is useful if you need to model circular dependencies between `Actor`s.
pub fn spawn_fn_with_addr<F, A>(&mut self, factory: F, addr: Addr<A>) -> Result<(), ActorError>
fn spawn_fn_with_addr<F, A>(&mut self, factory: F, addr: Addr<A>) -> Result<(), ActorError>
where
F: FnOnce() -> A + Send + 'static,
A: Actor + 'static,
Expand Down Expand Up @@ -267,7 +294,7 @@ impl System {

/// Takes an actor and its address and runs it on the calling thread. This function
/// will exit once the actor has stopped.
pub fn run_on_main<A>(&mut self, mut actor: A, addr: Addr<A>) -> Result<(), ActorError>
fn block_on<A>(&mut self, mut actor: A, addr: Addr<A>) -> Result<(), ActorError>
where
A: Actor,
{
Expand Down

0 comments on commit cdbc5e8

Please sign in to comment.