Skip to content

Commit

Permalink
Support creating differently-typed recipients in Addr::recipient()
Browse files Browse the repository at this point in the history
...within bounds that that M: Into<N> for Recipient<M> for Actor<Message = N>.

Fixes #11 (in its narrow sense) without creating multiple channels per actor
and without boxing the messages transferred.

Has been made possible by removal of the ability to retrieve back the
failed-to-send message in one of the earlier commits.

The whole trick is to box crossbeam `Sender<M>` into `Arc<dyn SenderTrait<M>>`
in `Receiver`, and implementing `SenderTrait<M>` for crossbeam `Sender` and for
boxed version of itself (!), second time with ability to convert `M`.

All generic parameters in the echo example have disappeared without losing any
flexibility, yay!

Speaking strict semver, this is an API-breaking change, but e.g. the
media_pipeline compiles and works unchanged.

v2: make the change smaller and simpler by not messing up with
`GenericReceiver`. Enables nicer API and keeping `SenderTrait` private, at the
small expense of one extra boxing when creating `Addr`, and one extra pointer
indirection when calling send() family of functions.
All that thanks to @skywhale's clever question.
  • Loading branch information
strohel committed Feb 26, 2021
1 parent d65bacf commit c17a353
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 30 deletions.
34 changes: 17 additions & 17 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ fn silence_chunk() -> Chunk {
}

/// Actor to read and decode input stream (stdin) and produce sound [`DryChunk`]s.
struct Input<M> {
next: Recipient<M>,
struct Input {
next: Recipient<DryChunk>,
}

impl<M: From<DryChunk>> Actor for Input<M> {
impl Actor for Input {
type Error = Error;
type Message = ReadNext;

Expand Down Expand Up @@ -147,15 +147,15 @@ impl From<WetChunk> for MixerInput {

/// Audio mixer actor. Mixes 2 inputs (dry, wet) together, provides 2 equal outputs.
/// Consumer either [`DryChunk`]s or [`WetChunk`]s and produces [`Chunk`]s.
struct Mixer<M1, M2> {
out_1: Recipient<M1>,
out_2: Recipient<M2>,
struct Mixer {
out_1: Recipient<Chunk>,
out_2: Recipient<Chunk>,
dry_buffer: Option<DryChunk>,
wet_buffer: Option<WetChunk>,
}

impl<M1, M2> Mixer<M1, M2> {
fn new(out_1: Recipient<M1>, out_2: Recipient<M2>) -> Self {
impl Mixer {
fn new(out_1: Recipient<Chunk>, out_2: Recipient<Chunk>) -> Self {
// Start with buffers filled, so that output is produced right for the first message.
Self {
out_1,
Expand All @@ -166,7 +166,7 @@ impl<M1, M2> Mixer<M1, M2> {
}
}

impl<M1: From<Chunk>, M2: From<Chunk>> Actor for Mixer<M1, M2> {
impl Actor for Mixer {
type Error = Error;
type Message = MixerInput;

Expand Down Expand Up @@ -203,20 +203,20 @@ impl<M1: From<Chunk>, M2: From<Chunk>> Actor for Mixer<M1, M2> {

/// Delay audio effect actor. Technically just a fixed circular buffer.
/// Consumes [`Chunk`]s and produces [`WetChunk`]s.
struct Delay<M> {
next: Recipient<M>,
struct Delay {
next: Recipient<WetChunk>,
buffer: Vec<Chunk>,
index: usize,
}

impl<M> Delay<M> {
fn new(next: Recipient<M>) -> Self {
impl Delay {
fn new(next: Recipient<WetChunk>) -> Self {
let buffer: Vec<Chunk> = repeat(silence_chunk()).take(DELAY_CHUNKS).collect();
Self { next, buffer, index: 0 }
}
}

impl<M: From<WetChunk>> Actor for Delay<M> {
impl Actor for Delay {
type Error = Error;
type Message = Chunk;

Expand All @@ -236,11 +236,11 @@ impl<M: From<WetChunk>> Actor for Delay<M> {
}

/// Audio damper actor. Attenuates audio level a bit. Consumes [`Chunk`]s and produces [`WetChunk`]s.
struct Damper<M> {
next: Recipient<M>,
struct Damper {
next: Recipient<WetChunk>,
}

impl<M: From<WetChunk>> Actor for Damper<M> {
impl Actor for Damper {
type Error = Error;
type Message = Chunk;

Expand Down
74 changes: 61 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ pub enum Control {
/// The base actor trait.
pub trait Actor {
/// The expected type of a message to be received.
type Message: Send;
// 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715
type Message: Send + 'static;
/// The type to return on error in the handle method.
type Error: std::fmt::Debug;

Expand Down Expand Up @@ -610,6 +611,7 @@ impl<A: Actor> Addr<A> {
let (message_tx, message_rx) = channel::bounded::<A::Message>(capacity);
let (control_tx, control_rx) = channel::bounded(MAX_CHANNEL_BLOAT);

let message_tx = Arc::new(message_tx);
Self {
recipient: Recipient { actor_name: A::name().into(), message_tx, control_tx },
message_rx,
Expand All @@ -619,16 +621,22 @@ impl<A: Actor> Addr<A> {

/// "Genericize" an address to, rather than point to a specific actor,
/// be applicable to any actor that handles a given message-response type.
pub fn recipient(&self) -> Recipient<A::Message> {
self.recipient.clone()
/// Allows you to create recipient not only of `A::Message`, but of any `M: Into<A::Message>`.
pub fn recipient<M: Into<A::Message>>(&self) -> Recipient<M> {
Recipient {
actor_name: A::name().into(),
// Each level of boxing adds one .into() call, so box here to convert A::Message to M.
message_tx: Arc::new(self.recipient.message_tx.clone()),
control_tx: self.recipient.control_tx.clone(),
}
}
}

/// Similar to `Addr`, but rather than pointing to a specific actor,
/// Similar to [`Addr`], but rather than pointing to a specific actor,
/// it is typed for any actor that handles a given message-response type.
pub struct Recipient<M> {
actor_name: String,
message_tx: Sender<M>,
message_tx: Arc<dyn SenderTrait<M>>,
control_tx: Sender<Control>,
}

Expand All @@ -647,15 +655,15 @@ impl<M> Clone for Recipient<M> {
impl<M> Recipient<M> {
/// Non-blocking call to send a message. Use this if you need to react when
/// the channel is full.
pub fn try_send<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
self.message_tx.try_send(message.into()).map_err(SendError::from)
pub fn try_send(&self, message: M) -> Result<(), SendError> {
self.message_tx.try_send(message).map_err(SendError::from)
}

/// Non-blocking call to send a message. Use this if there is nothing you can
/// do when the channel is full. The method still logs a warning for you in
/// that case.
pub fn send<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
let result = self.try_send(message.into());
pub fn send(&self, message: M) -> Result<(), SendError> {
let result = self.try_send(message);
if let Err(SendError::Full) = &result {
trace!("[{}] dropped message (channel bloat)", self.actor_name);
return Ok(());
Expand All @@ -665,14 +673,14 @@ impl<M> Recipient<M> {

/// Non-blocking call to send a message. Use this if you do not care if
/// messages are being dropped.
pub fn send_quiet<N: Into<M>>(&self, message: N) {
let _ = self.try_send(message.into());
pub fn send_quiet(&self, message: M) {
let _ = self.try_send(message);
}

/// Non-blocking call to send a message. Use if you expect the channel to be
/// frequently full (slow consumer), but would still like to be notified if a
/// different error occurs (e.g. disconnection).
pub fn send_if_not_full<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
pub fn send_if_not_full(&self, message: M) -> Result<(), SendError> {
if self.remaining_capacity().unwrap_or(usize::max_value()) > 1 {
return self.send(message);
}
Expand All @@ -686,14 +694,54 @@ impl<M> Recipient<M> {

// TODO(ryo): Properly support the concept of priority channels.
pub fn remaining_capacity(&self) -> Option<usize> {
self.message_tx.capacity().map(|capacity| capacity - self.message_tx.len())
let message_tx = &self.message_tx as &dyn SenderTrait<M>;
message_tx.capacity().map(|capacity| capacity - message_tx.len())
}

pub fn control_addr(&self) -> ControlAddr {
ControlAddr { control_tx: self.control_tx.clone() }
}
}

/// Internal trait to generalize over [`Sender`].
trait SenderTrait<M>: Send + Sync {
fn try_send(&self, message: M) -> Result<(), SendError>;

fn len(&self) -> usize;

fn capacity(&self) -> Option<usize>;
}

/// [`SenderTrait`] is implemented for concrete crossbeam [`Sender`].
impl<M: Send> SenderTrait<M> for Sender<M> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.try_send(message).map_err(SendError::from)
}

fn len(&self) -> usize {
self.len()
}

fn capacity(&self) -> Option<usize> {
self.capacity()
}
}

/// [`SenderTrait`] is also implemented for boxed version of itself, incluling M -> N conversion.
impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.deref().try_send(message.into())
}

fn len(&self) -> usize {
self.deref().len()
}

fn capacity(&self) -> Option<usize> {
self.deref().capacity()
}
}

/// An address to an actor that can *only* handle lifecycle control.
#[derive(Clone)]
pub struct ControlAddr {
Expand Down

0 comments on commit c17a353

Please sign in to comment.