Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed infinite loop fix #80

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 121 additions & 71 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
//! To apply this to a given (receiving) actor:
//! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type.
//! * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it.
//! * Wrapped actor's `Error` must implement [`From<SendError>`].
//! * Wrap the actor in [`Timed`] before spawning.
//!
//! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`.
//! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like
//! `send_delayed()`, `send_recurring()`.
//!
//! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's
//! channel inbox, they are placed to internal queue instead. Due to the design, delayed and
//! recurring messages have always lower priority than instant messages when the actor is
//! saturated.
//! channel inbox, they are placed to internal queue instead. When delayed/recurring message become
//! due, they go through the actor's regular inboxes (subject to prioritization).
//!
//! See `delay_actor.rs` example for usage.

Expand All @@ -24,11 +24,16 @@ use std::{
time::{Duration, Instant},
};

/// A message that can be delivered now, at certain time and optionally repeatedly.
/// A message that can be enqueued now, at certain time and optionally repeatedly.
pub enum TimedMessage<M> {
/// Instant message `handle()`d by the wrapped actor right away.
Instant { message: M },
Delayed { message: M, fire_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
/// Request to setup a delayed message. Goes to internal [`Timed`] wrapper queue and then gets
/// sent to ourselves as an `Instant` message at the specified time.
Delayed { message: M, enqueue_at: Instant },
/// Request to setup a recurring message. Goes to internal [`Timed`] wrapper queue and then gets
/// sent to ourselves as an `Instant` message regularly at the specified pace.
Recurring { factory: Box<dyn FnMut() -> M + Send>, enqueue_at: Instant, interval: Duration },
}

/// This implementation allows sending direct unwrapped messages to wrapped actors.
Expand All @@ -43,19 +48,19 @@ pub trait RecipientExt<M> {
/// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`].
fn send_now(&self, message: M) -> Result<(), SendError>;

/// Send a `message` to be delivered later at a certain instant.
fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
/// Send a `message` to be enqueued later at a certain instant.
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError>;

/// Send a `message` to be delivered later after some time from now.
/// Send a `message` to be enqueued later after some time from now.
fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
self.send_timed(message, Instant::now() + delay)
}

/// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on.
/// Schedule sending of message at `enqueue_at` plus at regular `interval`s from that point on.
fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError>;
}
Expand All @@ -65,17 +70,17 @@ impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
self.send(TimedMessage::Instant { message })
}

fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, fire_at })
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, enqueue_at })
}

fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError> {
self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
self.send(TimedMessage::Recurring { factory: Box::new(factory), enqueue_at, interval })
}
}

Expand Down Expand Up @@ -111,50 +116,53 @@ pub struct Timed<A: Actor> {
queue: BinaryHeap<QueueItem<A::Message>>,
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
pub fn new(inner: A) -> Self {
Self { inner, queue: Default::default() }
}

/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
// Handle all messages that should have been handled by now.
let now = Instant::now();
while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), SendError> {
// If the message on top of the queue is due, send it to ourselves as `Instant` to enqueue
// it in the regular actor queue.
// No problem if there are multiple such messages, the next Timed::handle() will call
// process_queue() again.
if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");

let message = match item.payload {
Payload::Delayed { message } => message,
Payload::Recurring { mut factory, interval } => {
let message = factory();
self.queue.push(QueueItem {
fire_at: item.fire_at + interval,
enqueue_at: item.enqueue_at + interval,
payload: Payload::Recurring { factory, interval },
});
message
},
};

// Let inner actor do its job.
//
// Alternatively, we could send an `Instant` message to ourselves.
// - The advantage would be that it would go into the queue with proper priority. But it
// is unclear what should be handled first: normal-priority message that should have
// been processed a while ago, or a high-priority message that was delivered now.
// - Disadvantage is we could easily overflow the queue if many messages fire at once.
self.inner.handle(&mut TimedContext::from_context(context), message)?;
// Enqueue an immediate message to process. Alternative would be to call inner handle(),
// but we don't want to effectively call child handle() twice in the parent handle().
context.myself.send_now(message)?;
}

Ok(())
}

fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
// Schedule next timeout if the queue is not empty.
context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
context.set_deadline(self.queue.peek().map(|earliest| earliest.enqueue_at));
}
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
type Context = Context<Self::Message>;
type Error = A::Error;
type Message = TimedMessage<M>;
Expand All @@ -171,12 +179,14 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor
TimedMessage::Instant { message } => {
self.inner.handle(&mut TimedContext::from_context(context), message)?;
},
TimedMessage::Delayed { message, fire_at } => {
self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
TimedMessage::Delayed { message, enqueue_at } => {
self.queue.push(QueueItem { enqueue_at, payload: Payload::Delayed { message } });
},
TimedMessage::Recurring { factory, fire_at, interval } => {
self.queue
.push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
TimedMessage::Recurring { factory, enqueue_at, interval } => {
self.queue.push(QueueItem {
enqueue_at,
payload: Payload::Recurring { factory, interval },
});
},
};

Expand All @@ -195,14 +205,13 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor

fn priority(message: &Self::Message) -> Priority {
match message {
// Use underlying message priority if we can reference it.
TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
A::priority(message)
},
// Recurring message is only received once, the recurring instances go through the
// internal queue (and not actor's channel). Assign high priority to the request to
// set-up the recurrent sending.
TimedMessage::Recurring { .. } => Priority::High,
// Use underlying message priority for instant messages.
TimedMessage::Instant { message } => A::priority(message),
// These priorities apply to the *set-up* of Delayed and Recurring messages and we
// want to handle that pronto.
// The resulting inner message then comes back as `Instant` and is prioritized per its
// underlying priority.
TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High,
}
}

Expand Down Expand Up @@ -236,13 +245,13 @@ impl<A: Actor> Deref for Timed<A> {

/// Implementation detail, element of message queue ordered by time to fire at.
struct QueueItem<M> {
fire_at: Instant,
enqueue_at: Instant,
payload: Payload<M>,
}

impl<M> PartialEq for QueueItem<M> {
fn eq(&self, other: &Self) -> bool {
self.fire_at == other.fire_at
self.enqueue_at == other.enqueue_at
}
}

Expand All @@ -257,8 +266,8 @@ impl<M> PartialOrd for QueueItem<M> {

impl<M> Ord for QueueItem<M> {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`.
self.fire_at.cmp(&other.fire_at).reverse()
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `enqueue_at`.
self.enqueue_at.cmp(&other.enqueue_at).reverse()
}
}

Expand All @@ -277,19 +286,20 @@ mod tests {
};

struct TimedTestActor {
recurring_message_sleep: Duration,
received: Arc<Mutex<Vec<usize>>>,
}

impl Actor for TimedTestActor {
type Context = TimedContext<Self::Message>;
type Error = ();
type Error = SendError;
type Message = usize;

fn name() -> &'static str {
"TimedTestActor"
}

fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> {
fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), SendError> {
{
let mut guard = self.received.lock().unwrap();
guard.push(message);
Expand All @@ -301,38 +311,78 @@ mod tests {
context.myself.send_now(3).unwrap();
}

Ok(())
}
// Message 2 is a recurring one, sleep based on a parameter.
if message == 2 {
thread::sleep(self.recurring_message_sleep);
}

fn started(&mut self, context: &mut Self::Context) {
context
.myself
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap()
Ok(())
}
}

/// Tests that recurring messages still get in for actors that have one "tick" message type that
/// does `block_for_some_time(); myself.send_now(Tick);` in its handle().
#[test]
fn recurring_messages_for_busy_actors() {
fn recurring_messages_for_self_looping_actors() {
let received = Arc::new(Mutex::new(Vec::new()));

let mut system = System::new("timed test");
let address =
system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap();
let address = system
.spawn(Timed::new(TimedTestActor {
recurring_message_sleep: Duration::ZERO,
received: Arc::clone(&received),
}))
.unwrap();
address
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap();

address.send_now(1).unwrap();
thread::sleep(Duration::from_millis(225));
system.shutdown().unwrap();

// The order of messages should be:
// 1 (initial message),
// 2 (first recurring scheduled message),
// 3 (first self-sent message),
// 2 (second recurring message)
// 3 (second self-sent message)
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]);
// The timeline (order of messages received) is:
// at 0 ms: 1 (initial message, takes 100 ms to handle),
// at 100 ms: 3 (first self-sent message, 100 ms to handle),
// at 200 ms: 2 (first recurring scheduled message, delivered 150 ms late),
// at 200 ms: 3 (second self-sent message, 100 ms to handle)
// at 225 ms: (control message to shut down the actor sent)
// at 300 ms: (control signal to shut down finally delivered to the actor)
assert_eq!(*received.lock().unwrap(), vec![1, 3, 2, 3]);
}

/// Test that actors with recurring messages that take longer to handle than what the recurring
/// delay is still get other and control messages.
#[test]
fn recurring_messages_handled_slower_than_generated() {
let received = Arc::new(Mutex::new(Vec::new()));

let mut system = System::new("timed test");
let address = system
.spawn(Timed::new(TimedTestActor {
recurring_message_sleep: Duration::from_millis(100),
received: Arc::clone(&received),
}))
.unwrap();
address.send_recurring(|| 2, Instant::now(), Duration::from_millis(10)).unwrap();

thread::sleep(Duration::from_millis(150));
address.send_now(4).unwrap();
thread::sleep(Duration::from_millis(125));
system.shutdown().unwrap();

// The timeline (order of messages received) is:
// at 0 ms: 2 (first recurring message, 100 ms to handle)
// at 100 ms: 2 (second recurring message, 90 ms late, 100 ms to handle)
// at 150 ms: (message "4" sent to the actor from the main thread)
// at 200 ms: 4 (actor wakes up, processes message 4 that was sent before the recurring one)
// at 200 ms: 2 (third recurring message, 180 ms late, 100 ms to handle)
// at 275 ms: (control message to shut down actor sent)
// at 300 ms: (control message to shut down received at highest priority)
assert_eq!(*received.lock().unwrap(), vec![2, 2, 4, 2]);
}
}
Loading