From 775749f049384d6df9adbcf0df5fb1eb2a3a0b7e Mon Sep 17 00:00:00 2001
From: Matt Keeter
Date: Tue, 19 Nov 2024 11:03:09 -0500
Subject: [PATCH] Unit tests for early rejection

---
 upstairs/src/dummy_downstairs_tests.rs | 239 ++++++++++++++++++++++++-
 1 file changed, 237 insertions(+), 2 deletions(-)

diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs
index fb7254fbd..bf74d5a59 100644
--- a/upstairs/src/dummy_downstairs_tests.rs
+++ b/upstairs/src/dummy_downstairs_tests.rs
@@ -31,6 +31,7 @@ use crucible_protocol::JobId;
 use crucible_protocol::Message;
 use crucible_protocol::ReadBlockContext;
 use crucible_protocol::ReadResponseHeader;
+use crucible_protocol::SnapshotDetails;
 use crucible_protocol::WriteHeader;
 
 use bytes::BytesMut;
@@ -289,6 +290,35 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
+    ///
+    /// Returns the flush number for further checks.
+    ///
+    /// # Panics
+    /// If a non-flush message arrives
+    pub async fn err_flush(&mut self) -> u64 {
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
+    }
+
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
     ///
     /// Returns the job ID for further checks.
@@ -311,6 +341,23 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Write` and sends a `WriteAck` with an `IoError`
+    pub async fn err_write(&mut self) -> JobId {
+        match self.recv().await.unwrap() {
+            Message::Write { header, .. } => {
+                self.send(Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                header.job_id
+            }
+            m => panic!("saw non write: {m:?}"),
+        }
+    }
+
     /// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
     ///
     /// Returns the job ID for further checks.
@@ -358,7 +405,7 @@ impl DownstairsHandle {
                         job_id,
                         blocks: Ok(vec![block]),
                     },
-                    data: data.clone(),
+                    data,
                 })
                 .unwrap();
                 job_id
@@ -811,7 +858,7 @@ async fn run_live_repair(mut harness: TestHarness) {
                 job_id,
                 blocks: Ok(vec![block]),
             },
-            data: data.clone(),
+            data,
         }) {
             Ok(()) => panic!("DS1 should be disconnected"),
             Err(e) => {
@@ -2890,3 +2937,191 @@ async fn test_bytes_based_barrier() {
     harness.ds2.ack_flush().await;
     harness.ds3.ack_flush().await;
 }
+
+/// Test for early rejection of writes if > 1 Downstairs is unavailable
+#[tokio::test]
+async fn fast_write_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Send a second write, which should still work (because we have 2/3 ds)
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Subsequent writes should be rejected immediately
+    let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
+    assert!(
+        matches!(r, Err(CrucibleError::IoError(..))),
+        "expected IoError, got {r:?}"
+    );
+}
+
+/// Make sure reads work with only 1x Downstairs
+#[tokio::test]
+async fn read_with_one_fault() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Check that reads still work
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds2.ack_read().await;
+    h.await.unwrap(); // we have >= 1x reply, so the read will return
+    harness.ds3.ack_read().await;
+
+    // Take out DS1 next
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Reads still work with 1x Downstairs
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds3.ack_read().await;
+    h.await.unwrap(); // we have >= 1x reply, so the read will return
+}
+
+/// Test early rejection of reads with 0x running Downstairs
+#[tokio::test]
+async fn fast_read_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault all three Downstairs (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.err_write().await;
+    harness.ds3.err_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Reads should return errors immediately
+    let mut buffer = Buffer::new(1, 512);
+    match harness.guest.read(BlockIndex(0), &mut buffer).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
+
+/// Test for early rejection of flushes
+#[tokio::test]
+async fn fast_flush_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds1().err_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // A flush with snapshot should fail immediately
+    match harness
+        .guest
+        .flush(Some(SnapshotDetails {
+            snapshot_name: "hiiiii".to_string(),
+        }))
+        .await
+    {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+
+    // A non-snapshot flush should still succeed
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+
+    // Use a flush to take out another downstairs
+    let h = harness.spawn(|guest| async move { guest.flush(None).await });
+    harness.ds2.ack_flush().await;
+    harness.ds3.err_flush().await;
+    let r = h.await.unwrap();
+    assert!(r.is_err());
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Subsequent flushes should fail immediately
+    match harness.guest.flush(None).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}