Skip to content

Commit

Permalink
timed: fix #79 by enqueueing (rather than handling) delayed messages …
Browse files Browse the repository at this point in the history
…when due

Also rename `fire_at` to `enqueue_at` to be explicit about the fact.

This is a trade-off that prevents 2 sorts of bad behavior:
- actors with send-sending messages never handling any delayed/recurring messages (#72)
- actors with recurring messages that are slower to handle than their interval eventually not processing any outside messages (#79)

See the tweaked tests for the change of behavior.
  • Loading branch information
strohel committed Jan 12, 2024
1 parent e6fb7df commit 66fcd74
Showing 1 changed file with 61 additions and 64 deletions.
125 changes: 61 additions & 64 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
//! To apply this to a given (receiving) actor:
//! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type.
//! * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it.
//! * Wrapped actor's `Error` must implement [`From<SendError>`].
//! * Wrap the actor in [`Timed`] before spawning.
//!
//! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`.
//! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like
//! `send_delayed()`, `send_recurring()`.
//!
//! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's
//! channel inbox, they are placed to internal queue instead. Due to the design, delayed and
//! recurring messages have always lower priority than instant messages when the actor is
//! saturated.
//! channel inbox, they are placed to internal queue instead. When delayed/recurring message become
//! due, they go through the actor's regular inboxes (subject to prioritization).
//!
//! See `delay_actor.rs` example for usage.
Expand All @@ -24,11 +24,11 @@ use std::{
time::{Duration, Instant},
};

/// A message that can be delivered now, at certain time and optionally repeatedly.
/// A message that can be enqueued now, at certain time and optionally repeatedly.
pub enum TimedMessage<M> {
Instant { message: M },
Delayed { message: M, fire_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
Delayed { message: M, enqueue_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, enqueue_at: Instant, interval: Duration },
}

/// This implementation allows sending direct unwrapped messages to wrapped actors.
Expand All @@ -43,19 +43,19 @@ pub trait RecipientExt<M> {
/// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`].
fn send_now(&self, message: M) -> Result<(), SendError>;

/// Send a `message` to be delivered later at a certain instant.
fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
/// Send a `message` to be enqueued later at a certain instant.
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError>;

/// Send a `message` to be delivered later after some time from now.
/// Send a `message` to be enqueued later after some time from now.
fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
self.send_timed(message, Instant::now() + delay)
}

/// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on.
/// Schedule sending of message at `enqueue_at` plus at regular `interval`s from that point on.
fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError>;
}
Expand All @@ -65,17 +65,17 @@ impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
self.send(TimedMessage::Instant { message })
}

fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, fire_at })
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, enqueue_at })
}

fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError> {
self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
self.send(TimedMessage::Recurring { factory: Box::new(factory), enqueue_at, interval })
}
}

Expand Down Expand Up @@ -111,50 +111,52 @@ pub struct Timed<A: Actor> {
queue: BinaryHeap<QueueItem<A::Message>>,
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
pub fn new(inner: A) -> Self {
Self { inner, queue: Default::default() }
}

/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
// Handle all messages that should have been handled by now.
let now = Instant::now();
while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), SendError> {
// If the message on top of the queue is due, send it to the regular actor queue.
// No problem if there are multiple such messages, it's handle() will call process_queue()
// again.
if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");

let message = match item.payload {
Payload::Delayed { message } => message,
Payload::Recurring { mut factory, interval } => {
let message = factory();
self.queue.push(QueueItem {
fire_at: item.fire_at + interval,
enqueue_at: item.enqueue_at + interval,
payload: Payload::Recurring { factory, interval },
});
message
},
};

// Let inner actor do its job.
//
// Alternatively, we could send an `Instant` message to ourselves.
// - The advantage would be that it would go into the queue with proper priority. But it
// is unclear what should be handled first: normal-priority message that should have
// been processed a while ago, or a high-priority message that was delivered now.
// - Disadvantage is we could easily overflow the queue if many messages fire at once.
self.inner.handle(&mut TimedContext::from_context(context), message)?;
// Enqueue an immediate message to process. Alternative would be to call inner handle(),
// but we don't want to effectively call child handle() twice in the parent handle().
context.myself.send_now(message)?;
}

Ok(())
}

fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
// Schedule next timeout if the queue is not empty.
context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
context.set_deadline(self.queue.peek().map(|earliest| earliest.enqueue_at));
}
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
type Context = Context<Self::Message>;
type Error = A::Error;
type Message = TimedMessage<M>;
Expand All @@ -171,12 +173,14 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor
TimedMessage::Instant { message } => {
self.inner.handle(&mut TimedContext::from_context(context), message)?;
},
TimedMessage::Delayed { message, fire_at } => {
self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
TimedMessage::Delayed { message, enqueue_at } => {
self.queue.push(QueueItem { enqueue_at, payload: Payload::Delayed { message } });
},
TimedMessage::Recurring { factory, fire_at, interval } => {
self.queue
.push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
TimedMessage::Recurring { factory, enqueue_at, interval } => {
self.queue.push(QueueItem {
enqueue_at,
payload: Payload::Recurring { factory, interval },
});
},
};

Expand All @@ -195,14 +199,11 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor

fn priority(message: &Self::Message) -> Priority {
match message {
// Use underlying message priority if we can reference it.
TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
A::priority(message)
},
// Recurring message is only received once, the recurring instances go through the
// internal queue (and not actor's channel). Assign high priority to the request to
// set-up the recurrent sending.
TimedMessage::Recurring { .. } => Priority::High,
// Use underlying message priority for instant messages.
TimedMessage::Instant { message } => A::priority(message),
// Recurring and Delayed messages are only added to the queue when handled, and then go
// through actors priority inboxes again when actually enqueued.
TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High,
}
}

Expand Down Expand Up @@ -236,13 +237,13 @@ impl<A: Actor> Deref for Timed<A> {

/// Implementation detail, element of message queue ordered by time to fire at.
struct QueueItem<M> {
fire_at: Instant,
enqueue_at: Instant,
payload: Payload<M>,
}

impl<M> PartialEq for QueueItem<M> {
fn eq(&self, other: &Self) -> bool {
self.fire_at == other.fire_at
self.enqueue_at == other.enqueue_at
}
}

Expand All @@ -257,8 +258,8 @@ impl<M> PartialOrd for QueueItem<M> {

impl<M> Ord for QueueItem<M> {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`.
self.fire_at.cmp(&other.fire_at).reverse()
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `enqueue_at`.
self.enqueue_at.cmp(&other.enqueue_at).reverse()
}
}

Expand All @@ -283,14 +284,14 @@ mod tests {

impl Actor for TimedTestActor {
type Context = TimedContext<Self::Message>;
type Error = ();
type Error = SendError;
type Message = usize;

fn name() -> &'static str {
"TimedTestActor"
}

fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> {
fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), SendError> {
{
let mut guard = self.received.lock().unwrap();
guard.push(message);
Expand Down Expand Up @@ -338,14 +339,12 @@ mod tests {

// The timeline (order of messages received) is:
// at 0 ms: 1 (initial message, takes 100 ms to handle),
// at 100 ms: 2 (first recurring scheduled message, delivered 50 ms late),
// at 100 ms: 3 (first self-sent message, 100 ms to handle),
// at 200 ms: 2 (second recurring message, 50 ms late, in the same ^^^ handle() invocation)
// at 200 ms: 2 (first recurring scheduled message, delivered 150 ms late),
// at 200 ms: 3 (second self-sent message, 100 ms to handle)
// at 225 ms: (control message to shut down the actor sent)
// at 300 ms: 2 (third recurring message, 50 ms late, in the same ^^^ handle() invocation)
// at 300 ms: (control signal to shut down finally delivered to the actor)
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3, 2]);
assert_eq!(*received.lock().unwrap(), vec![1, 3, 2, 3]);
}

/// Test that actors with recurring messages that take longer to handle than what the recurring
Expand All @@ -369,15 +368,13 @@ mod tests {
system.shutdown().unwrap();

// The timeline (order of messages received) is:
// at 0 ms: 2 (deadline_passed() handles the first recurring message, takes 100 ms)
// at 100 ms: 2 ten times (deadline_passed() gradually handles 10 recurring messages the should have
// fired by the time it started, takes 1 full second)
// at 150 ms: (message "4" enqueued from the main thread)
// at 275 ms: (control message to shut down the actor sent)
// at 1100 ms: (actor loop finally kicks in again, gets control message, shuts down)
//
// Notice the message "4" is never received even though the actor had 125 ms to handle it
// (more time than needed to handle one recurring message). That's issue #79.
assert_eq!(*received.lock().unwrap(), vec![2; 11]);
// at 0 ms: 2 (first recurring message, 100 ms to handle)
// at 100 ms: 2 (second recurring message, 90 ms late, 100 ms to handle)
// at 150 ms: (message "4" sent to the actor from the main thread)
// at 200 ms: 4 (actor wakes up, processes message 4 that was sent before the recurring one)
// at 200 ms: 2 (third recurring message, 180 ms late, 100 ms to handle)
// at 275 ms: (control message to shut down actor sent)
// at 300 ms: (control message to shut down received at highest priority)
assert_eq!(*received.lock().unwrap(), vec![2, 2, 4, 2]);
}
}

0 comments on commit 66fcd74

Please sign in to comment.