Skip to content

Commit

Permalink
Allow for nil min time, reset batch timeout atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Feb 27, 2024
1 parent 7a77e0b commit e6a3015
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions pubsub/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
b.handleBatch(batch)
}

if batch == nil && len(b.pending) > 0 {
if batch == nil && len(b.pending) > 0 && b.opts.BatchTimeout > 0 {
// If the batch size timeout is zero, this is one of the first items to
// be added to the batch under the minimum batch size. Record when this
// happens so that .nextBatch() can grab the batch on timeout.
Expand All @@ -225,6 +225,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error {
if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) {
go func() {
<-time.After(b.opts.BatchTimeout)
b.batchTimeoutRunning = 0
batch = b.nextBatch()
if batch != nil {
b.handleBatch(batch)
Expand Down Expand Up @@ -255,23 +256,8 @@ func (b *Batcher) handleBatch(batch []waiter) {
// It returns nil if there's no batch ready for processing.
// b.mu must be held.
func (b *Batcher) nextBatch() []waiter {
if len(b.pending) < b.opts.MinBatchSize {
// We handle minimum batch sizes depending on specific
// situations.
if time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout {
// If we're within the max batch lifetime, respect minimum batch
// sizes and return nil.
return nil
}
if b.shutdown == false {
// If we're not shutting down, respect minimums. If we're
// shutting down, though, we ignore minimums to flush the
// entire batch.
return nil
}
// At this point, either we're shutting down or we've we've waited
// too long for the minimum size to be met. We're going to proceed
// with flushing the batch.
if len(b.pending) < b.opts.MinBatchSize && b.respectMinBatchSize() {
return nil
}

if len(b.pending) < b.opts.MinBatchSize {
Expand Down Expand Up @@ -307,6 +293,23 @@ func (b *Batcher) nextBatch() []waiter {
return batch
}

func (b *Batcher) respectMinBatchSize() bool {
// We handle minimum batch sizes depending on specific
// situations.
if b.shutdown {
// If we're shutting down, do not respect minimums. This takes priority.
return false
}
if b.opts.BatchTimeout > 0 && time.Since(b.batchSizeTimeout) >= b.opts.BatchTimeout {
// If we have a maximum wait before sending batches below the minimum, and we've
// waited longer than that period, do not respect minimum batches and send!
return false
}
// At this point, either we're not shutting down and we're not forcing a batch
// due to timeouts. Respect the batch size.
return true
}

func (b *Batcher) callHandler(batch []waiter) {
for batch != nil {

Expand Down

0 comments on commit e6a3015

Please sign in to comment.