35
35
import java .util .Set ;
36
36
import java .util .concurrent .ThreadLocalRandom ;
37
37
import java .util .concurrent .atomic .AtomicReference ;
38
+ import java .util .function .Predicate ;
38
39
import java .util .stream .Collectors ;
39
40
40
41
import static org .assertj .core .api .Assertions .assertThat ;
@@ -154,18 +155,164 @@ public void testLaterOrEqualTimeMills() throws IOException {
154
155
}
155
156
156
157
@ Test
157
- public void testlaterOrEqualWatermark () throws IOException {
158
+ public void testLaterOrEqualWatermark () throws IOException {
158
159
long millis = Long .MIN_VALUE ;
159
160
FileIO localFileIO = LocalFileIO .create ();
160
161
SnapshotManager snapshotManager =
161
162
new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
162
- // create 10 snapshots
163
- for (long i = 0 ; i < 10 ; i ++) {
163
+ // create 10 snapshots, watermarks are Long.MIN_VALUE.
164
+ long i = 0 ;
165
+ for (; i < 10 ; i ++) {
164
166
Snapshot snapshot = createSnapshotWithMillis (i , millis , Long .MIN_VALUE );
165
167
localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
166
168
}
167
169
// smaller than the second snapshot
168
170
assertThat (snapshotManager .laterOrEqualWatermark (millis + 999 )).isNull ();
171
+
172
+ // create 5 snapshots, watermarks are not Long.MIN_VALUE.
173
+ for (; i < 15 ; i ++) {
174
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
175
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
176
+ }
177
+
178
+ assertThat (snapshotManager .laterOrEqualWatermark (1100L ))
179
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
180
+ assertThat (snapshotManager .laterOrEqualWatermark (1101L ))
181
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
182
+ assertThat (snapshotManager .laterOrEqualWatermark (99L ))
183
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
184
+ assertThat (snapshotManager .laterOrEqualWatermark (1110L ))
185
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
186
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
187
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
188
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
189
+
190
+ // delete snapshots
191
+ for (i = 0 ; i < 15 ; i ++) {
192
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
193
+ }
194
+
195
+ // create 15 snapshots, the first 10 watermark are null.
196
+ i = 0 ;
197
+ for (; i < 10 ; i ++) {
198
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , null );
199
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
200
+ }
201
+ for (; i < 15 ; i ++) {
202
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000 );
203
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
204
+ }
205
+ assertThat (snapshotManager .laterOrEqualWatermark (1100L ))
206
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
207
+ assertThat (snapshotManager .laterOrEqualWatermark (1101L ))
208
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
209
+ assertThat (snapshotManager .laterOrEqualWatermark (99L ))
210
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
211
+ assertThat (snapshotManager .laterOrEqualWatermark (1110L ))
212
+ .isEqualTo (createSnapshotWithMillis (11 , millis , 1110L ));
213
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
214
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
215
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
216
+
217
+ // delete snapshots
218
+ for (i = 0 ; i < 15 ; i ++) {
219
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
220
+ }
221
+
222
+ // create 15 snapshots, all watermark are not null.
223
+ i = 0 ;
224
+ for (; i < 15 ; i ++) {
225
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000 );
226
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
227
+ }
228
+ assertThat (snapshotManager .laterOrEqualWatermark (999L ))
229
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
230
+ assertThat (snapshotManager .laterOrEqualWatermark (1000L ))
231
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
232
+ assertThat (snapshotManager .laterOrEqualWatermark (1001L ))
233
+ .isEqualTo (createSnapshotWithMillis (1 , millis , 1010L ));
234
+ assertThat (snapshotManager .laterOrEqualWatermark (1140L ))
235
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
236
+ assertThat (snapshotManager .laterOrEqualWatermark (1141L )).isNull ();
237
+ }
238
+
239
+ @ Test
240
+ public void testEarlierOrEqualWatermark () throws IOException {
241
+ long millis = Long .MIN_VALUE ;
242
+ FileIO localFileIO = LocalFileIO .create ();
243
+ SnapshotManager snapshotManager =
244
+ new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
245
+ // create 10 snapshots, watermarks are Long.MIN_VALUE.
246
+ long i = 0 ;
247
+ for (; i < 10 ; i ++) {
248
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , Long .MIN_VALUE );
249
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
250
+ }
251
+ // smaller than the second snapshot
252
+ assertThat (snapshotManager .earlierOrEqualWatermark (millis + 999 )).isNull ();
253
+
254
+ // create 5 snapshots, watermarks are not Long.MIN_VALUE.
255
+ for (; i < 15 ; i ++) {
256
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
257
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
258
+ }
259
+
260
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
261
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
262
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
263
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
264
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
265
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
266
+ assertThat (snapshotManager .earlierOrEqualWatermark (1100L ))
267
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
268
+ assertThat (snapshotManager .earlierOrEqualWatermark (1099L ))
269
+ .isEqualTo (createSnapshotWithMillis (9 , millis , Long .MIN_VALUE ));
270
+
271
+ // delete snapshots
272
+ for (i = 0 ; i < 15 ; i ++) {
273
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
274
+ }
275
+
276
+ // create 15 snapshots, the first 10 watermark are null.
277
+ i = 0 ;
278
+ for (; i < 10 ; i ++) {
279
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , null );
280
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
281
+ }
282
+ for (; i < 15 ; i ++) {
283
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
284
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
285
+ }
286
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
287
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
288
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
289
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
290
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
291
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
292
+ assertThat (snapshotManager .earlierOrEqualWatermark (1100L ))
293
+ .isEqualTo (createSnapshotWithMillis (10 , millis , 1100L ));
294
+ assertThat (snapshotManager .earlierOrEqualWatermark (1099L )).isNull ();
295
+
296
+ // delete snapshots
297
+ for (i = 0 ; i < 15 ; i ++) {
298
+ localFileIO .deleteQuietly (snapshotManager .snapshotPath (i ));
299
+ }
300
+
301
+ // create 15 snapshots, all watermarks are not null.
302
+ i = 0 ;
303
+ for (; i < 15 ; i ++) {
304
+ Snapshot snapshot = createSnapshotWithMillis (i , millis , i * 10 + 1000L );
305
+ localFileIO .tryToWriteAtomic (snapshotManager .snapshotPath (i ), snapshot .toJson ());
306
+ }
307
+ assertThat (snapshotManager .earlierOrEqualWatermark (1140L ))
308
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
309
+ assertThat (snapshotManager .earlierOrEqualWatermark (1141L ))
310
+ .isEqualTo (createSnapshotWithMillis (14 , millis , 1140L ));
311
+ assertThat (snapshotManager .earlierOrEqualWatermark (1139L ))
312
+ .isEqualTo (createSnapshotWithMillis (13 , millis , 1130L ));
313
+ assertThat (snapshotManager .earlierOrEqualWatermark (1000L ))
314
+ .isEqualTo (createSnapshotWithMillis (0 , millis , 1000L ));
315
+ assertThat (snapshotManager .earlierOrEqualWatermark (999L )).isNull ();
169
316
}
170
317
171
318
private Snapshot createSnapshotWithMillis (long id , long millis ) {
@@ -188,7 +335,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis) {
188
335
null );
189
336
}
190
337
191
- private Snapshot createSnapshotWithMillis (long id , long millis , long watermark ) {
338
+ private Snapshot createSnapshotWithMillis (long id , long millis , Long watermark ) {
192
339
return new Snapshot (
193
340
id ,
194
341
0L ,
@@ -398,4 +545,61 @@ public void testLongLivedChangelog() throws Exception {
398
545
Assertions .assertThat (snapshotManager .latestSnapshotId ()).isEqualTo (10 );
399
546
Assertions .assertThat (snapshotManager .changelog (1 )).isNotNull ();
400
547
}
548
+
549
+ @ Test
550
+ public void testBinarySearch () {
551
+ FileIO localFileIO = LocalFileIO .create ();
552
+ SnapshotManager snapshotManager =
553
+ new SnapshotManager (localFileIO , new Path (tempDir .toString ()));
554
+ // findEarliest = true
555
+ Predicate <Long > condition = id -> id >= 5L ;
556
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (5L );
557
+ condition = id -> id > 5L ;
558
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (6L );
559
+ condition = id -> id >= 10L ;
560
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true ))
561
+ .isEqualTo (10L );
562
+ condition = id -> id > 10L ;
563
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
564
+ condition = id -> id >= 11L ;
565
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
566
+ condition = id -> id > 11L ;
567
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isNull ();
568
+ condition = id -> id >= 1L ;
569
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
570
+ condition = id -> id > 1L ;
571
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
572
+ condition = id -> id >= 2L ;
573
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (2L );
574
+ condition = id -> id > 2L ;
575
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , true )).isEqualTo (3L );
576
+ // findEarliest = false
577
+ condition = id -> id <= 5 ;
578
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
579
+ .isEqualTo (5L );
580
+ condition = id -> id < 5 ;
581
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
582
+ .isEqualTo (4L );
583
+ condition = id -> id <= 10 ;
584
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
585
+ .isEqualTo (10L );
586
+ condition = id -> id < 10 ;
587
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
588
+ .isEqualTo (9L );
589
+ condition = id -> id <= 11 ;
590
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
591
+ .isEqualTo (10L );
592
+ condition = id -> id < 11 ;
593
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
594
+ .isEqualTo (10L );
595
+ condition = id -> id <= 1 ;
596
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
597
+ condition = id -> id < 1 ;
598
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
599
+ condition = id -> id <= 2 ;
600
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false ))
601
+ .isEqualTo (2L );
602
+ condition = id -> id < 2 ;
603
+ Assertions .assertThat (snapshotManager .binarySearch (2L , 10L , condition , false )).isNull ();
604
+ }
401
605
}
0 commit comments