batch
is a library to make concurrent work batchable and reliable.
Each worker either has its work committed or gets an error.
Hope is not a strategy. (from the Google SRE book)
No more operations that add their data to a batch and walk away hoping it will be committed.
All of this without timeouts, additional goroutines, allocations, or channels.
- Each worker adds its work to a shared batch.
- If there are no more workers ready to commit, the last one runs the commit; the others wait.
- Every worker in the batch gets the same result and error.
// General pattern is
// Queue.In -> Enter -> defer Exit -> Commit/Cancel/return/panic

type workerID struct{} // context key; the value set by a worker can be read back in Coordinator.Commit

const N = 4 // number of workers

var sum int

bc := batch.Coordinator[int]{
	// Required
	Commit: func(ctx context.Context) (int, error) {
		// commit sum here (write it to a database, a file, a remote service, ...)
		// and return the commit error, if any
		return sum, nil
	},
}

for j := 0; j < N; j++ {
	go func(j int) {
		ctx := context.WithValue(context.Background(), workerID{}, j) // can be obtained in Coordinator.Commit

		bc.Queue().In() // let the others know we are going to join

		data := 1 // prepare data

		idx := bc.Enter(true) // true means block until we are in the batch
		defer bc.Exit()       // the deferred Exit leaves the batch even on early return or panic

		if idx == 0 { // we are the first in the batch, reset it
			sum = 0
		}

		sum += data // add data to the batch

		res, err := bc.Commit(ctx)
		if err != nil { // works the same as if each goroutine had its own independent commit
			_ = err
		}

		// batching is transparent for the worker
		_ = res
	}(j)
}
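Since each worker puts its id into the context it passes to Commit, the Commit callback can read it back with plain context.Value. A minimal sketch, reusing sum and the workerID key from the example above; the log call is purely illustrative, and the context Commit sees is assumed to be the one passed by the worker that ends up running the commit:

bc := batch.Coordinator[int]{
	Commit: func(ctx context.Context) (int, error) {
		// read back the id set via context.WithValue in the worker
		if id, ok := ctx.Value(workerID{}).(int); ok {
			log.Printf("sum %d committed by worker %d", sum, id)
		}
		return sum, nil
	},
}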
See all the available options in the documentation.
Batch is error- and panic-proof: user code can return an error or panic at any point, and as soon as all the workers have left the batch, its internal state is reset. The external state is not reset, though; it is the caller's responsibility to keep it consistent.
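For illustration, here is a sketch of a worker that bails out after entering the batch, built from the same calls as the example above. prepareData is a hypothetical helper, ctx is the worker's context from the example, and the comments only restate the guarantees described here: the deferred Exit always leaves the batch, internal state is reset once everybody has left, and external state such as sum stays with the caller.

bc.Queue().In() // let the others know we are going to join

idx := bc.Enter(true)
defer bc.Exit() // runs even if we return early or panic below

data, err := prepareData() // hypothetical helper that may fail
if err != nil {
	return // sum was not touched, so there is nothing to roll back
}

if idx == 0 { // first in the batch, reset it
	sum = 0
}
sum += data

// From here on, returning or panicking instead of committing still resets the
// batch internals once all workers have left, but it does not undo our change
// to sum: keeping that external state consistent is the worker's job.
res, err := bc.Commit(ctx)
_, _ = res, err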