35
35
import java .util .Set ;
36
36
import java .util .concurrent .ThreadLocalRandom ;
37
37
import java .util .concurrent .atomic .AtomicReference ;
38
+ import java .util .function .Predicate ;
38
39
import java .util .stream .Collectors ;
39
40
40
41
import static org .assertj .core .api .Assertions .assertThat ;
@@ -155,18 +156,164 @@ public void testLaterOrEqualTimeMills() throws IOException {
155
156
}
156
157
157
158
@ Test
158
- public void testlaterOrEqualWatermark () throws IOException {
159
+ public void testLaterOrEqualWatermark () throws IOException {
159
160
long millis = Long .MIN_VALUE ;
160
161
FileIO localFileIO = LocalFileIO .create ();
161
162
SnapshotManager snapshotManager =
162
163
new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
163
- // create 10 snapshots
164
- for (long i = 0 ; i < 10 ; i ++) {
164
+ // create 10 snapshots, watermarks are Long.MIN_VALUE.
165
+ long i = 0 ;
166
+ for (; i < 10 ; i ++) {
165
167
Snapshot snapshot = createSnapshotWithMillis (i , millis , Long .MIN_VALUE );
166
168
localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
167
169
}
168
170
// smaller than the second snapshot
169
171
assertThat (snapshotManager .laterOrEqualWatermark (millis + 999 )).isNull ();
172
+
173
+ // create 5 snapshots, watermarks are not Long.MIN_VALUE.
174
+ for (; i < 15 ; i ++) {
175
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
176
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
177
+ }
178
+
179
+ assertThat (snapshotManager .laterOrEqualWatermark (1100L ))
180
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
181
+ assertThat (snapshotManager .laterOrEqualWatermark (1101L ))
182
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
183
+ assertThat (snapshotManager .laterOrEqualWatermark (99L ))
184
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
185
+ assertThat (snapshotManager .laterOrEqualWatermark (1110L ))
186
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
187
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
188
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
189
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
190
+
191
+ // delete snapshots
192
+ for (i = 0 ; i < 15 ; i ++) {
193
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
194
+ }
195
+
196
+ // create 15 snapshots, the first 10 watermark are null.
197
+ i = 0 ;
198
+ for (; i < 10 ; i ++) {
199
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , null );
200
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
201
+ }
202
+ for (; i < 15 ; i ++) {
203
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000 );
204
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
205
+ }
206
+ assertThat (snapshotManager .laterOrEqualWatermark (1100L ))
207
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
208
+ assertThat (snapshotManager .laterOrEqualWatermark (1101L ))
209
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
210
+ assertThat (snapshotManager .laterOrEqualWatermark (99L ))
211
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
212
+ assertThat (snapshotManager .laterOrEqualWatermark (1110L ))
213
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
214
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
215
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
216
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
217
+
218
+ // delete snapshots
219
+ for (i = 0 ; i < 15 ; i ++) {
220
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
221
+ }
222
+
223
+ // create 15 snapshots, all watermark are not null.
224
+ i = 0 ;
225
+ for (; i < 15 ; i ++) {
226
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000 );
227
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
228
+ }
229
+ assertThat (snapshotManager .laterOrEqualWatermark (999L ))
230
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
231
+ assertThat (snapshotManager .laterOrEqualWatermark (1000L ))
232
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
233
+ assertThat (snapshotManager .laterOrEqualWatermark (1001L ))
234
+ .isEqualTo (createSnapshotWithMillis (1 , millis , 1010L ));
235
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
236
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
237
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
238
+ }
239
+
240
+ @ Test
241
+ public void testEarlierOrEqualWatermark () throws IOException {
242
+ long millis = Long .MIN_VALUE ;
243
+ FileIO localFileIO = LocalFileIO .create ();
244
+ SnapshotManager snapshotManager =
245
+ new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
246
+ // create 10 snapshots, watermarks are Long.MIN_VALUE.
247
+ long i = 0 ;
248
+ for (; i < 10 ; i ++) {
249
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , Long .MIN_VALUE );
250
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
251
+ }
252
+ // smaller than the second snapshot
253
+ assertThat (snapshotManager .earlierOrEqualWatermark (millis + 999 )).isNull ();
254
+
255
+ // create 5 snapshots, watermarks are not Long.MIN_VALUE.
256
+ for (; i < 15 ; i ++) {
257
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
258
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
259
+ }
260
+
261
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
262
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
263
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
264
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
265
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
266
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
267
+ assertThat (snapshotManager .earlierOrEqualWatermark (1100L ))
268
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
269
+ assertThat (snapshotManager .earlierOrEqualWatermark (1099L ))
270
+ .isEqualTo (createSnapshotWithMillis (9 , millis , Long .MIN_VALUE ));
271
+
272
+ // delete snapshots
273
+ for (i = 0 ; i < 15 ; i ++) {
274
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
275
+ }
276
+
277
+ // create 15 snapshots, the first 10 watermark are null.
278
+ i = 0 ;
279
+ for (; i < 10 ; i ++) {
280
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , null );
281
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
282
+ }
283
+ for (; i < 15 ; i ++) {
284
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
285
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
286
+ }
287
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
288
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
289
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
290
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
291
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
292
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
293
+ assertThat (snapshotManager .earlierOrEqualWatermark (1100L ))
294
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
295
+ assertThat (snapshotManager .earlierOrEqualWatermark (1099L )).isNull ();
296
+
297
+ // delete snapshots
298
+ for (i = 0 ; i < 15 ; i ++) {
299
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
300
+ }
301
+
302
+ // create 15 snapshots, all watermarks not null.
303
+ i = 0 ;
304
+ for (; i < 15 ; i ++) {
305
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
306
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
307
+ }
308
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
309
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
310
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
311
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
312
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
313
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
314
+ assertThat (snapshotManager .earlierOrEqualWatermark (1000L ))
315
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
316
+ assertThat (snapshotManager .earlierOrEqualWatermark (999L )).isNull ();
170
317
}
171
318
172
319
private Snapshot createSnapshotWithMillis (long id , long millis ) {
@@ -189,7 +336,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis) {
189
336
null );
190
337
}
191
338
192
- private Snapshot createSnapshotWithMillis (long id , long millis , long watermark ) {
339
+ private Snapshot createSnapshotWithMillis (long id , long millis , Long watermark ) {
193
340
return new Snapshot (
194
341
id ,
195
342
0L ,
@@ -410,4 +557,61 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException
410
557
snapshotManager .commitChangelog (changelog , id );
411
558
assertDoesNotThrow (() -> snapshotManager .commitChangelog (changelog , id ));
412
559
}
560
+
561
+ @ Test
562
+ public void testBinarySearch () {
563
+ FileIO localFileIO = LocalFileIO .create ();
564
+ SnapshotManager snapshotManager =
565
+ new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
566
+ // findEarliest = true
567
+ Predicate <Long > condition = id -> id >= 5L ;
568
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (5L );
569
+ condition = id -> id > 5L ;
570
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (6L );
571
+ condition = id -> id >= 10L ;
572
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true ))
573
+ .isEqualTo (10L );
574
+ condition = id -> id > 10L ;
575
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
576
+ condition = id -> id >= 11L ;
577
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
578
+ condition = id -> id > 11L ;
579
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
580
+ condition = id -> id >= 1L ;
581
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
582
+ condition = id -> id > 1L ;
583
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
584
+ condition = id -> id >= 2L ;
585
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
586
+ condition = id -> id > 2L ;
587
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (3L );
588
+ // findEarliest = false
589
+ condition = id -> id <= 5 ;
590
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
591
+ .isEqualTo (5L );
592
+ condition = id -> id < 5 ;
593
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
594
+ .isEqualTo (4L );
595
+ condition = id -> id <= 10 ;
596
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
597
+ .isEqualTo (10L );
598
+ condition = id -> id < 10 ;
599
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
600
+ .isEqualTo (9L );
601
+ condition = id -> id <= 11 ;
602
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
603
+ .isEqualTo (10L );
604
+ condition = id -> id < 11 ;
605
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
606
+ .isEqualTo (10L );
607
+ condition = id -> id <= 1 ;
608
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
609
+ condition = id -> id < 1 ;
610
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
611
+ condition = id -> id <= 2 ;
612
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
613
+ .isEqualTo (2L );
614
+ condition = id -> id < 2 ;
615
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
616
+ }
413
617
}
0 commit comments