[core] Support upper bound in dynamic bucket mode #4974
base: master
@JingsongLi CI passed, PTAL, thanks!
@@ -1076,6 +1076,14 @@ public class CoreOptions implements Serializable {
                    .withDescription(
                            "Initial buckets for a partition in assigner operator for dynamic bucket mode.");

    public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER =
            key("dynamic-bucket.max-buckets-per-assigner")
I think we should limit total buckets?
That was my initial intention too, but I found that the bucket generation logic in BucketAssigner
is per-instance behavior and can be parallelized: every assigner handles its own buckets. On the other hand, this feature is equivalent to a total bucket limit by default (parallelism of 1).
Furthermore, a global bucket limit may leave some assigners with less work, resulting in CPU and data skew. WDYT?
I still think introducing a single max-bucket option is better.
The option should be very easy to understand.
+1
An added option should be easy for users to understand.
Reasonable enough. I have found a way to implement the total max bucket feature, thanks for the reminder :)
public static int[] distributeBucketsToAssigners(int maxBuckets, int assigners) {
    int[] result = new int[assigners];
    int avg = maxBuckets / assigners;
    int remainder = maxBuckets % assigners;
    for (int i = 0; i < assigners; i++) {
        result[i] = avg;
        if (remainder > 0) {
            result[i]++;
            remainder--;
        }
    }
    return result;
}
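As a quick sanity check of the splitting logic in the snippet above, here is a small standalone sketch. The wrapper class name `BucketDistributionDemo` and the sample inputs are made up for illustration and are not part of the PR; the method body is behaviorally the same as the one posted above.

```java
import java.util.Arrays;

// Hypothetical demo class, not part of the PR.
public class BucketDistributionDemo {

    // Splits maxBuckets as evenly as possible across the assigners:
    // the first (maxBuckets % assigners) assigners get one extra bucket.
    public static int[] distributeBucketsToAssigners(int maxBuckets, int assigners) {
        int[] result = new int[assigners];
        int avg = maxBuckets / assigners;
        int remainder = maxBuckets % assigners;
        for (int i = 0; i < assigners; i++) {
            result[i] = avg + (i < remainder ? 1 : 0);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 buckets over 3 assigners
        System.out.println(Arrays.toString(distributeBucketsToAssigners(10, 3))); // [4, 3, 3]
        // Fewer buckets than assigners: the trailing assigners get a quota of 0
        System.out.println(Arrays.toString(distributeBucketsToAssigners(2, 4))); // [1, 1, 0, 0]
    }
}
```

Note that when the total limit is smaller than the assigner parallelism, some assigners end up with a quota of 0, so the caller probably needs to decide how such assigners should behave.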
int bucket =
        KeyAndBucketExtractor.bucketWithUpperBound(
                bucketInformation.keySet(), hash, maxBucketsNum);
hash2Bucket.put(hash, (short) bucket);
How about adding a util function PartitionIndex#cacheBucketAndGet and using it here?
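To make the suggestion concrete, a rough sketch of what such a helper might look like follows. Everything here is an assumption for illustration: the class `PartitionIndexSketch`, the method signature, the `cachedBucket` accessor, and especially `pickBucket`, which is only a placeholder policy (hash modulo the upper bound) standing in for `KeyAndBucketExtractor.bucketWithUpperBound`, whose real logic is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for PartitionIndex; the field names mirror the diff above.
public class PartitionIndexSketch {

    private final Map<Integer, Short> hash2Bucket = new HashMap<>();
    // Stand-in for bucketInformation.keySet() in the diff.
    private final Set<Integer> existingBuckets = new TreeSet<>();

    // Placeholder policy only (assumption): map the hash into [0, maxBucketsNum).
    // The real bucketWithUpperBound may use the existing buckets differently;
    // they are ignored here.
    static int pickBucket(Set<Integer> existing, int hash, int maxBucketsNum) {
        return Math.floorMod(hash, maxBucketsNum);
    }

    // Picks a bucket for the hash, caches the mapping, and returns the bucket,
    // so call sites do not repeat the pick-then-put sequence.
    public int cacheBucketAndGet(int hash, int maxBucketsNum) {
        int bucket = pickBucket(existingBuckets, hash, maxBucketsNum);
        hash2Bucket.put(hash, (short) bucket);
        return bucket;
    }

    // Returns the cached bucket for a hash, or null if nothing is cached yet.
    public Short cachedBucket(int hash) {
        return hash2Bucket.get(hash);
    }
}
```

The point of the helper is only to keep the "pick, cache, return" steps in one place; the actual selection policy would stay in `KeyAndBucketExtractor`.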
ff4404b to c7b7755
c7b7755 to df576e2
Purpose
In dynamic bucket mode, an unlimited number of buckets leads to an unpredictable number of small files, which causes stability problems, so we should support an upper bound in dynamic bucket mode.
Linked issue: close #4942
Tests
HashBucketAssignerTest
SimpleHashBucketAssignerTest
API and Format
Documentation
docs/layouts/shortcodes/generated/core_configuration.html