@@ -253,6 +253,7 @@ impl Inner {
253
253
} ) ;
254
254
}
255
255
}
256
+ let total_slots = slots. len ( ) ;
256
257
257
258
let bucket_interval = u64:: from ( bucket_interval) ;
258
259
let num_partitions = u64:: from ( num_partitions) ;
@@ -266,7 +267,22 @@ impl Inner {
266
267
. map_or ( u64:: MAX , |v| v. div_ceil ( bucket_interval) ) ,
267
268
} ;
268
269
269
- let total_slots = slots. len ( ) ;
270
+ // Break down the maximum project cost to a maximum cost per partition.
271
+ let max_partition_project = {
272
+ let ratio_per_partition = match config. partition_by {
273
+ // All buckets in the same timeslot of a project are in the same partition.
274
+ // -> The total maximum allowed ratio is just determined by the amount of timeslots
275
+ FlushBatching :: None | FlushBatching :: Project => u64:: from ( num_time_slots) ,
276
+ // Buckets are evenly distributed across all existing partitions and timeslots.
277
+ _ => total_slots as u64 ,
278
+ } ;
279
+
280
+ config
281
+ . max_project_key_bucket_bytes
282
+ . map ( |c| c. div_ceil ( ratio_per_partition) )
283
+ . unwrap_or ( u64:: MAX )
284
+ } ;
285
+
270
286
Self {
271
287
slots : VecDeque :: from ( slots) ,
272
288
num_partitions,
@@ -276,11 +292,7 @@ impl Inner {
276
292
stats : stats:: Total :: default ( ) ,
277
293
limits : stats:: Limits {
278
294
max_total : config. max_total_bucket_bytes . unwrap_or ( u64:: MAX ) ,
279
- // Break down the maximum project cost to a maximum cost per partition.
280
- max_partition_project : config
281
- . max_project_key_bucket_bytes
282
- . map ( |c| c. div_ceil ( total_slots as u64 ) )
283
- . unwrap_or ( u64:: MAX ) ,
295
+ max_partition_project,
284
296
} ,
285
297
slot_range : slot_diff,
286
298
partition_by : config. partition_by ,
0 commit comments