Skip to content

Commit

Permalink
Support creating differently-typed recipients in Addr::recipient()
Browse files Browse the repository at this point in the history
...within bounds that that M: Into<N> for Recipient<M> for Actor<Message = N>.

Fixes #11 (in its narrow sense) without creating multiple channels per actor
and without boxing the messages transferred.

Has been made possible by removal of the ability to retrieve back the
failed-to-send message in one of the earlier commits.

The whole trick is in defining a `Sender<M>` trait to abstract crossbeam::Sender,
implementing `<M: Into<N>> Sender<M>` for `crossbeam::Sender<N>`, and boxing
*that* implementation in Addr::recipient().

All generic parameters in the echo example have disappeared without losing any
flexibility, yay!

Speaking strict semver, this is an API-breaking change, but e.g. the
media_pipeline compiles and works unchanged.

There was a lot of design choices with subtle differences (like making
Recipient a trait), this change preferred ones that meant a smaller change.
  • Loading branch information
strohel committed Feb 25, 2021
1 parent b9231fd commit d4acd2e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 45 deletions.
34 changes: 17 additions & 17 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ fn silence_chunk() -> Chunk {
}

/// Actor to read and decode input stream (stdin) and produce sound [`DryChunk`]s.
struct Input<M> {
next: Recipient<M>,
struct Input {
next: Recipient<DryChunk>,
}

impl<M: From<DryChunk>> Actor for Input<M> {
impl Actor for Input {
type Error = Error;
type Message = ReadNext;

Expand Down Expand Up @@ -147,15 +147,15 @@ impl From<WetChunk> for MixerInput {

/// Audio mixer actor. Mixes 2 inputs (dry, wet) together, provides 2 equal outputs.
/// Consumer either [`DryChunk`]s or [`WetChunk`]s and produces [`Chunk`]s.
struct Mixer<M1, M2> {
out_1: Recipient<M1>,
out_2: Recipient<M2>,
struct Mixer {
out_1: Recipient<Chunk>,
out_2: Recipient<Chunk>,
dry_buffer: Option<DryChunk>,
wet_buffer: Option<WetChunk>,
}

impl<M1, M2> Mixer<M1, M2> {
fn new(out_1: Recipient<M1>, out_2: Recipient<M2>) -> Self {
impl Mixer {
fn new(out_1: Recipient<Chunk>, out_2: Recipient<Chunk>) -> Self {
// Start with buffers filled, so that output is produced right for the first message.
Self {
out_1,
Expand All @@ -166,7 +166,7 @@ impl<M1, M2> Mixer<M1, M2> {
}
}

impl<M1: From<Chunk>, M2: From<Chunk>> Actor for Mixer<M1, M2> {
impl Actor for Mixer {
type Error = Error;
type Message = MixerInput;

Expand Down Expand Up @@ -203,20 +203,20 @@ impl<M1: From<Chunk>, M2: From<Chunk>> Actor for Mixer<M1, M2> {

/// Delay audio effect actor. Technically just a fixed circular buffer.
/// Consumes [`Chunk`]s and produces [`WetChunk`]s.
struct Delay<M> {
next: Recipient<M>,
struct Delay {
next: Recipient<WetChunk>,
buffer: Vec<Chunk>,
index: usize,
}

impl<M> Delay<M> {
fn new(next: Recipient<M>) -> Self {
impl Delay {
fn new(next: Recipient<WetChunk>) -> Self {
let buffer: Vec<Chunk> = repeat(silence_chunk()).take(DELAY_CHUNKS).collect();
Self { next, buffer, index: 0 }
}
}

impl<M: From<WetChunk>> Actor for Delay<M> {
impl Actor for Delay {
type Error = Error;
type Message = Chunk;

Expand All @@ -236,11 +236,11 @@ impl<M: From<WetChunk>> Actor for Delay<M> {
}

/// Audio damper actor. Attenuates audio level a bit. Consumes [`Chunk`]s and produces [`WetChunk`]s.
struct Damper<M> {
next: Recipient<M>,
struct Damper {
next: Recipient<WetChunk>,
}

impl<M: From<WetChunk>> Actor for Damper<M> {
impl Actor for Damper {
type Error = Error;
type Message = Chunk;

Expand Down
99 changes: 71 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//! ```
//!
use crossbeam::channel::{self, select, Receiver, Sender};
use crossbeam::channel::{self, select, Receiver};
use log::*;
use parking_lot::{Mutex, RwLock};
use std::{fmt, ops::Deref, sync::Arc, thread, time::Duration};
Expand Down Expand Up @@ -571,7 +571,7 @@ pub enum ErroredResult {
}

pub struct Addr<A: Actor + ?Sized> {
recipient: Recipient<A::Message>,
recipient: StaticRecipient<A::Message>,
message_rx: Receiver<A::Message>,
control_rx: Receiver<Control>,
}
Expand All @@ -596,7 +596,7 @@ impl<A, M> Deref for Addr<A>
where
A: Actor<Message = M>,
{
type Target = Recipient<M>;
type Target = StaticRecipient<M>;

fn deref(&self) -> &Self::Target {
&self.recipient
Expand All @@ -609,51 +609,88 @@ impl<A: Actor> Addr<A> {
let (control_tx, control_rx) = channel::bounded(MAX_CHANNEL_BLOAT);

Self {
recipient: Recipient { actor_name: A::name().into(), message_tx, control_tx },
recipient: GenericRecipient { actor_name: A::name().into(), message_tx, control_tx },
message_rx,
control_rx,
}
}

/// "Genericize" an address to, rather than point to a specific actor,
/// be applicable to any actor that handles a given message-response type.
pub fn recipient(&self) -> Recipient<A::Message> {
self.recipient.clone()
///
/// Allows you to create recipient not only of `A::Message`, but of any `M: Into<A::Message>`.
pub fn recipient<M: Into<A::Message>>(&self) -> Recipient<M>
where
// https://stackoverflow.com/q/29740488/4345715, 'static required to create trait object
A::Message: 'static,
{
Recipient {
actor_name: A::name().into(),
message_tx: Arc::new(self.recipient.message_tx.clone()),
control_tx: self.recipient.control_tx.clone(),
}
}
}

/// Similar to `Addr`, but rather than pointing to a specific actor,
/// A variant of [`GenericRecipient`] that can send messages of type `M` to actors accepting
/// `N: From<M>`. Created by [`Addr::recipient()`].
pub type Recipient<M> = GenericRecipient<Arc<dyn Sender<M>>>;

/// A variant of [`GenericRecipient`] created by dereferencing [`Addr`].
///
/// `M` is fixed to the message type of the receiving actor, but it can perform conversion
/// from `N: Into<M>` in the [`GenericRecipient::send()`] family of methods.
/// See [`Addr::recipient()`] if you need more flexibility in `M`.
pub type StaticRecipient<M> = GenericRecipient<channel::Sender<M>>;

/// Similar to [`Addr`], but rather than pointing to a specific actor,
/// it is typed for any actor that handles a given message-response type.
pub struct Recipient<M> {
///
/// Comes in [`Recipient`] and [`StaticRecipient`] variants.
#[derive(Clone)]
pub struct GenericRecipient<S> {
actor_name: String,
message_tx: Sender<M>,
control_tx: Sender<Control>,
message_tx: S,
control_tx: channel::Sender<Control>,
}

// #[derive(Clone)] adds Clone bound to M, which is not necessary.
// https://github.com/rust-lang/rust/issues/26925
impl<M> Clone for Recipient<M> {
fn clone(&self) -> Self {
Self {
actor_name: self.actor_name.clone(),
message_tx: self.message_tx.clone(),
control_tx: self.control_tx.clone(),
}
/// Internal trait to generalize over crossbeam::channel::Sender<M>.
pub trait Sender<M>: Send + Sync {
fn try_send(&self, message: M) -> Result<(), SendError>;
}

/// [`Sender`] trait is implemented for concrete crossbeam sender.
impl<M: Into<N>, N: Send> Sender<M> for channel::Sender<N> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.try_send(message.into()).map_err(SendError::from)
}
}

/// [`Sender`] trait is also implemented for type-erased (dynamic) Arc-wrapped Sender.
impl<M> Sender<M> for Arc<dyn Sender<M>> {
fn try_send(&self, message: M) -> Result<(), SendError> {
self.deref().try_send(message)
}
}

impl<M> Recipient<M> {
impl<S> GenericRecipient<S> {
/// Non-blocking call to send a message. Use this if you need to react when
/// the channel is full.
pub fn try_send<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
self.message_tx.try_send(message.into()).map_err(SendError::from)
pub fn try_send<M>(&self, message: M) -> Result<(), SendError>
where
S: Sender<M>,
{
self.message_tx.try_send(message).map_err(SendError::from)
}

/// Non-blocking call to send a message. Use this if there is nothing you can
/// do when the channel is full. The method still logs a warning for you in
/// that case.
pub fn send<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
let result = self.try_send(message.into());
pub fn send<M>(&self, message: M) -> Result<(), SendError>
where
S: Sender<M>,
{
let result = self.try_send(message);
if let Err(SendError::Full) = &result {
trace!("[{}] dropped message (channel bloat)", self.actor_name);
return Ok(());
Expand All @@ -663,14 +700,20 @@ impl<M> Recipient<M> {

/// Non-blocking call to send a message. Use this if you do not care if
/// messages are being dropped.
pub fn send_quiet<N: Into<M>>(&self, message: N) {
let _ = self.try_send(message.into());
pub fn send_quiet<M>(&self, message: M)
where
S: Sender<M>,
{
let _ = self.try_send(message);
}

/// Non-blocking call to send a message. Use if you expect the channel to be
/// frequently full (slow consumer), but would still like to be notified if a
/// different error occurs (e.g. disconnection).
pub fn send_if_not_full<N: Into<M>>(&self, message: N) -> Result<(), SendError> {
pub fn send_if_not_full<M>(&self, message: M) -> Result<(), SendError>
where
S: Sender<M>,
{
self.try_send(message).or_else(|err| match err {
SendError::Full => Ok(()),
other => Err(other),
Expand All @@ -689,7 +732,7 @@ impl<M> Recipient<M> {
/// An address to an actor that can *only* handle lifecycle control.
#[derive(Clone)]
pub struct ControlAddr {
control_tx: Sender<Control>,
control_tx: channel::Sender<Control>,
}

impl ControlAddr {
Expand Down

0 comments on commit d4acd2e

Please sign in to comment.