Skip to content

Commit

Permalink
Merge pull request #112 from gabin/buffer-pool
Browse files Browse the repository at this point in the history
Use a new buffer to avoid race conditions
  • Loading branch information
Emanuele Palazzetti authored Sep 22, 2017
2 parents 8227589 + d5bba0a commit 9234800
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
26 changes: 26 additions & 0 deletions tracer/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tracer
import (
"bytes"
"encoding/json"
"sync"

"github.com/ugorji/go/codec"
)
Expand All @@ -14,12 +15,15 @@ type Encoder interface {
EncodeServices(services map[string]Service) error
Read(p []byte) (int, error)
ContentType() string
Buffer() *bytes.Buffer
SetBuffer(*bytes.Buffer)
}

var mh codec.MsgpackHandle

// msgpackEncoder encodes a list of traces in Msgpack format
type msgpackEncoder struct {
sync.Mutex
buffer *bytes.Buffer
encoder *codec.Encoder
contentType string
Expand Down Expand Up @@ -51,6 +55,8 @@ func (e *msgpackEncoder) EncodeServices(services map[string]Service) error {

// Read values from the internal buffer
func (e *msgpackEncoder) Read(p []byte) (int, error) {
e.Lock()
defer e.Unlock()
return e.buffer.Read(p)
}

Expand All @@ -59,6 +65,18 @@ func (e *msgpackEncoder) ContentType() string {
return e.contentType
}

func (e *msgpackEncoder) Buffer() *bytes.Buffer {
e.Lock()
defer e.Unlock()
return e.buffer
}

func (e *msgpackEncoder) SetBuffer(b *bytes.Buffer) {
e.Lock()
defer e.Unlock()
e.buffer = b
}

// jsonEncoder encodes a list of traces in JSON format
type jsonEncoder struct {
buffer *bytes.Buffer
Expand Down Expand Up @@ -101,6 +119,14 @@ func (e *jsonEncoder) ContentType() string {
return e.contentType
}

func (e *jsonEncoder) Buffer() *bytes.Buffer {
return e.buffer
}

func (e *jsonEncoder) SetBuffer(b *bytes.Buffer) {
e.buffer = b
}

const (
JSON_ENCODER = iota
MSGPACK_ENCODER
Expand Down
60 changes: 30 additions & 30 deletions tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"strconv"
Expand Down Expand Up @@ -97,14 +95,27 @@ func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) {

// borrow an encoder
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoderBuffer := encoder.Buffer()
defer func() {
// We set the encoder's buffer to the old one before returning the encoder to the pool
encoder.SetBuffer(encoderBuffer)
t.pool.Return(encoder)
}()

// encode the spans and return the error if any
err := encoder.EncodeTraces(traces)
if err != nil {
return nil, err
}

// When we send the encoder as the request body, the persistConn.writeLoop() goroutine
// can theoretically read the underlying buffer whereas the encoder has been returned to the pool.
// This can lead to a race condition and make the app panicking.
// That's why we create a new buffer here, though we use the same slice of bytes to avoid allocating new memory.
// It's fine here because the two functions that can happen at the same time (bytes.Reset and bytes.Read),
// doesn't modify the underlying data.
encoder.SetBuffer(bytes.NewBuffer(encoderBuffer.Bytes()))

// prepare the client and send the payload
req, _ := http.NewRequest("POST", t.traceURL, encoder)
for header, value := range t.headers {
Expand All @@ -117,19 +128,7 @@ func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) {
if err != nil {
return &http.Response{StatusCode: 0}, err
}
defer func() {
// The default HTTP client's Transport does not
// attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
// ("keep-alive") unless the Body is read to completion and is
// closed.
// Buffer the response body so the caller doesn't need to worry about
// reading and closing the response. This isn't very expensive because
// the responses from the Agent are always short.
var buf bytes.Buffer
io.Copy(&buf, response.Body)
response.Body.Close()
response.Body = ioutil.NopCloser(&buf)
}()
defer response.Body.Close()

// if we got a 404 we should downgrade the API to a stable version (at most once)
if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
Expand All @@ -152,12 +151,25 @@ func (t *httpTransport) SendServices(services map[string]Service) (*http.Respons

// Encode the service table
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoderBuffer := encoder.Buffer()
defer func() {
// We set the encoder's buffer to the old one before returning the encoder to the pool
encoder.SetBuffer(encoderBuffer)
t.pool.Return(encoder)
}()

if err := encoder.EncodeServices(services); err != nil {
return nil, err
}

// When we send the encoder as the request body, the persistConn.writeLoop() goroutine
// can theoretically read the underlying buffer whereas the encoder has been returned to the pool.
// This can lead to a race condition and make the app panicking.
// That's why we create a new buffer here, though we use the same slice of bytes to avoid allocating new memory.
// It's fine here because the two functions that can happen at the same time are bytes.Reset and bytes.Read,
// and they doesn't modify the underlying data.
encoder.SetBuffer(bytes.NewBuffer(encoderBuffer.Bytes()))

// Send it
req, err := http.NewRequest("POST", t.serviceURL, encoder)
if err != nil {
Expand All @@ -171,19 +183,7 @@ func (t *httpTransport) SendServices(services map[string]Service) (*http.Respons
if err != nil {
return &http.Response{StatusCode: 0}, err
}
defer func() {
// The default HTTP client's Transport does not
// attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
// ("keep-alive") unless the Body is read to completion and is
// closed.
// Buffer the response body so the caller doesn't need to worry about
// reading and closing the response. This isn't very expensive because
// the responses from the Agent are always short.
var buf bytes.Buffer
io.Copy(&buf, response.Body)
response.Body.Close()
response.Body = ioutil.NopCloser(&buf)
}()
defer response.Body.Close()

// Downgrade if necessary
if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
Expand Down

0 comments on commit 9234800

Please sign in to comment.