Skip to content

Commit

Permalink
Add test for issue #79
Browse files Browse the repository at this point in the history
Not currently failing, just demonstrates the unfortunate behavior.
  • Loading branch information
strohel committed Jan 12, 2024
1 parent 60c123d commit e6fb7df
Showing 1 changed file with 53 additions and 13 deletions.
66 changes: 53 additions & 13 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ mod tests {
};

struct TimedTestActor {
recurring_message_sleep: Duration,
received: Arc<Mutex<Vec<usize>>>,
}

Expand All @@ -301,18 +302,12 @@ mod tests {
context.myself.send_now(3).unwrap();
}

Ok(())
}
// Message 2 is a recurring one, sleep based on a paremeter.
if message == 2 {
thread::sleep(self.recurring_message_sleep);
}

fn started(&mut self, context: &mut Self::Context) {
context
.myself
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap()
Ok(())
}
}

Expand All @@ -323,8 +318,20 @@ mod tests {
let received = Arc::new(Mutex::new(Vec::new()));

let mut system = System::new("timed test");
let address =
system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap();
let address = system
.spawn(Timed::new(TimedTestActor {
recurring_message_sleep: Duration::ZERO,
received: Arc::clone(&received),
}))
.unwrap();
address
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap();

address.send_now(1).unwrap();
thread::sleep(Duration::from_millis(225));
system.shutdown().unwrap();
Expand All @@ -340,4 +347,37 @@ mod tests {
// at 300 ms: (control signal to shut down finally delivered to the actor)
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3, 2]);
}

/// Test that actors with recurring messages that take longer to handle than what the recurring
/// delay is still get other and control messages.
#[test]
fn recurring_messages_handled_slower_than_generated() {
let received = Arc::new(Mutex::new(Vec::new()));

let mut system = System::new("timed test");
let address = system
.spawn(Timed::new(TimedTestActor {
recurring_message_sleep: Duration::from_millis(100),
received: Arc::clone(&received),
}))
.unwrap();
address.send_recurring(|| 2, Instant::now(), Duration::from_millis(10)).unwrap();

thread::sleep(Duration::from_millis(150));
address.send_now(4).unwrap();
thread::sleep(Duration::from_millis(125));
system.shutdown().unwrap();

// The timeline (order of messages received) is:
// at 0 ms: 2 (deadline_passed() handles the first recurring message, takes 100 ms)
// at 100 ms: 2 ten times (deadline_passed() gradually handles 10 recurring messages the should have
// fired by the time it started, takes 1 full second)
// at 150 ms: (message "4" enqueued from the main thread)
// at 275 ms: (control message to shut down the actor sent)
// at 1100 ms: (actor loop finally kicks in again, gets control message, shuts down)
//
// Notice the message "4" is never received even though the actor had 125 ms to handle it
// (more time than needed to handle one recurring message). That's issue #79.
assert_eq!(*received.lock().unwrap(), vec![2; 11]);
}
}

0 comments on commit e6fb7df

Please sign in to comment.