Skip to content

Commit

Permalink
Make request timeout and TLS properties configurable. (#122)
Browse files Browse the repository at this point in the history
Co-authored-by: Jeanette Booher <[email protected]>
Co-authored-by: Kyle Tolliver <[email protected]>
Co-authored-by: Luke Winikates <[email protected]>
Co-authored-by: Devon Warshaw <[email protected]>
Co-authored-by: Ernst Riemer <[email protected]>
Co-authored-by: Jesse Pye <[email protected]>
  • Loading branch information
7 people authored Jan 24, 2023
1 parent 373ef2f commit 45f78be
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 10 deletions.
15 changes: 12 additions & 3 deletions internal/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"bytes"
"compress/gzip"
"crypto/tls"
"io"
"io/ioutil"
"net/http"
Expand All @@ -17,15 +18,23 @@ type reporter struct {
client *http.Client
}

// NewReporter create a metrics Reporter
func NewReporter(server string, token string) Reporter {
// NewReporter creates a metrics Reporter
func NewReporter(server string, token string, client *http.Client) Reporter {
return &reporter{
serverURL: server,
token: token,
client: &http.Client{Timeout: time.Second * 10},
client: client,
}
}

func NewClient(timeout time.Duration, tlsConfig *tls.Config) *http.Client {
if tlsConfig == nil {
return &http.Client{Timeout: timeout}
}
transport := &http.Transport{TLSClientConfig: tlsConfig}
return &http.Client{Timeout: timeout, Transport: transport}
}

// Report creates and sends a POST to the reportEndpoint with the given pointLines
func (reporter reporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
Expand Down
31 changes: 30 additions & 1 deletion internal/reporter_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,44 @@
package internal

import (
"crypto/tls"
"crypto/x509"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"testing"
"time"
)

func TestBuildRequest(t *testing.T) {
var r *reporter
r = NewReporter("http://localhost:8010/wavefront", "").(*reporter)
r = NewReporter("http://localhost:8010/wavefront", "", &http.Client{}).(*reporter)
request, err := r.buildRequest("wavefront", nil)
require.NoError(t, err)
assert.Equal(t, "http://localhost:8010/wavefront/report?f=wavefront", request.URL.String())
}

func TestNewClientWithNilTLSConfig(t *testing.T) {
client := NewClient(10*time.Second, nil)
assert.Equal(t, nil, client.Transport)
}

func TestNewClientWithCustomTLSConfig(t *testing.T) {
caCertPool := x509.NewCertPool()
fakeCert := []byte("Not a real cert")
caCertPool.AppendCertsFromPEM(fakeCert)

tlsConfig := &tls.Config{
RootCAs: caCertPool,
}

emptyTLSConfig := &tls.Config{}

transport := &http.Transport{TLSClientConfig: tlsConfig}
transportWithEmptyTLSConfig := &http.Transport{TLSClientConfig: emptyTLSConfig}

client := NewClient(10*time.Second, tlsConfig)
assert.Equal(t, transport, client.Transport)
assert.NotEqual(t, transportWithEmptyTLSConfig, client.Transport)

}
27 changes: 25 additions & 2 deletions senders/client_factory.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package senders

import (
"crypto/tls"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/wavefronthq/wavefront-sdk-go/internal"
"github.com/wavefronthq/wavefront-sdk-go/version"
Expand All @@ -17,6 +19,7 @@ const (
defaultBatchSize = 10000
defaultBufferSize = 50000
defaultFlushInterval = 1
defaultTimeout = 10 * time.Second
)

// Option Wavefront client configuration options
Expand Down Expand Up @@ -48,6 +51,10 @@ type configuration struct {
FlushIntervalSeconds int
SDKMetricsTags map[string]string
Path string

Timeout time.Duration

TLSConfig *tls.Config
}

func (c *configuration) Direct() bool {
Expand Down Expand Up @@ -85,6 +92,7 @@ func CreateConfig(wfURL string, setters ...Option) (*configuration, error) {
MaxBufferSize: defaultBufferSize,
FlushIntervalSeconds: defaultFlushInterval,
SDKMetricsTags: map[string]string{},
Timeout: defaultTimeout,
}

u, err := url.Parse(wfURL)
Expand Down Expand Up @@ -133,8 +141,9 @@ func CreateConfig(wfURL string, setters ...Option) (*configuration, error) {

// newWavefrontClient creates a Wavefront sender
func newWavefrontClient(cfg *configuration) (Sender, error) {
metricsReporter := internal.NewReporter(cfg.MetricsURL(), cfg.Token)
tracesReporter := internal.NewReporter(cfg.TracesURL(), cfg.Token)
client := internal.NewClient(cfg.Timeout, cfg.TLSConfig)
metricsReporter := internal.NewReporter(fmt.Sprintf("%s:%d", cfg.Server, cfg.MetricsPort), cfg.Token, client)
tracesReporter := internal.NewReporter(fmt.Sprintf("%s:%d", cfg.Server, cfg.TracesPort), cfg.Token, client)

sender := &wavefrontSender{
defaultSource: internal.GetHostname("wavefront_direct_sender"),
Expand Down Expand Up @@ -230,6 +239,20 @@ func TracesPort(port int) Option {
}
}

// Timeout sets the HTTP timeout (in seconds). Defaults to 10 seconds.
func Timeout(timeout time.Duration) Option {
return func(cfg *configuration) {
cfg.Timeout = timeout
}
}

func TLSConfigOptions(tlsCfg *tls.Config) Option {
tlsCfgCopy := tlsCfg.Clone()
return func(cfg *configuration) {
cfg.TLSConfig = tlsCfgCopy
}
}

// SDKMetricsTags adds the additional tags provided in tags to all internal
// metrics this library reports. Clients can use multiple SDKMetricsTags
// calls when creating a sender. In that case, the sender sends all the
Expand Down
25 changes: 25 additions & 0 deletions senders/client_factory_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package senders_test

import (
"crypto/tls"
"crypto/x509"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -98,6 +101,8 @@ func TestDefaults(t *testing.T) {
assert.Equal(t, 50000, cfg.MaxBufferSize)
assert.Equal(t, 2878, cfg.MetricsPort)
assert.Equal(t, 30001, cfg.TracesPort)
assert.Equal(t, 10*time.Second, cfg.Timeout)
assert.Equal(t, (*tls.Config)(nil), cfg.TLSConfig)
}

func TestBatchSize(t *testing.T) {
Expand Down Expand Up @@ -143,6 +148,26 @@ func TestSDKMetricsTags(t *testing.T) {
assert.Equal(t, "bar1", cfg.SDKMetricsTags["foo1"])
}

func TestTimeout(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.Timeout(60*time.Second))
require.NoError(t, err)

assert.Equal(t, 60*time.Second, cfg.Timeout)
}

func TestTLSConfigOptions(t *testing.T) {
caCertPool := x509.NewCertPool()
fakeCert := []byte("Not a real cert")
caCertPool.AppendCertsFromPEM(fakeCert)

tlsConfig := tls.Config{
RootCAs: caCertPool,
}
cfg, err := senders.CreateConfig("https://localhost", senders.TLSConfigOptions(&tlsConfig))
require.NoError(t, err)
assert.Equal(t, caCertPool, cfg.TLSConfig.RootCAs)
}

func TestSDKMetricsTags_Immutability(t *testing.T) {
map1 := map[string]string{"foo": "bar"}
map2 := map[string]string{"baz": "none"}
Expand Down
11 changes: 7 additions & 4 deletions senders/example_newsender_options_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package senders_test

import (
"crypto/tls"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
)

func ExampleNewSender_options() {
// NewSender accepts optional arguments. Use these if you need to set non-default ports for your Wavefront Proxy, tune batching parameters, or set tags for internal SDK metrics.
sender, err := wavefront.NewSender(
"http://localhost",
wavefront.BatchSize(20000), // Send batches of 20,000.
wavefront.FlushIntervalSeconds(5), // Flush every 5 seconds.
wavefront.MetricsPort(4321), // Use port 4321 for metrics.
wavefront.TracesPort(40001), // Use port 40001 for traces.
wavefront.BatchSize(20000), // Send batches of 20,000.
wavefront.FlushIntervalSeconds(5), // Flush every 5 seconds.
wavefront.MetricsPort(4321), // Use port 4321 for metrics.
wavefront.TracesPort(40001), // Use port 40001 for traces.
wavefront.Timeout(15), // Set an HTTP timeout in seconds (default is 10s)
wavefront.TLSConfigOptions(&tls.Config{}), // Set TLS config options.
)

if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions senders/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,18 @@ func TestEndToEnd(t *testing.T) {
assert.Equal(t, 1, len(testServer.MetricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0])
}

func TestTLSEndToEnd(t *testing.T) {
testServer := startTLSTestServer()
defer testServer.Close()
testServer.httpServer.Client()
tlsConfig := testServer.TLSConfig()

sender, err := NewSender(testServer.URL, TLSConfigOptions(tlsConfig))
require.NoError(t, err)
require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil))
require.NoError(t, sender.Flush())

assert.Equal(t, 1, len(testServer.MetricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0])
}
18 changes: 18 additions & 0 deletions senders/test_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package senders
import (
"bufio"
"compress/gzip"
"crypto/tls"
"crypto/x509"
"net/http"
"net/http/httptest"
)
Expand All @@ -16,12 +18,28 @@ func startTestServer() *testServer {

}

func startTLSTestServer() *testServer {
handler := &testServer{}
server := httptest.NewTLSServer(handler)
handler.httpServer = server
handler.URL = server.URL
return handler
}

type testServer struct {
MetricLines []string
httpServer *httptest.Server
URL string
}

func (s *testServer) TLSConfig() *tls.Config {
certpool := x509.NewCertPool()
certpool.AddCert(s.httpServer.Certificate())
return &tls.Config{
RootCAs: certpool,
}
}

func (s *testServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
newLines, err := decodeMetricLines(request)
if err != nil {
Expand Down

0 comments on commit 45f78be

Please sign in to comment.