@@ -93,7 +93,8 @@ all_tests() ->
93
93
init_partial_writes ,
94
94
init_with_unexpected_file ,
95
95
overview_with_missing_segment ,
96
- overview_with_missing_index_at_start
96
+ overview_with_missing_index_at_start ,
97
+ read_ahead
97
98
% read_header_ahead_offset_reader,
98
99
% read_header_ahead_offset_reader_filter
99
100
].
@@ -385,6 +386,83 @@ iterator_read_chunk_mixed_sizes_with_credit(Config) ->
385
386
osiris_log :close (S1 ),
386
387
ok .
387
388
389
+ read_ahead (Config ) ->
390
+ RAL = 4096 , % % read ahead limit
391
+ HS = ? HEADER_SIZE_B ,
392
+ Conf = ? config (osiris_conf , Config ),
393
+ W = osiris_log :init (Conf ),
394
+ Shared = osiris_log :get_shared (W ),
395
+ RConf = Conf #{shared => Shared , transport => tcp },
396
+ {ok , R } = osiris_log :init_offset_reader (0 , RConf ),
397
+
398
+ % % server, we will read stream data from this socket
399
+ {ok , SLS } = gen_tcp :listen (0 , [binary , {packet , 0 },
400
+ {active , false }]),
401
+ {ok , Port } = inet :port (SLS ),
402
+
403
+ % % client, osiris will send to this socket
404
+ {ok , CS } = gen_tcp :connect (" localhost" , Port ,
405
+ [binary , {packet , 0 }]),
406
+
407
+ {ok , SS } = gen_tcp :accept (SLS ),
408
+
409
+ Tests =
410
+ [
411
+ fun (write , #{w := W0 }) ->
412
+ Entries = [<<" hiiiiiiiii" >>, <<" hooooooo" >>],
413
+ {_ , W1 } = write_committed (Entries , W0 ),
414
+ W1 ;
415
+ (read , #{r := R0 , tracer := T }) ->
416
+ % % first chunk, we read the header and the chunk
417
+ Entries = [<<" hiiiiiiiii" >>, <<" hooooooo" >>],
418
+ [_ , _ , D , _ ] = fake_chunk (Entries , ? LINE , 1 , 100 ),
419
+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
420
+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
421
+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
422
+ ? assertEqual (2 , length (osiris_tracer :calls (T ))),
423
+ R1
424
+ end ,
425
+ fun (write , #{w := W0 }) ->
426
+ {_ , W1 } = write_committed ([<<" hi" >>, <<" ho" >>], W0 ),
427
+ W1 ;
428
+ (read , #{r := R0 , tracer := T }) ->
429
+ % % small chunk, we read it ahead already
430
+ [_ , _ , D , _ ] = fake_chunk ([<<" hi" >>, <<" ho" >>], ? LINE , 1 , 100 ),
431
+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
432
+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
433
+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
434
+ ? assertEqual (0 , length (osiris_tracer :calls (T ))),
435
+ R1
436
+ end ,
437
+ fun (write , #{w := W0 }) ->
438
+ Entries = [<<" foo" >>, binary :copy (<<" b" >>, RAL * 2 )],
439
+ {_ , W1 } = write_committed (Entries , W0 ),
440
+ W1 ;
441
+ (read , #{r := R0 , tracer := T }) ->
442
+ % % large chunk, we will need to read from the file system
443
+ Entries = [<<" foo" >>, binary :copy (<<" b" >>, RAL * 2 )],
444
+ [_ , _ , D , _ ] = fake_chunk (Entries , ? LINE , 1 , 100 ),
445
+ {ok , R1 } = osiris_log :send_file (CS , R0 ),
446
+ {ok , Read } = recv (SS , byte_size (iolist_to_binary (D )) + HS ),
447
+ ? assertEqual (iolist_to_binary (D ), binary :part (Read , HS , byte_size (Read ) - HS )),
448
+ ? assertEqual (0 , length (osiris_tracer :calls (T , file , pread ))),
449
+ ? assertEqual (1 , length (osiris_tracer :calls (T , file , sendfile ))),
450
+ R1
451
+ end
452
+ ],
453
+
454
+ #{w := W1 , r := R1 } = run_read_ahead_tests (Tests , offset ,
455
+ ? DEFAULT_FILTER_SIZE , W , R ),
456
+
457
+ ok = gen_tcp :close (CS ),
458
+
459
+ ok = gen_tcp :close (SS ),
460
+ ok = gen_tcp :close (SLS ),
461
+
462
+ osiris_log :close (R1 ),
463
+ osiris_log :close (W1 ),
464
+ ok .
465
+
388
466
iterator_read_chunk_with_read_ahead (Config ) ->
389
467
% % the test makes sure reading ahead on header reading does not break
390
468
% % the iterator
@@ -402,25 +480,25 @@ iterator_read_chunk_with_read_ahead(Config) ->
402
480
W1 ;
403
481
(read , #{r := R0 , tracer := T }) ->
404
482
% % first chunk, there won't be any data size hints in the reader
405
- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
483
+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
406
484
{{_ , <<" ho" >>}, I1 } = osiris_log :iterator_next (I0 ),
407
485
{{_ , <<" hi" >>}, I2 } = osiris_log :iterator_next (I1 ),
408
486
? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
409
487
? assertEqual (2 , length (osiris_tracer :calls (T ))),
410
- { H , R1 }
488
+ R1
411
489
end ,
412
490
fun (write , #{w := W0 }) ->
413
491
% % this one will be read ahead
414
492
EntriesRev = [<<" foo" >>, <<" bar" >>],
415
493
{_ , W1 } = write_committed (EntriesRev , W0 ),
416
494
W1 ;
417
495
(read , #{r := R0 , tracer := T }) ->
418
- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
496
+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
419
497
{{_ , <<" bar" >>}, I1 } = osiris_log :iterator_next (I0 ),
420
498
{{_ , <<" foo" >>}, I2 } = osiris_log :iterator_next (I1 ),
421
499
? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
422
500
? assertEqual (0 , length (osiris_tracer :calls (T ))),
423
- { H , R1 }
501
+ R1
424
502
end ,
425
503
fun (write , #{w := W0 }) ->
426
504
% % this one will be read ahead
@@ -430,12 +508,12 @@ iterator_read_chunk_with_read_ahead(Config) ->
430
508
W1 ;
431
509
(read , #{r := R0 , tracer := T }) ->
432
510
E1 = binary :copy (<<" b" >>, RAL - 200 ),
433
- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
511
+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
434
512
{{_ , <<" aaa" >>}, I1 } = osiris_log :iterator_next (I0 ),
435
513
{{_ , E1 }, I2 } = osiris_log :iterator_next (I1 ),
436
514
? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
437
515
? assertEqual (0 , length (osiris_tracer :calls (T ))),
438
- { H , R1 }
516
+ R1
439
517
end ,
440
518
fun (write , #{w := W0 }) ->
441
519
% % this one is too big to be read ahead
@@ -445,12 +523,12 @@ iterator_read_chunk_with_read_ahead(Config) ->
445
523
W1 ;
446
524
(read , #{r := R0 , tracer := T }) ->
447
525
E1 = binary :copy (<<" b" >>, RAL * 2 ),
448
- {ok , H , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
526
+ {ok , _ , I0 , R1 } = osiris_log :chunk_iterator (R0 ),
449
527
{{_ , <<" aaa" >>}, I1 } = osiris_log :iterator_next (I0 ),
450
528
{{_ , E1 }, I2 } = osiris_log :iterator_next (I1 ),
451
529
? assertMatch (end_of_chunk , osiris_log :iterator_next (I2 )),
452
530
? assertEqual (2 , length (osiris_tracer :calls (T ))),
453
- { H , R1 }
531
+ R1
454
532
end
455
533
],
456
534
@@ -2191,20 +2269,14 @@ run_read_ahead_tests(Tests, RType, FSize, Wr0, Rd0) ->
2191
2269
#{w => W }
2192
2270
end , #{w => Wr0 }, Tests ),
2193
2271
R = lists :foldl (fun (F , Acc ) ->
2194
- Tracer = osiris_tracer :start ({file , pread , '_' }),
2195
- {_ , R0 } = F (read , Acc #{tracer => Tracer }),
2272
+ Tracer = osiris_tracer :start ([{file , pread , '_' },
2273
+ {file , sendfile , '_' }]),
2274
+ R0 = F (read , Acc #{tracer => Tracer }),
2196
2275
osiris_tracer :stop (Tracer ),
2197
- % R1 = update_read(H, R0),
2198
2276
#{r => R0 , rtype => RType , fsize => FSize }
2199
2277
end , #{r => Rd0 , rtype => RType , fsize => FSize }, Tests ),
2200
2278
maps :merge (W , R ).
2201
2279
2202
- - spec update_read (map (), osiris_log :state ()) -> osiris_log :state ().
2203
- update_read (#{chunk_id := ChId ,
2204
- num_records := NumRecords ,
2205
- next_position := NextPos }, R ) ->
2206
- osiris_log :update_read (R , ChId , NumRecords , NextPos ).
2207
-
2208
2280
truncate_at (File , Pos ) ->
2209
2281
{ok , Fd } = file :open (File , [raw , binary , read , write ]),
2210
2282
% truncate the file so that chunk <<four>> is missing and <<three>> is corrupted
@@ -2311,3 +2383,19 @@ fake_chunk(Blobs, Ts, Epoch, NextChId, FSize) ->
2311
2383
element (1 ,
2312
2384
osiris_log :make_chunk (Blobs , <<>>, 0 , Ts , Epoch , NextChId ,
2313
2385
FSize )).
2386
+
2387
+ recv (Socket , Expected ) ->
2388
+ recv (Socket , Expected , <<>>).
2389
+
2390
+ recv (_Socket , 0 , Acc ) ->
2391
+ Acc ;
2392
+ recv (Socket , Expected , Acc ) ->
2393
+ case gen_tcp :recv (Socket , Expected , 10_000 ) of
2394
+ {ok , Data } when byte_size (Data ) == Expected ->
2395
+ {ok , Data };
2396
+ {ok , Data } when byte_size (Data ) < Expected ->
2397
+ {ok , recv (Socket , Expected - byte_size (Data ),
2398
+ <<Acc /binary , Data /binary >>)};
2399
+ Other ->
2400
+ Other
2401
+ end .
0 commit comments