Skip to content

Commit

Permalink
test: add more tests case for SendingStream
Browse files Browse the repository at this point in the history
  • Loading branch information
huster-zhangpeng committed Dec 30, 2024
1 parent 9f9a73a commit bdda4f6
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 24 deletions.
2 changes: 1 addition & 1 deletion qbase/src/sid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl StreamId {
/// It is prohibited to directly create a StreamId from external sources.
/// StreamId can only be allocated incrementally by proactively creating new streams locally.
/// or accepting new streams opened by peer.
fn new(role: Role, dir: Dir, id: u64) -> Self {
pub fn new(role: Role, dir: Dir, id: u64) -> Self {
assert!(id <= MAX_STREAMS_LIMIT);
Self((((id << 1) | (dir as u64)) << 1) | (role as u64))
}
Expand Down
190 changes: 173 additions & 17 deletions qrecovery/src/send/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct ReadySender<TX> {
sndbuf: SendBuf,
flush_waker: Option<Waker>,
shutdown_waker: Option<Waker>,
reset_frame_tx: TX,
broker: TX,
writable_waker: Option<Waker>,
max_stream_data: u64,
}
Expand All @@ -41,20 +41,20 @@ where
VarInt::from_u64(err_code).expect("app error code must not exceed 2^62"),
VarInt::from_u64(final_size).expect("final size must not exceed 2^62"),
);
self.reset_frame_tx
self.broker
.send_frame([reset_stream_err.combine(self.stream_id)]);
reset_stream_err
}
}

impl<TX> ReadySender<TX> {
pub(super) fn new(stream_id: StreamId, buf_size: u64, reset_frame_tx: TX) -> ReadySender<TX> {
pub(super) fn new(stream_id: StreamId, buf_size: u64, broker: TX) -> ReadySender<TX> {
ReadySender {
stream_id,
sndbuf: SendBuf::with_capacity(buf_size as usize),
flush_waker: None,
shutdown_waker: None,
reset_frame_tx,
broker,
writable_waker: None,
max_stream_data: buf_size,
}
Expand Down Expand Up @@ -133,7 +133,7 @@ impl<TX: Clone> From<&mut ReadySender<TX>> for SendingSender<TX> {
sndbuf: std::mem::take(&mut value.sndbuf),
flush_waker: value.flush_waker.take(),
shutdown_waker: value.shutdown_waker.take(),
reset_frame_tx: value.reset_frame_tx.clone(),
broker: value.broker.clone(),
writable_waker: value.writable_waker.take(),
max_stream_data: value.max_stream_data,
}
Expand All @@ -148,7 +148,7 @@ impl<TX: Clone> From<&mut ReadySender<TX>> for DataSentSender<TX> {
sndbuf: std::mem::take(&mut value.sndbuf),
flush_waker: value.flush_waker.take(),
shutdown_waker: value.shutdown_waker.take(),
reset_frame_tx: value.reset_frame_tx.clone(),
broker: value.broker.clone(),
fin_state: FinState::None,
}
}
Expand All @@ -160,7 +160,7 @@ pub struct SendingSender<TX> {
sndbuf: SendBuf,
flush_waker: Option<Waker>,
shutdown_waker: Option<Waker>,
reset_frame_tx: TX,
broker: TX,
writable_waker: Option<Waker>,
max_stream_data: u64,
}
Expand All @@ -177,7 +177,7 @@ where
VarInt::from_u64(err_code).expect("app error code must not exceed 2^62"),
VarInt::from_u64(final_size).expect("final size must not exceed 2^62"),
);
self.reset_frame_tx
self.broker
.send_frame([reset_stream_err.combine(self.stream_id)]);
reset_stream_err
}
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<TX: Clone> From<&mut SendingSender<TX>> for DataSentSender<TX> {
sndbuf: std::mem::take(&mut value.sndbuf),
flush_waker: value.flush_waker.take(),
shutdown_waker: value.shutdown_waker.take(),
reset_frame_tx: value.reset_frame_tx.clone(),
broker: value.broker.clone(),
fin_state: FinState::None,
}
}
Expand All @@ -297,7 +297,7 @@ pub struct DataSentSender<TX> {
sndbuf: SendBuf,
flush_waker: Option<Waker>,
shutdown_waker: Option<Waker>,
reset_frame_tx: TX,
broker: TX,
fin_state: FinState,
}

Expand All @@ -311,7 +311,7 @@ where
VarInt::from_u64(err_code).expect("app error code must not exceed 2^62"),
VarInt::from_u64(final_size).expect("final size must not exceed 2^62"),
);
self.reset_frame_tx
self.broker
.send_frame([reset_stream_err.combine(self.stream_id)]);
reset_stream_err
}
Expand Down Expand Up @@ -411,8 +411,8 @@ pub(super) enum Sender<TX> {
}

impl<TX> Sender<TX> {
pub fn new(stream_id: StreamId, buf_size: u64, reset_frame_tx: TX) -> Self {
Sender::Ready(ReadySender::new(stream_id, buf_size, reset_frame_tx))
pub fn new(stream_id: StreamId, buf_size: u64, broker: TX) -> Self {
Sender::Ready(ReadySender::new(stream_id, buf_size, broker))
}
}

Expand All @@ -431,11 +431,9 @@ pub struct ArcSender<TX>(Arc<Mutex<Result<Sender<TX>, Error>>>);

impl<TX> ArcSender<TX> {
#[doc(hidden)]
pub(crate) fn new(stream_id: StreamId, buf_size: u64, reset_frame_tx: TX) -> Self {
pub(crate) fn new(stream_id: StreamId, buf_size: u64, broker: TX) -> Self {
ArcSender(Arc::new(Mutex::new(Ok(Sender::new(
stream_id,
buf_size,
reset_frame_tx,
stream_id, buf_size, broker,
)))))
}
}
Expand All @@ -452,3 +450,161 @@ impl<TX> ArcSender<TX> {
self.0.lock().unwrap()
}
}

#[cfg(test)]
mod tests {
use qbase::sid::{Dir, Role};

use super::*;

#[derive(Debug, Default, Clone)]
struct MockBroker(Arc<Mutex<Vec<ResetStreamFrame>>>);

impl SendFrame<ResetStreamFrame> for MockBroker {
fn send_frame<I: IntoIterator<Item = ResetStreamFrame>>(&self, iter: I) {
self.0.lock().unwrap().extend(iter);
}
}

fn create_test_sender() -> ArcSender<MockBroker> {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 1000;
let broker = MockBroker::default();
ArcSender::new(stream_id, buf_size, broker)
}

#[test]
fn test_ready_sender_new() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 1000;
let broker = MockBroker::default();
let sender = ReadySender::new(stream_id, buf_size, broker);

assert_eq!(sender.stream_id, stream_id);
assert_eq!(sender.max_stream_data, buf_size);
assert!(sender.flush_waker.is_none());
assert!(sender.shutdown_waker.is_none());
assert!(sender.writable_waker.is_none());
}

#[test]
fn test_ready_sender_write() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 10;
let broker = MockBroker::default();
let mut sender = ReadySender::new(stream_id, buf_size, broker);

let data = b"hello";
let result = sender.write(data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), 5);

// Test write when buffer is full
let large_data = b"too much data";
let result = sender.write(large_data);
assert!(result.is_ok());
assert_eq!(result.unwrap(), 5);
}

#[tokio::test]
async fn test_ready_sender_poll_write() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 10;
let broker = MockBroker::default();
let mut sender = ReadySender::new(stream_id, buf_size, broker);

let data = b"test";
let mut cx = Context::from_waker(futures::task::noop_waker_ref());

if let Poll::Ready(result) = sender.poll_write(&mut cx, data) {
assert!(result.is_ok());
assert_eq!(result.unwrap(), 4);
}

// Test poll_write when buffer is full
sender.max_stream_data = 0;
let result = sender.poll_write(&mut cx, data);
assert!(result.is_pending());
}

#[test]
fn test_sender_state_transitions() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 1000;
let broker = MockBroker::default();
let mut ready = ReadySender::new(stream_id, buf_size, broker);

// Test transition to SendingSender
let sending = SendingSender::from(&mut ready);
assert_eq!(sending.stream_id, stream_id);
assert_eq!(sending.max_stream_data, buf_size);

// Test transition to DataSentSender
let mut sending = SendingSender::from(&mut ready);
let data_sent = DataSentSender::from(&mut sending);
assert_eq!(data_sent.stream_id, stream_id);
assert_eq!(data_sent.fin_state, FinState::None);
}

#[test]
fn test_arc_sender() {
let sender = create_test_sender();

// Test buffer size revision
sender.revise_buffer_size(2000);

// Test sender lock access
let guard = sender.sender();
assert!(guard.is_ok());
}

#[test]
fn test_data_sent_sender() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 1000;
let broker = MockBroker::default();
let mut sender = DataSentSender {
stream_id,
sndbuf: SendBuf::with_capacity(buf_size as usize),
flush_waker: None,
shutdown_waker: None,
broker,
fin_state: FinState::None,
};

// Test pick_up with empty buffer
let predicate = |_| Some(100);
let result = sender.pick_up(&predicate, 1000);
assert!(result.is_some());
let (offset, is_fresh, data, is_fin) = result.unwrap();
assert_eq!(offset, 0);
assert!(!is_fresh);
assert!(data.0.is_empty() && data.1.is_empty());
assert!(is_fin);
}

#[tokio::test]
async fn test_data_sent_sender_polling() {
let stream_id = StreamId::new(Role::Client, Dir::Bi, 0);
let buf_size = 1000;
let broker = MockBroker::default();
let mut sender = DataSentSender {
stream_id,
sndbuf: SendBuf::with_capacity(buf_size as usize),
flush_waker: None,
shutdown_waker: None,
broker,
fin_state: FinState::Rcvd,
};

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Test poll_flush when all data received
let result = sender.poll_flush(&mut cx);
assert!(result.is_ready());

// Test poll_shutdown when all data received
let result = sender.poll_shutdown(&mut cx);
assert!(result.is_ready());
}
}
24 changes: 18 additions & 6 deletions qrecovery/src/send/sndbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ impl State {
self.0 & Self::SUFFIX
}

#[allow(dead_code)]
fn set_offset(&mut self, value: u64) {
debug_assert!(value <= Self::SUFFIX, "value({value}) overflow");
self.0 = (self.0 & Self::PREFIX) | (value & Self::SUFFIX)
}

fn color(&self) -> Color {
match self.0 >> 62 {
0b00 => Color::Pending,
Expand Down Expand Up @@ -647,6 +641,24 @@ impl SendBuf {
mod tests {
use super::{BufMap, Color, State};

#[test]
fn test_state() {
let state = State::encode(100, Color::Pending);
assert_eq!(state.offset(), 100);
assert_eq!(state.color(), Color::Pending);

let mut state = State::encode(100, Color::Pending);
state.set_color(Color::Flighting);
assert_eq!(state.color(), Color::Flighting);

let state = State::encode(100, Color::Pending);
assert_eq!(state.decode(), (100, Color::Pending));

// test Dispaly
assert_eq!(format!("{}", state), "[100: Pending]");
assert_eq!(format!("{:?}", state), "[100: Pending]");
}

#[test]
fn test_bufmap_empty() {
let buf_map = BufMap::default();
Expand Down

0 comments on commit bdda4f6

Please sign in to comment.