From 1418240ba3030ead1a125f9bde1a675eb464a9c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Laitl?= Date: Wed, 24 Feb 2021 22:09:45 +0100 Subject: [PATCH] Support creating differently-typed recipients in Addr::recipient() ...within bounds that that M: Into for Recipient for Actor. Fixes #11 (in its narrow sense) without creating multiple channels per actor and without boxing the messages transferred. Has been made possible by removal of the ability to retrieve back the failed-to-send message in one of the earlier commits. The whole trick is to box crossbeam `Sender` into `Arc>` in `Receiver`, and implementing `SenderTrait` for crossbeam `Sender` and for boxed version of itself (!), second time with ability to convert `M`. All generic parameters in the echo example have disappeared without losing any flexibility, yay! Speaking strict semver, this is an API-breaking change, but e.g. the media_pipeline compiles and works unchanged. v2: make the change smaller and simpler by not messing up with `GenericReceiver`. Enables nicer API and keeping `SenderTrait` private, at the small expense of one extra boxing when creating `Addr`, and one extra pointer indirection when calling send() family of functions. All that thanks to @skywhale's clever question. --- examples/echo.rs | 34 +++++++++++----------- src/lib.rs | 75 +++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 30 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index c064675..53d08c5 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -71,11 +71,11 @@ fn silence_chunk() -> Chunk { } /// Actor to read and decode input stream (stdin) and produce sound [`DryChunk`]s. -struct Input { - next: Recipient, +struct Input { + next: Recipient, } -impl> Actor for Input { +impl Actor for Input { type Error = Error; type Message = ReadNext; @@ -147,15 +147,15 @@ impl From for MixerInput { /// Audio mixer actor. Mixes 2 inputs (dry, wet) together, provides 2 equal outputs. /// Consumer either [`DryChunk`]s or [`WetChunk`]s and produces [`Chunk`]s. -struct Mixer { - out_1: Recipient, - out_2: Recipient, +struct Mixer { + out_1: Recipient, + out_2: Recipient, dry_buffer: Option, wet_buffer: Option, } -impl Mixer { - fn new(out_1: Recipient, out_2: Recipient) -> Self { +impl Mixer { + fn new(out_1: Recipient, out_2: Recipient) -> Self { // Start with buffers filled, so that output is produced right for the first message. Self { out_1, @@ -166,7 +166,7 @@ impl Mixer { } } -impl, M2: From> Actor for Mixer { +impl Actor for Mixer { type Error = Error; type Message = MixerInput; @@ -203,20 +203,20 @@ impl, M2: From> Actor for Mixer { /// Delay audio effect actor. Technically just a fixed circular buffer. /// Consumes [`Chunk`]s and produces [`WetChunk`]s. -struct Delay { - next: Recipient, +struct Delay { + next: Recipient, buffer: Vec, index: usize, } -impl Delay { - fn new(next: Recipient) -> Self { +impl Delay { + fn new(next: Recipient) -> Self { let buffer: Vec = repeat(silence_chunk()).take(DELAY_CHUNKS).collect(); Self { next, buffer, index: 0 } } } -impl> Actor for Delay { +impl Actor for Delay { type Error = Error; type Message = Chunk; @@ -236,11 +236,11 @@ impl> Actor for Delay { } /// Audio damper actor. Attenuates audio level a bit. Consumes [`Chunk`]s and produces [`WetChunk`]s. -struct Damper { - next: Recipient, +struct Damper { + next: Recipient, } -impl> Actor for Damper { +impl Actor for Damper { type Error = Error; type Message = Chunk; diff --git a/src/lib.rs b/src/lib.rs index 269fe83..823a0a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -524,7 +524,8 @@ pub enum Control { /// The base actor trait. pub trait Actor { /// The expected type of a message to be received. - type Message: Send; + // 'static required to create trait object in Addr, https://stackoverflow.com/q/29740488/4345715 + type Message: Send + 'static; /// The type to return on error in the handle method. type Error: std::fmt::Debug; @@ -583,6 +584,7 @@ impl Addr { let (message_tx, message_rx) = channel::bounded::(capacity); let (control_tx, control_rx) = channel::bounded(MAX_CHANNEL_BLOAT); + let message_tx = Arc::new(message_tx); Self { recipient: Recipient { actor_name: A::name().into(), message_tx, control_tx }, message_rx, @@ -592,16 +594,23 @@ impl Addr { /// "Genericize" an address to, rather than point to a specific actor, /// be applicable to any actor that handles a given message-response type. - pub fn recipient(&self) -> Recipient { - self.recipient.clone() + /// + /// Allows you to create recipient not only of `A::Message`, but of any `M: Into`. + pub fn recipient>(&self) -> Recipient { + Recipient { + actor_name: A::name().into(), + // Each level of boxing adds one .into() call, so box here to convert A::Message to M. + message_tx: Arc::new(self.recipient.message_tx.clone()), + control_tx: self.recipient.control_tx.clone(), + } } } -/// Similar to `Addr`, but rather than pointing to a specific actor, +/// Similar to [`Addr`], but rather than pointing to a specific actor, /// it is typed for any actor that handles a given message-response type. pub struct Recipient { actor_name: String, - message_tx: Sender, + message_tx: Arc>, control_tx: Sender, } @@ -620,15 +629,15 @@ impl Clone for Recipient { impl Recipient { /// Non-blocking call to send a message. Use this if you need to react when /// the channel is full. - pub fn try_send>(&self, message: N) -> Result<(), SendError> { - self.message_tx.try_send(message.into()).map_err(SendError::from) + pub fn try_send(&self, message: M) -> Result<(), SendError> { + self.message_tx.try_send(message).map_err(SendError::from) } /// Non-blocking call to send a message. Use this if there is nothing you can /// do when the channel is full. The method still logs a warning for you in /// that case. - pub fn send>(&self, message: N) -> Result<(), SendError> { - let result = self.try_send(message.into()); + pub fn send(&self, message: M) -> Result<(), SendError> { + let result = self.try_send(message); if let Err(SendError::Full) = &result { trace!("[{}] dropped message (channel bloat)", self.actor_name); return Ok(()); @@ -638,14 +647,14 @@ impl Recipient { /// Non-blocking call to send a message. Use this if you do not care if /// messages are being dropped. - pub fn send_quiet>(&self, message: N) { - let _ = self.try_send(message.into()); + pub fn send_quiet(&self, message: M) { + let _ = self.try_send(message); } /// Non-blocking call to send a message. Use if you expect the channel to be /// frequently full (slow consumer), but would still like to be notified if a /// different error occurs (e.g. disconnection). - pub fn send_if_not_full>(&self, message: N) -> Result<(), SendError> { + pub fn send_if_not_full(&self, message: M) -> Result<(), SendError> { if self.remaining_capacity().unwrap_or(usize::max_value()) > 1 { return self.send(message); } @@ -659,7 +668,8 @@ impl Recipient { // TODO(ryo): Properly support the concept of priority channels. pub fn remaining_capacity(&self) -> Option { - self.message_tx.capacity().map(|capacity| capacity - self.message_tx.len()) + let message_tx = &self.message_tx as &dyn SenderTrait; + message_tx.capacity().map(|capacity| capacity - message_tx.len()) } pub fn control_addr(&self) -> ControlAddr { @@ -667,6 +677,45 @@ impl Recipient { } } +/// Internal trait to generalize over [`Sender`]. +trait SenderTrait: Send + Sync { + fn try_send(&self, message: M) -> Result<(), SendError>; + + fn len(&self) -> usize; + + fn capacity(&self) -> Option; +} + +/// [`SenderTrait`] is implemented for concrete crossbeam [`Sender`]. +impl SenderTrait for Sender { + fn try_send(&self, message: M) -> Result<(), SendError> { + self.try_send(message).map_err(SendError::from) + } + + fn len(&self) -> usize { + self.len() + } + + fn capacity(&self) -> Option { + self.capacity() + } +} + +/// [`SenderTrait`] is also implemented for boxed version of itself, incluling M -> N conversion. +impl, N> SenderTrait for Arc> { + fn try_send(&self, message: M) -> Result<(), SendError> { + self.deref().try_send(message.into()) + } + + fn len(&self) -> usize { + self.deref().len() + } + + fn capacity(&self) -> Option { + self.deref().capacity() + } +} + /// An address to an actor that can *only* handle lifecycle control. #[derive(Clone)] pub struct ControlAddr {