Skip to content

Commit df576e2

Browse files
committed
refactor to support total buckets upper bound
1 parent ed6ebe2 commit df576e2

File tree

10 files changed

+209
-35
lines changed

10 files changed

+209
-35
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

+2-2
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,10 @@
291291
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
292292
</tr>
293293
<tr>
294-
<td><h5>dynamic-bucket.max-buckets-per-assigner</h5></td>
294+
<td><h5>dynamic-bucket.max-buckets</h5></td>
295295
<td style="word-wrap: break-word;">-1</td>
296296
<td>Integer</td>
297-
<td>Max buckets per assigner operator for a partition in dynamic bucket mode, It should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).</td>
297+
<td>Max buckets for a partition in dynamic bucket mode, It should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).</td>
298298
</tr>
299299
<tr>
300300
<td><h5>dynamic-bucket.target-row-num</h5></td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1076,12 +1076,12 @@ public class CoreOptions implements Serializable {
10761076
.withDescription(
10771077
"Initial buckets for a partition in assigner operator for dynamic bucket mode.");
10781078

1079-
public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER =
1080-
key("dynamic-bucket.max-buckets-per-assigner")
1079+
public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS =
1080+
key("dynamic-bucket.max-buckets")
10811081
.intType()
10821082
.defaultValue(-1)
10831083
.withDescription(
1084-
"Max buckets per assigner operator for a partition in dynamic bucket mode, It should "
1084+
"Max buckets for a partition in dynamic bucket mode, It should "
10851085
+ "either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
10861086

10871087
public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
@@ -2227,8 +2227,8 @@ public Integer dynamicBucketInitialBuckets() {
22272227
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
22282228
}
22292229

2230-
public Integer dynamicBucketMaxBucketsPerAssigner() {
2231-
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER);
2230+
public Integer dynamicBucketMaxBuckets() {
2231+
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS);
22322232
}
22332233

22342234
public Integer dynamicBucketAssignerParallelism() {

paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java

+48-9
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import org.apache.paimon.utils.Int2ShortHashMap;
2525
import org.apache.paimon.utils.IntIterator;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import java.io.EOFException;
2831
import java.io.IOException;
2932
import java.io.UncheckedIOException;
33+
import java.util.Arrays;
3034
import java.util.HashMap;
3135
import java.util.Iterator;
3236
import java.util.LinkedHashSet;
@@ -39,6 +43,7 @@
3943

4044
/** Bucket Index Per Partition. */
4145
public class PartitionIndex {
46+
private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class);
4247

4348
public final Int2ShortHashMap hash2Bucket;
4449

@@ -81,15 +86,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
8186
Long number = entry.getValue();
8287
if (number < targetBucketRowNumber) {
8388
entry.setValue(number + 1);
84-
return cacheBucketAndGet(hash, bucket);
89+
return cacheBucketAndGet(hash2Bucket, hash, bucket);
8590
} else {
8691
iterator.remove();
87-
if (-1 != maxBucketsNum && totalBucket.size() == maxBucketsNum) {
88-
return cacheBucketAndGet(
89-
hash,
90-
KeyAndBucketExtractor.bucketWithUpperBound(
91-
totalBucket, hash, maxBucketsNum));
92-
}
9392
}
9493
}
9594

@@ -99,7 +98,7 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
9998
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
10099
nonFullBucketInformation.put(i, 1L);
101100
totalBucket.add(i);
102-
return cacheBucketAndGet(hash, i);
101+
return cacheBucketAndGet(hash2Bucket, hash, i);
103102
}
104103
}
105104

@@ -110,7 +109,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
110109
"Too more bucket %s, you should increase target bucket row number %s.",
111110
maxBucket, targetBucketRowNumber));
112111
} else {
112+
// exceed buckets upper bound
113113
return cacheBucketAndGet(
114+
hash2Bucket,
114115
hash,
115116
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum));
116117
}
@@ -149,8 +150,46 @@ public static PartitionIndex loadIndex(
149150
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
150151
}
151152

152-
private int cacheBucketAndGet(int hash, int bucket) {
153+
public static int cacheBucketAndGet(Int2ShortHashMap hash2Bucket, int hash, int bucket) {
153154
hash2Bucket.put(hash, (short) bucket);
154155
return bucket;
155156
}
157+
158+
public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) {
159+
int[] maxBucketsArr = new int[assigners];
160+
if (-1 == maxBuckets) {
161+
Arrays.fill(maxBucketsArr, -1);
162+
return maxBucketsArr;
163+
}
164+
if (0 >= maxBuckets) {
165+
throw new IllegalArgumentException(
166+
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
167+
}
168+
int avg = maxBuckets / assigners;
169+
int remainder = maxBuckets % assigners;
170+
for (int i = 0; i < assigners; i++) {
171+
maxBucketsArr[i] = avg;
172+
if (remainder > 0) {
173+
maxBucketsArr[i]++;
174+
remainder--;
175+
}
176+
}
177+
LOG.info(
178+
"After distributing max-buckets {} to {} assigners evenly, maxBuckets layout: {}.",
179+
maxBuckets,
180+
assigners,
181+
Arrays.toString(maxBucketsArr));
182+
return maxBucketsArr;
183+
}
184+
185+
public static int getSpecifiedMaxBuckets(int[] maxBucketsArr, int assignerId) {
186+
int length = maxBucketsArr.length;
187+
if (length == 0) {
188+
throw new IllegalStateException("maxBuckets layout should exists!");
189+
} else if (assignerId < length) {
190+
return maxBucketsArr[assignerId];
191+
} else {
192+
return -1 == maxBucketsArr[0] ? -1 : 0;
193+
}
194+
}
156195
}

paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.Map;
2929
import java.util.Set;
3030

31+
import static org.apache.paimon.index.PartitionIndex.cacheBucketAndGet;
32+
3133
/** When we need to overwrite the table, we should use this to avoid loading index. */
3234
public class SimpleHashBucketAssigner implements BucketAssigner {
3335

@@ -89,11 +91,11 @@ public int assign(int hash) {
8991
Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
9092
if (num >= targetBucketRowNumber) {
9193
if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
92-
int bucket =
94+
return cacheBucketAndGet(
95+
hash2Bucket,
96+
hash,
9397
KeyAndBucketExtractor.bucketWithUpperBound(
94-
bucketInformation.keySet(), hash, maxBucketsNum);
95-
hash2Bucket.put(hash, (short) bucket);
96-
return bucket;
98+
bucketInformation.keySet(), hash, maxBucketsNum));
9799
} else {
98100
loadNewBucket();
99101
}

paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Set;
2828
import java.util.concurrent.ThreadLocalRandom;
2929

30-
import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER;
30+
import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
3131
import static org.apache.paimon.utils.Preconditions.checkArgument;
3232

3333
/**
@@ -66,7 +66,7 @@ static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int maxBu
6666
"Assign record (hashcode '{}') to new bucket exceed upper bound '{}' defined in '{}', Stop creating new buckets.",
6767
hashcode,
6868
maxBucketsNum,
69-
DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER.key());
69+
DYNAMIC_BUCKET_MAX_BUCKETS.key());
7070
return bucketsSet.stream()
7171
.skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
7272
.findFirst()

paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java

+76-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.Arrays;
3838
import java.util.Collections;
3939

40+
import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
41+
import static org.apache.paimon.index.PartitionIndex.getSpecifiedMaxBuckets;
4042
import static org.apache.paimon.io.DataFileTestUtils.row;
4143
import static org.assertj.core.api.Assertions.assertThat;
4244
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -136,7 +138,7 @@ public void testAssignWithUpperBound() {
136138
assertThatThrownBy(() -> assigner.assign(row(1), 1))
137139
.hasMessageContaining("This is a bug, record assign id");
138140

139-
// exceed upper bound
141+
// exceed buckets upper bound
140142
// partition 1
141143
int hash = 18;
142144
for (int i = 0; i < 200; i++) {
@@ -151,7 +153,78 @@ public void testAssignWithUpperBound() {
151153
}
152154
}
153155

154-
@ParameterizedTest(name = "maxBucket: {0}")
156+
@Test
157+
public void testMultiAssigners() {
158+
int[] maxBucketsArr = getMaxBucketsPerAssigner(4, 2);
159+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 2});
160+
161+
maxBucketsArr = getMaxBucketsPerAssigner(8, 3);
162+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {3, 3, 2});
163+
164+
maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
165+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
166+
167+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(2);
168+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(1);
169+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(0);
170+
171+
maxBucketsArr = getMaxBucketsPerAssigner(-1, 2);
172+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {-1, -1});
173+
174+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 0)).isEqualTo(-1);
175+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 1)).isEqualTo(-1);
176+
Assertions.assertThat(getSpecifiedMaxBuckets(maxBucketsArr, 2)).isEqualTo(-1);
177+
178+
assertThatThrownBy(() -> getMaxBucketsPerAssigner(-10, 2))
179+
.hasMessageContaining(
180+
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
181+
}
182+
183+
@Test
184+
public void testAssignWithUpperBoundMultiAssigners() {
185+
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
186+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
187+
188+
HashBucketAssigner assigner0 = createAssigner(2, 2, 0, maxBucketsArr[0]);
189+
HashBucketAssigner assigner1 = createAssigner(2, 2, 1, maxBucketsArr[1]);
190+
191+
// assigner0: assign
192+
assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
193+
assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
194+
assertThat(assigner0.assign(row(1), 4)).isEqualTo(0);
195+
assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
196+
assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
197+
198+
// assigner0: full
199+
assertThat(assigner0.assign(row(1), 10)).isEqualTo(2);
200+
assertThat(assigner0.assign(row(1), 12)).isEqualTo(2);
201+
assertThat(assigner0.assign(row(1), 14)).isEqualTo(2);
202+
assertThat(assigner0.assign(row(1), 16)).isEqualTo(2);
203+
assertThat(assigner0.assign(row(1), 18)).isEqualTo(2);
204+
205+
// assigner0: exceed buckets upper bound
206+
int hash = 18;
207+
for (int i = 0; i < 200; i++) {
208+
int bucket = assigner0.assign(row(2), hash += 2);
209+
Assertions.assertThat(bucket).isIn(0, 2);
210+
}
211+
212+
// assigner1: assign
213+
assertThat(assigner1.assign(row(1), 1)).isEqualTo(1);
214+
assertThat(assigner1.assign(row(1), 3)).isEqualTo(1);
215+
assertThat(assigner1.assign(row(1), 5)).isEqualTo(1);
216+
assertThat(assigner1.assign(row(1), 7)).isEqualTo(1);
217+
assertThat(assigner1.assign(row(1), 9)).isEqualTo(1);
218+
219+
// assigner1: exceed buckets upper bound
220+
hash = 9;
221+
for (int i = 0; i < 200; i++) {
222+
int bucket = assigner1.assign(row(2), hash += 2);
223+
Assertions.assertThat(bucket).isIn(1);
224+
}
225+
}
226+
227+
@ParameterizedTest(name = "maxBuckets: {0}")
155228
@ValueSource(ints = {-1, 1, 2})
156229
public void testPartitionCopy(int maxBucketsNum) {
157230
HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum);
@@ -229,7 +302,7 @@ public void testAssignRestoreWithUpperBound() {
229302
assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
230303
assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
231304
assertThat(assigner2.assign(row(1), 16)).isEqualTo(2);
232-
// exceed buckets limits
305+
// exceed buckets upper bound
233306
assertThat(assigner0.assign(row(1), 17)).isEqualTo(0);
234307
}
235308

paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java

+42-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.jupiter.params.ParameterizedTest;
2626
import org.junit.jupiter.params.provider.ValueSource;
2727

28+
import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
2829
import static org.apache.paimon.io.DataFileTestUtils.row;
2930
import static org.assertj.core.api.Assertions.assertThat;
3031

@@ -83,7 +84,46 @@ public void testAssignWithUpperBound() {
8384
}
8485
}
8586

86-
@ParameterizedTest(name = "maxBucket: {0}")
87+
@Test
88+
public void testAssignWithUpperBoundMultiAssigners() {
89+
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
90+
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});
91+
92+
SimpleHashBucketAssigner simpleHashBucketAssigner0 =
93+
new SimpleHashBucketAssigner(2, 0, 100, maxBucketsArr[0]);
94+
SimpleHashBucketAssigner simpleHashBucketAssigner1 =
95+
new SimpleHashBucketAssigner(2, 1, 100, maxBucketsArr[1]);
96+
97+
BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
98+
int hash = 0;
99+
100+
for (int i = 0; i < 100; i++) {
101+
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
102+
Assertions.assertThat(bucket).isEqualTo(0);
103+
}
104+
105+
for (int i = 0; i < 100; i++) {
106+
int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
107+
Assertions.assertThat(bucket).isEqualTo(1);
108+
}
109+
110+
for (int i = 0; i < 100; i++) {
111+
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
112+
Assertions.assertThat(bucket).isEqualTo(2);
113+
}
114+
115+
// exceed upper bound
116+
for (int i = 0; i < 200; i++) {
117+
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
118+
Assertions.assertThat(bucket).isIn(0, 2);
119+
}
120+
for (int i = 0; i < 200; i++) {
121+
int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
122+
Assertions.assertThat(bucket).isIn(1);
123+
}
124+
}
125+
126+
@ParameterizedTest(name = "maxBuckets: {0}")
87127
@ValueSource(ints = {-1, 1, 2})
88128
public void testAssignWithSameHash(int maxBucketsNum) {
89129
SimpleHashBucketAssigner simpleHashBucketAssigner =
@@ -105,7 +145,7 @@ public void testAssignWithSameHash(int maxBucketsNum) {
105145
}
106146
}
107147

108-
@ParameterizedTest(name = "maxBucket: {0}")
148+
@ParameterizedTest(name = "maxBuckets: {0}")
109149
@ValueSource(ints = {-1, 1, 2})
110150
public void testPartitionCopy(int maxBucketsNum) {
111151
SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5, maxBucketsNum);

0 commit comments

Comments
 (0)