diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index de53b68f52..57e53878cf 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -212,7 +212,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { b.handleBatch(batch) } - if batch == nil && len(b.pending) > 0 { + if batch == nil && len(b.pending) > 0 && b.opts.BatchTimeout > 0 { // If the batch size timeout is zero, this is one of the first items to // be added to the batch under the minimum batch size. Record when this // happens so that .nextBatch() can grab the batch on timeout. @@ -225,6 +225,7 @@ func (b *Batcher) AddNoWait(item interface{}) <-chan error { if atomic.CompareAndSwapInt32(&b.batchTimeoutRunning, 0, 1) { go func() { <-time.After(b.opts.BatchTimeout) + b.batchTimeoutRunning = 0 batch = b.nextBatch() if batch != nil { b.handleBatch(batch) @@ -256,7 +257,7 @@ func (b *Batcher) handleBatch(batch []waiter) { // b.mu must be held. func (b *Batcher) nextBatch() []waiter { // if there's a min batch size, only skip if we haven't yet waited for the batch timeout - if len(b.pending) < b.opts.MinBatchSize && time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout { + if len(b.pending) < b.opts.MinBatchSize && (b.opts.BatchTimeout == 0 || time.Since(b.batchSizeTimeout) < b.opts.BatchTimeout) { return nil }