Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce httplb for ut #5375

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/apache/pulsar-client-go v0.14.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.55.5
github.com/bufbuild/httplb v0.3.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/confluentinc/confluent-kafka-go/v2 v2.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdb
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bufbuild/httplb v0.3.0 h1:sCMPD+89ydD3atcVareDsiv/kUT+pLHolENMoCGZJV8=
github.com/bufbuild/httplb v0.3.0/go.mod h1:qDNs7dSFxIhKi/DA/rCCPVzbQfHs1JVxPMl9EvrbL4Q=
github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY=
github.com/buger/goterm v1.0.4/go.mod h1:HiFWV3xnkolgrBV3mY8m0X0Pumt4zg4QhbdOzQtB8tE=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
Expand Down
87 changes: 87 additions & 0 deletions processor/transformer/embeded/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package embeded

Check failure on line 1 in processor/transformer/embeded/pubsub.go

View workflow job for this annotation

GitHub Actions / lint

`embeded` is a misspelling of `embedded` (misspell)

import (
"context"
"strings"

"github.com/rudderlabs/rudder-server/processor/transformer"
)

type PubSubTransformer struct{}

func (t *PubSubTransformer) Transform(ctx context.Context, clientEvents []transformer.TransformerEvent, batchSize int) transformer.Response {
var response transformer.Response

for _, clientEvent := range clientEvents {
msg := clientEvent.Message

// Get topic ID for the event
topicID := getTopicID(clientEvent)
if topicID == "" {
response.FailedEvents = append(response.FailedEvents, transformer.TransformerResponse{
Output: clientEvent.Message,
})
continue

Check warning on line 24 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L12-L24

Added lines #L12 - L24 were not covered by tests
}

// Create attributes metadata
attributes := createAttributesMetadata(clientEvent)

// Create transformed event
transformedEvent := transformer.TransformerResponse{
Output: map[string]interface{}{
"message": msg,
"topic_id": topicID,
"attributes": attributes,
"user_id": getEventUserID(clientEvent),
},
}

response.Events = append(response.Events, transformedEvent)

Check warning on line 40 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L28-L40

Added lines #L28 - L40 were not covered by tests
}

return response

Check warning on line 43 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L43

Added line #L43 was not covered by tests
}

func getTopicID(event transformer.TransformerEvent) string {
// Get topic mapping from config
topicMap := event.Destination.Config["eventToTopicMap"].(map[string]string)

if event.Message["type"] == "" {
return ""
}

Check warning on line 52 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L46-L52

Added lines #L46 - L52 were not covered by tests

// Try to match event name first, then type, then fallback to wildcard
if event.Message["event"] != "" {
if topic, ok := topicMap[strings.ToLower(event.Message["event"].(string))]; ok {
return topic
}

Check warning on line 58 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L55-L58

Added lines #L55 - L58 were not covered by tests
}

if topic, ok := topicMap[strings.ToLower(event.Message["type"].(string))]; ok {
return topic
}

Check warning on line 63 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L61-L63

Added lines #L61 - L63 were not covered by tests

return topicMap["*"]

Check warning on line 65 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L65

Added line #L65 was not covered by tests
}

func createAttributesMetadata(event transformer.TransformerEvent) map[string]string {
attributes := make(map[string]string)

// Get attributes mapping from config
attrMap := event.Destination.Config["eventToAttributesMap"].(map[string]string)
if len(attrMap) == 0 {
return attributes
}

Check warning on line 75 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L68-L75

Added lines #L68 - L75 were not covered by tests

// ... rest of attributes logic ...

return attributes

Check warning on line 79 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L79

Added line #L79 was not covered by tests
}

func getEventUserID(event transformer.TransformerEvent) string {
if event.Message["user_id"] != "" {
return event.Message["user_id"].(string)
}
return event.Message["anonymous_id"].(string)

Check warning on line 86 in processor/transformer/embeded/pubsub.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/embeded/pubsub.go#L82-L86

Added lines #L82 - L86 were not covered by tests
}
64 changes: 57 additions & 7 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"runtime/trace"
Expand All @@ -15,6 +16,8 @@
"sync"
"time"

"github.com/bufbuild/httplb"
"github.com/bufbuild/httplb/resolver"
"github.com/cenkalti/backoff"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
Expand All @@ -26,6 +29,7 @@
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down Expand Up @@ -136,9 +140,9 @@

type Opt func(*handle)

func WithClient(client *http.Client) Opt {
func WithClient(client HTTPDoer) Opt {
return func(s *handle) {
s.client = client
s.httpClient = client
}
}

Expand All @@ -149,6 +153,10 @@
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type HTTPDoer interface {
Do(req *http.Request) (*http.Response, error)
}

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
Expand All @@ -159,7 +167,7 @@
logger logger.Logger
stat stats.Stats

client *http.Client
httpClient HTTPDoer

guardConcurrency chan struct{}

Expand Down Expand Up @@ -211,8 +219,11 @@

trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency)

if trans.client == nil {
trans.client = &http.Client{
clientType := config.GetString("Transformer.Client.type", "stdlib")

switch clientType {
case "stdlib":
trans.httpClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
Expand All @@ -221,6 +232,38 @@
},
Timeout: trans.config.timeoutDuration,
}
case "recycled":
trans.httpClient = sysUtils.NewRecycledHTTPClient(func() *http.Client {
return &http.Client{
Transport: &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
IdleConnTimeout: trans.config.maxIdleConnDuration,
},
Timeout: trans.config.timeoutDuration,
}
}, config.GetDuration("Transformer.Client.ttl", 120, time.Second))
case "httplb":
trans.httpClient = httplb.NewClient(
httplb.WithTransport("http", &HTTPLBTransport{
Transport: &http.Transport{
DisableKeepAlives: trans.config.disableKeepAlives,
MaxConnsPerHost: trans.config.maxHTTPConnections,
MaxIdleConnsPerHost: trans.config.maxHTTPIdleConnections,
IdleConnTimeout: trans.config.maxIdleConnDuration,
},
}),
httplb.WithResolver(
resolver.NewDNSResolver(
net.DefaultResolver,
resolver.PreferIPv6,
config.GetDuration("Transformer.Client.ttl", 120, time.Second), // TTL value
),
),
)
default:
panic(fmt.Sprintf("unknown transformer client type: %s", clientType))

Check warning on line 266 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L235-L266

Added lines #L235 - L266 were not covered by tests
}

for _, opt := range opts {
Expand All @@ -245,6 +288,14 @@
return trans.transform(ctx, clientEvents, trans.trackingPlanValidationURL(), batchSize, trackingPlanValidationStage)
}

type HTTPLBTransport struct {
*http.Transport
}

func (t *HTTPLBTransport) NewRoundTripper(scheme, target string, config httplb.TransportConfig) httplb.RoundTripperResult {
return httplb.RoundTripperResult{RoundTripper: t.Transport, Close: t.CloseIdleConnections}

Check warning on line 296 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L295-L296

Added lines #L295 - L296 were not covered by tests
}

func (trans *handle) transform(
ctx context.Context,
clientEvents []TransformerEvent,
Expand Down Expand Up @@ -474,7 +525,7 @@
// Header to let transformer know that the client understands event filter code
req.Header.Set("X-Feature-Filter-Code", "?1")

resp, reqErr = trans.client.Do(req)
resp, reqErr = trans.httpClient.Do(req)
})
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
if reqErr != nil {
Expand All @@ -495,7 +546,6 @@
retryCount++
trans.logger.Warnn(
"JS HTTP connection error",
logger.NewStringField("URL", url),
logger.NewErrorField(err),
logger.NewIntField("attempts", int64(retryCount)),
)
Expand Down
10 changes: 5 additions & 5 deletions processor/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestTransformer(t *testing.T) {
tr.stat = statsStore
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = srv.Client()
tr.httpClient = srv.Client()
tr.guardConcurrency = make(chan struct{}, 200)
tr.sentStat = tr.stat.NewStat("transformer_sent", stats.CountType)
tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType)
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestTransformer(t *testing.T) {
tr.stat = stats.Default
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = client
tr.httpClient = client
tr.config.maxRetry = config.SingleValueLoader(tc.retries)
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(tc.failOnUserTransformTimeout)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestTransformer(t *testing.T) {
tr.stat = stats.Default
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = srv.Client()
tr.httpClient = srv.Client()
tr.config.maxRetry = config.SingleValueLoader(1)
tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second)
tr.config.timeoutDuration = 1 * time.Second
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestTransformer(t *testing.T) {
tr.stat = stats.Default
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = srv.Client()
tr.httpClient = srv.Client()
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false)
tr.config.maxRetry = config.SingleValueLoader(tc.retries)
tr.config.failOnError = config.SingleValueLoader(tc.failOnError)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestTransformer(t *testing.T) {
defer srv.Close()

tr := handle{}
tr.client = srv.Client()
tr.httpClient = srv.Client()
tr.stat = stats.Default
tr.conf = config.Default
tr.logger = logger.NOP
Expand Down
35 changes: 35 additions & 0 deletions utils/sysUtils/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,44 @@

import (
"net/http"
"sync"
"time"
)

// HTTPClient interface
type HTTPClientI interface {
Do(req *http.Request) (*http.Response, error)
}

type RecycledHTTPClient struct {
client *http.Client
lastRefreshTime time.Time
ttl time.Duration
clientFunc func() *http.Client
lock sync.Mutex
}

func NewRecycledHTTPClient(_clientFunc func() *http.Client, _ttl time.Duration) *RecycledHTTPClient {
return &RecycledHTTPClient{
client: _clientFunc(),
clientFunc: _clientFunc,
ttl: _ttl,
lastRefreshTime: time.Now(),
}

Check warning on line 29 in utils/sysUtils/httpclient.go

View check run for this annotation

Codecov / codecov/patch

utils/sysUtils/httpclient.go#L23-L29

Added lines #L23 - L29 were not covered by tests
}

func (r *RecycledHTTPClient) GetClient() *http.Client {
r.lock.Lock()
defer r.lock.Unlock()

if r.ttl > 0 && time.Since(r.lastRefreshTime) > r.ttl {
r.client.CloseIdleConnections()
r.client = r.clientFunc()
r.lastRefreshTime = time.Now()
}
return r.client

Check warning on line 41 in utils/sysUtils/httpclient.go

View check run for this annotation

Codecov / codecov/patch

utils/sysUtils/httpclient.go#L32-L41

Added lines #L32 - L41 were not covered by tests
}

func (r *RecycledHTTPClient) Do(req *http.Request) (*http.Response, error) {
return r.GetClient().Do(req)

Check warning on line 45 in utils/sysUtils/httpclient.go

View check run for this annotation

Codecov / codecov/patch

utils/sysUtils/httpclient.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}
Loading