From 1468f8f62d0ac3e5e40de9e1fd93c7665207674c Mon Sep 17 00:00:00 2001 From: Chris Vanderschuere Date: Wed, 13 Jan 2016 13:33:33 -0800 Subject: [PATCH] Converted flush to use single access model --- triton/writer_batch.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/triton/writer_batch.go b/triton/writer_batch.go index 8f86ae8..9e75a8b 100644 --- a/triton/writer_batch.go +++ b/triton/writer_batch.go @@ -29,10 +29,11 @@ func NewBatchWriterSize(w Writer, size int, intr time.Duration) *BatchWriter { bufferSize := 10000 bw := &BatchWriter{ - w: w, - buf: make(chan Record, bufferSize), - signal: make(chan struct{}, bufferSize), - errors: make(chan error, 1), + w: w, + buf: make(chan Record, bufferSize), + signal: make(chan struct{}, bufferSize), + flushSignal: make(chan struct{}, 0), // must be blocking write/read + errors: make(chan error, 1), size: size, intr: intr, @@ -60,8 +61,9 @@ type BatchWriter struct { size int intr time.Duration - ticker *time.Ticker - signal chan struct{} + ticker *time.Ticker + signal chan struct{} + flushSignal chan struct{} } func (bw *BatchWriter) writeLoop() { @@ -77,11 +79,17 @@ func (bw *BatchWriter) writeLoop() { } case <-bw.ticker.C: bw.flush() + case _, ok := <-bw.flushSignal: + if ok { + bw.flush() + bw.flushSignal <- struct{}{} + } } } } -// flush writes everything in the current buffer +// flush writes everything in the current buffer. To prevent concurrent flushes, +// only write_loop() is allowed to call this. func (bw *BatchWriter) flush() { numBuffered := len(bw.buf) if numBuffered == 0 { @@ -109,7 +117,8 @@ func (bw *BatchWriter) flush() { // Flush forces all buffered records to be sent. // If there is an error it will have been written to the Errors chan. func (bw *BatchWriter) Flush() { - bw.flush() + bw.flushSignal <- struct{}{} // ask to flush + <-bw.flushSignal // block until finish } // Close prevents future writes and flushes all currently buffered records. @@ -118,6 +127,7 @@ func (bw *BatchWriter) Close() { bw.ticker.Stop() close(bw.signal) bw.Flush() + close(bw.flushSignal) close(bw.errors) } @@ -143,6 +153,6 @@ func (bw *BatchWriter) WriteRecords(rs ...Record) error { for _, r := range rs { bw.buf <- r } - bw.signal <- struct{}{} // sigal that buffer changed + bw.signal <- struct{}{} // signal that buffer changed return nil }