@@ -2,20 +2,27 @@ mod common;
2
2
3
3
#[ cfg( feature = "replicated-loglet" ) ]
4
4
mod tests {
5
- use std:: { num:: NonZeroU8 , sync:: Arc } ;
5
+ use std:: { collections :: BTreeSet , num:: NonZeroU8 , sync:: Arc , time :: Duration } ;
6
6
7
+ use futures_util:: StreamExt ;
7
8
use googletest:: prelude:: * ;
8
9
use restate_bifrost:: loglet:: AppendError ;
10
+ use restate_core:: { cancellation_token, metadata, task_center} ;
9
11
use test_log:: test;
10
12
11
13
use restate_types:: {
12
14
config:: Configuration ,
13
- logs:: { Keys , LogletOffset , Record , TailState } ,
14
- replicated_loglet:: ReplicationProperty ,
15
+ logs:: {
16
+ metadata:: { LogletParams , ProviderKind } ,
17
+ KeyFilter , Keys , LogId , LogletOffset , Lsn , Record , SequenceNumber , TailState ,
18
+ } ,
19
+ replicated_loglet:: { ReplicatedLogletParams , ReplicationProperty } ,
15
20
storage:: PolyBytes ,
16
21
time:: NanosSinceEpoch ,
17
- GenerationalNodeId ,
22
+ GenerationalNodeId , Version ,
18
23
} ;
24
+ use tokio:: task:: { JoinHandle , JoinSet } ;
25
+ use tokio_util:: sync:: CancellationToken ;
19
26
20
27
use super :: common:: replicated_loglet:: run_in_test_env;
21
28
@@ -175,4 +182,168 @@ mod tests {
175
182
)
176
183
. await
177
184
}
185
+
186
+ #[ test( tokio:: test( flavor = "multi_thread" , worker_threads = 4 ) ) ]
187
+ async fn bifrost_append_and_seal_concurrent ( ) -> googletest:: Result < ( ) > {
188
+ const TEST_DURATION : Duration = Duration :: from_secs ( 10 ) ;
189
+ const SEAL_PERIOD : Duration = Duration :: from_secs ( 1 ) ;
190
+ const CONCURRENT_APPENDERS : usize = 20 ;
191
+
192
+ run_in_test_env (
193
+ Configuration :: default ( ) ,
194
+ GenerationalNodeId :: new ( 5 , 1 ) , // local sequencer
195
+ ReplicationProperty :: new ( NonZeroU8 :: new ( 2 ) . unwrap ( ) ) ,
196
+ 3 ,
197
+ |test_env| async move {
198
+ let log_id = LogId :: new ( 0 ) ;
199
+
200
+ let tc = task_center ( ) ;
201
+ let metadata = metadata ( ) ;
202
+
203
+
204
+ let mut appenders: JoinSet < googletest:: Result < _ > > = JoinSet :: new ( ) ;
205
+ let cancel_appenders = CancellationToken :: new ( ) ;
206
+
207
+ for appender_id in 0 ..CONCURRENT_APPENDERS {
208
+ appenders. spawn ( {
209
+ let bifrost = test_env. bifrost . clone ( ) ;
210
+ let cancel_appenders = cancel_appenders. clone ( ) ;
211
+ let tc = tc. clone ( ) ;
212
+ async move {
213
+ tc. run_in_scope ( "append" , None , async move {
214
+ let mut i = 1 ;
215
+ let mut committed = Vec :: new ( ) ;
216
+ while !cancel_appenders. is_cancelled ( ) {
217
+ let offset = bifrost
218
+ . append (
219
+ log_id,
220
+ format ! ( "appender-{}-record{}" , appender_id, i) ,
221
+ )
222
+ . await ?;
223
+ i += 1 ;
224
+ committed. push ( offset) ;
225
+ }
226
+ Ok ( committed)
227
+ } )
228
+ . await
229
+ }
230
+ } ) ;
231
+ }
232
+
233
+ let mut sealer_handle: JoinHandle < googletest:: Result < ( ) > > = tokio:: task:: spawn ( {
234
+ let tc = tc. clone ( ) ;
235
+ let ( bifrost, metadata_writer, metadata_store_client) = (
236
+ test_env. bifrost . clone ( ) ,
237
+ test_env. metadata_writer . clone ( ) ,
238
+ test_env. metadata_store_client . clone ( )
239
+ ) ;
240
+
241
+ async move {
242
+ let cancellation_token = cancellation_token ( ) ;
243
+
244
+ let mut chain = metadata. updateable_logs_metadata ( ) . map ( |logs| logs. chain ( & log_id) . expect ( "a chain to exist" ) ) ;
245
+
246
+ let bifrost_admin = restate_bifrost:: BifrostAdmin :: new (
247
+ & bifrost,
248
+ & metadata_writer,
249
+ & metadata_store_client,
250
+ ) ;
251
+
252
+ tc. run_in_scope ( "sealer" , None , async move {
253
+ let mut last_loglet_id = None ;
254
+
255
+ while !cancellation_token. is_cancelled ( ) {
256
+ tokio:: time:: sleep ( SEAL_PERIOD ) . await ;
257
+
258
+ let mut params = ReplicatedLogletParams :: deserialize_from (
259
+ chain. live_load ( ) . tail ( ) . config . params . as_ref ( ) ,
260
+ ) ?;
261
+ if last_loglet_id == Some ( params. loglet_id ) {
262
+ fail ! ( "Could not seal as metadata has not caught up from the last seal (version={})" , metadata. logs_version( ) ) ?;
263
+ }
264
+ last_loglet_id = Some ( params. loglet_id ) ;
265
+ eprintln ! ( "Sealing loglet {} and creating new loglet {}" , params. loglet_id, params. loglet_id. next( ) ) ;
266
+ params. loglet_id = params. loglet_id . next ( ) ;
267
+
268
+ bifrost_admin
269
+ . seal_and_extend_chain (
270
+ log_id,
271
+ None ,
272
+ Version :: MIN ,
273
+ ProviderKind :: Replicated ,
274
+ LogletParams :: from ( params. serialize ( ) ?) ,
275
+ )
276
+ . await ?;
277
+ }
278
+
279
+ Ok ( ( ) )
280
+ } )
281
+ . await
282
+ }
283
+ } ) ;
284
+
285
+ tokio:: select! {
286
+ res = appenders. join_next( ) => {
287
+ fail!( "an appender exited early: {res:?}" ) ?;
288
+ }
289
+ res = & mut sealer_handle => {
290
+ fail!( "sealer exited early: {res:?}" ) ?;
291
+ }
292
+ _ = tokio:: time:: sleep( TEST_DURATION ) => {
293
+ eprintln!( "cancelling appenders and running validation" )
294
+ }
295
+ }
296
+
297
+ // stop appending
298
+ cancel_appenders. cancel ( ) ;
299
+ // stop sealing
300
+ sealer_handle. abort ( ) ;
301
+
302
+ match sealer_handle. await {
303
+ Err ( err) if err. is_cancelled ( ) => { }
304
+ res => fail ! ( "unexpected error from sealer handle: {res:?}" ) ?,
305
+ }
306
+
307
+ let mut all_committed = BTreeSet :: new ( ) ;
308
+ while let Some ( handle) = appenders. join_next ( ) . await {
309
+ let committed = handle??;
310
+ let committed_len = committed. len ( ) ;
311
+ assert_that ! ( committed_len, ge( 0 ) ) ;
312
+ let tail_record = committed. last ( ) . unwrap ( ) ;
313
+ println ! (
314
+ "Committed len={}, last appended={}" ,
315
+ committed_len, tail_record
316
+ ) ;
317
+ // ensure that all committed records are unique
318
+ for offset in committed {
319
+ if !all_committed. insert ( offset) {
320
+ fail ! ( "Committed duplicate sequence number {}" , offset) ?
321
+ }
322
+ }
323
+ }
324
+ let last_lsn = * all_committed
325
+ . last ( )
326
+ . expect ( "to have committed some records" ) ;
327
+
328
+ let mut reader =
329
+ test_env. bifrost . create_reader ( log_id, KeyFilter :: Any , Lsn :: OLDEST , last_lsn) ?;
330
+
331
+ let mut records = BTreeSet :: new ( ) ;
332
+
333
+ while let Some ( record) = reader. next ( ) . await {
334
+ let record = record?;
335
+ if !records. insert ( record. sequence_number ( ) ) {
336
+ fail ! ( "Read duplicate sequence number {}" , record. sequence_number( ) ) ?
337
+ }
338
+ }
339
+
340
+ // every record committed must be observed exactly once in readstream
341
+ assert ! ( all_committed. eq( & records) ) ;
342
+ eprintln ! ( "Validated {} committed records" , all_committed. len( ) ) ;
343
+
344
+ Ok ( ( ) )
345
+ } ,
346
+ )
347
+ . await
348
+ }
178
349
}
0 commit comments