Skip to content

Commit

Permalink
Merge pull request #1284 from plebhash/sniffer_wait_for_message_type
Browse files Browse the repository at this point in the history
`Sniffer::wait_for_message_type`
  • Loading branch information
plebhash authored Dec 11, 2024
2 parents 5bcd54d + 0f02d70 commit 6b105cf
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion roles/tests-integration/tests/common/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{collections::VecDeque, convert::TryInto, net::SocketAddr, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
select,
time::sleep,
time::{sleep, Duration},
};
type MessageFrame = StandardEitherFrame<AnyMessage<'static>>;
type MsgType = u8;
Expand Down Expand Up @@ -418,6 +418,40 @@ impl Sniffer {
panic!("Impossible to accept dowsntream connection")
}
}

/// used to block the test runtime
/// while we wait until Sniffer has received a message of some specific type
pub async fn wait_for_message_type(
&self,
message_direction: MessageDirection,
message_type: u8,
) {
let now = std::time::Instant::now();
loop {
let has_message_type = match message_direction {
MessageDirection::ToDownstream => {
self.messages_from_upstream.has_message_type(message_type)
}
MessageDirection::ToUpstream => {
self.messages_from_downstream.has_message_type(message_type)
}
};

// ready to unblock test runtime
if has_message_type {
return;
}

// 10 min timeout
// only for worst case, ideally should never be triggered
if now.elapsed().as_secs() > 10 * 60 {
panic!("Timeout waiting for message type");
}

// sleep to reduce async lock contention
sleep(Duration::from_secs(1)).await;
}
}
}

// Utility macro to assert that the downstream and upstream roles have sent specific messages.
Expand Down Expand Up @@ -589,6 +623,22 @@ impl MessagesAggregator {
.unwrap()
}

// returns true if contains message_type
fn has_message_type(&self, message_type: u8) -> bool {
let has_message: bool = self
.messages
.safe_lock(|messages| {
for (t, _) in messages.iter() {
if *t == message_type {
return true; // Exit early with `true`
}
}
false // Default value if no match is found
})
.unwrap();
has_message
}

// The aggregator queues messages in FIFO order, so this function returns the oldest message in
// the queue.
//
Expand Down

0 comments on commit 6b105cf

Please sign in to comment.