@@ -33,6 +33,7 @@ use crucible_protocol::JobId;
 use crucible_protocol::Message;
 use crucible_protocol::ReadBlockContext;
 use crucible_protocol::ReadResponseHeader;
+use crucible_protocol::SnapshotDetails;
 use crucible_protocol::WriteHeader;
 
 use bytes::BytesMut;
@@ -291,6 +292,35 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
+    ///
+    /// Returns the flush number for further checks.
+    ///
+    /// # Panics
+    /// If a non-flush message arrives
+    pub async fn err_flush(&mut self) -> u64 {
+        match self.recv().await.unwrap() {
+            Message::Flush {
+                job_id,
+                flush_number,
+                upstairs_id,
+                ..
+            } => {
+                self.send(Message::FlushAck {
+                    upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                flush_number
+            }
+            m => {
+                panic!("saw non flush {m:?}");
+            }
+        }
+    }
+
     /// Awaits a `Message::Write { .. }` and sends a `WriteAck`
     ///
     /// Returns the job ID for further checks.
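For context, the new helper is driven the same way as the existing `ack_flush`: spawn a guest flush, answer it with `err_flush` from one Downstairs, and ack from the others. A minimal usage sketch, mirroring the `fast_flush_rejection` test added at the bottom of this diff (the harness helpers are assumed from this file):

    let mut harness = TestHarness::new().await;
    let h = harness.spawn(|guest| async move {
        guest.flush(None).await.unwrap();
    });
    let flush_num = harness.ds1().err_flush().await; // DS1 reports an IoError
    harness.ds2.ack_flush().await;
    harness.ds3.ack_flush().await;
    h.await.unwrap(); // 2/3 Downstairs succeeded, so the guest flush succeeds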
@@ -313,6 +343,23 @@ impl DownstairsHandle {
         }
     }
 
+    /// Awaits a `Message::Write` and sends a `WriteAck` with an `IoError`
+    pub async fn err_write(&mut self) -> JobId {
+        match self.recv().await.unwrap() {
+            Message::Write { header, .. } => {
+                self.send(Message::WriteAck {
+                    upstairs_id: header.upstairs_id,
+                    session_id: self.upstairs_session_id.unwrap(),
+                    job_id: header.job_id,
+                    result: Err(CrucibleError::IoError("oh no".to_string())),
+                })
+                .unwrap();
+                header.job_id
+            }
+            m => panic!("saw non write: {m:?}"),
+        }
+    }
+
     /// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
     ///
     /// Returns the job ID for further checks.
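`err_write` follows the same pattern. A short usage sketch, grounded in the `fast_write_rejection` test below (harness helpers assumed from this file):

    let mut harness = TestHarness::new().await;
    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
    harness.guest.write(BlockIndex(0), write_buf).await.unwrap();
    let job_id = harness.ds1().err_write().await; // DS1 answers with an IoError
    harness.ds2.ack_write().await; // the other two Downstairs ack normally
    harness.ds3.ack_write().await;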
@@ -360,7 +407,7 @@ impl DownstairsHandle {
                 job_id,
                 blocks: Ok(vec![block]),
             },
-            data: data.clone(),
+            data,
         })
         .unwrap();
         job_id
@@ -813,7 +860,7 @@ async fn run_live_repair(mut harness: TestHarness) {
             job_id,
             blocks: Ok(vec![block]),
         },
-        data: data.clone(),
+        data,
     }) {
         Ok(()) => panic!("DS1 should be disconnected"),
         Err(e) => {
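Both hunks above drop a needless `data.clone()`: the `BytesMut` payload is moved into the outgoing `Message::ReadResponse` rather than copied (cloning a `BytesMut`, unlike a `Bytes`, duplicates the buffer). A standalone illustration of the pattern, using only the `bytes` crate (the `Response` type is a hypothetical stand-in):

    use bytes::BytesMut;

    struct Response {
        data: BytesMut, // hypothetical stand-in for Message::ReadResponse
    }

    fn build(data: BytesMut) -> Response {
        Response { data } // move: `data` is consumed here, no byte copy
    }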
@@ -2964,3 +3011,191 @@ async fn test_bytes_based_barrier() {
     harness.ds2.ack_flush().await;
     harness.ds3.ack_flush().await;
 }
+
+/// Test for early rejection of writes if > 1 Downstairs is unavailable
+#[tokio::test]
+async fn fast_write_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Send a second write, which should still work (because we have 2/3 ds)
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Subsequent writes should be rejected immediately
+    let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
+    assert!(
+        matches!(r, Err(CrucibleError::IoError(..))),
+        "expected IoError, got {r:?}"
+    );
+}
+
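The early-rejection behavior exercised here lives in the upstairs, which can fail a write before queueing it once fewer than two clients are active. A hedged sketch of the invariant under test (a hypothetical helper, not the actual upstairs code; the error text is the one the read and flush tests below check for):

    // Hypothetical guard, NOT the real upstairs implementation.
    fn can_write(states: &[DsState; 3]) -> Result<(), CrucibleError> {
        let active = states
            .iter()
            .filter(|s| matches!(s, DsState::Active))
            .count();
        if active < 2 {
            return Err(CrucibleError::IoError(
                "too many inactive clients".to_string(),
            ));
        }
        Ok(())
    }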
+/// Make sure reads work with only 1x Downstairs
+#[tokio::test]
+async fn read_with_one_fault() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault DS0 (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.ack_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Check that reads still work
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds2.ack_read().await;
+    h.await.unwrap(); // we have >= 1 reply, so the read will return
+    harness.ds3.ack_read().await;
+
+    // Take out DS1 next
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds2.err_write().await;
+    harness.ds3.ack_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // Reads still work with 1x Downstairs
+    let h = harness.spawn(|guest| async move {
+        let mut buffer = Buffer::new(1, 512);
+        guest.read(BlockIndex(0), &mut buffer).await.unwrap();
+    });
+    harness.ds3.ack_read().await;
+    h.await.unwrap(); // we have >= 1 reply, so the read will return
+}
+
+/// Test early rejection of reads with 0x running Downstairs
+#[tokio::test]
+async fn fast_read_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    // Use a write to fault all three DS (XXX why do read errors not fault a DS?)
+    let write_buf = BytesMut::from(vec![1; 4096].as_slice());
+    harness
+        .guest
+        .write(BlockIndex(0), write_buf.clone())
+        .await
+        .unwrap();
+    harness.ds1().err_write().await;
+    harness.ds2.err_write().await;
+    harness.ds3.err_write().await;
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Reads should return errors immediately
+    let mut buffer = Buffer::new(1, 512);
+    match harness.guest.read(BlockIndex(0), &mut buffer).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
+
+/// Test for early rejection of flushes
+#[tokio::test]
+async fn fast_flush_rejection() {
+    let mut harness = TestHarness::new().await;
+
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds1().err_flush().await;
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Active);
+
+    // A flush with a snapshot should fail immediately
+    match harness
+        .guest
+        .flush(Some(SnapshotDetails {
+            snapshot_name: "hiiiii".to_string(),
+        }))
+        .await
+    {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+
+    // A non-snapshot flush should still succeed
+    let h = harness.spawn(|guest| async move {
+        guest.flush(None).await.unwrap();
+    });
+    harness.ds2.ack_flush().await;
+    harness.ds3.ack_flush().await;
+    h.await.unwrap();
+
+    // Use a flush to take out another Downstairs
+    let h = harness.spawn(|guest| async move { guest.flush(None).await });
+    harness.ds2.ack_flush().await;
+    harness.ds3.err_flush().await;
+    let r = h.await.unwrap();
+    assert!(r.is_err());
+    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let ds = harness.guest.downstairs_state().await.unwrap();
+    assert_eq!(ds[ClientId::new(0)], DsState::Faulted);
+    assert_eq!(ds[ClientId::new(1)], DsState::Active);
+    assert_eq!(ds[ClientId::new(2)], DsState::Faulted);
+
+    // Subsequent flushes should fail immediately
+    match harness.guest.flush(None).await {
+        Err(CrucibleError::IoError(s)) => {
+            assert!(s.contains("too many inactive clients"))
+        }
+        r => panic!("expected IoError, got {r:?}"),
+    }
+}
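Taken together, these tests pin down the fast-rejection thresholds: reads need one active client, writes and plain flushes need two, and a flush with a snapshot needs all three. A hypothetical summary of what the tests imply (not an upstairs API):

    // Minimum Active clients required before a job is accepted, as
    // implied by the four tests above (hypothetical helper and enum).
    enum Op {
        Read,
        Write,
        Flush,
        SnapshotFlush,
    }

    fn required_active(op: Op) -> usize {
        match op {
            Op::Read => 1,
            Op::Write | Op::Flush => 2,
            Op::SnapshotFlush => 3,
        }
    }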