Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions nats/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (d *natsDispatcher) processMsg(ctx context.Context, opt DurableCfg, msg *na
log.Warn().Func(logDetailsFn(true)).
Uint64("done.sequence", seq).
Str("durable", opt.Durable).
Str("nats.msg.headers", fmt.Sprintf("%+v", msg.Header)).
Str("nats.msg.header", fmt.Sprintf("%+v", msg.Header)).
Msg("dispatcher lost order")
}
}
Expand All @@ -210,14 +210,14 @@ func (d *natsDispatcher) processMsg(ctx context.Context, opt DurableCfg, msg *na
d.duraSeqs.Set(opt.Durable, meta.Sequence.Stream, -1)
log.Info().Func(logDetailsFn()).
Str("durable", opt.Durable).
Str("nats.msg.headers", fmt.Sprintf("%+v", msg.Header)).
Str("nats.msg.header", fmt.Sprintf("%+v", msg.Header)).
Msg("dispatcher delivered")
return nil
}
if !errors.Is(err, tcgerr.ErrTransient) {
log.Warn().Err(err).Func(logDetailsFn(true)).
Str("durable", opt.Durable).
Str("nats.msg.headers", fmt.Sprintf("%+v", msg.Header)).
Str("nats.msg.header", fmt.Sprintf("%+v", msg.Header)).
Msg("dispatcher could not deliver: will not retry")
return nil
}
Expand All @@ -238,7 +238,7 @@ func (d *natsDispatcher) processMsg(ctx context.Context, opt DurableCfg, msg *na
d.retries.Delete(opt.Durable)
log.Warn().Err(err).Func(logDetailsFn(true)).
Str("durable", opt.Durable).
Str("nats.msg.headers", fmt.Sprintf("%+v", msg.Header)).
Str("nats.msg.header", fmt.Sprintf("%+v", msg.Header)).
Msg("dispatcher could not deliver: stop retrying")
return nil
}
Expand All @@ -247,7 +247,7 @@ func (d *natsDispatcher) processMsg(ctx context.Context, opt DurableCfg, msg *na
log.Info().Err(err).Func(logDetailsFn()).
Int("retry", retry.Retry).
Str("durable", opt.Durable).
Str("nats.msg.headers", fmt.Sprintf("%+v", msg.Header)).
Str("nats.msg.header", fmt.Sprintf("%+v", msg.Header)).
Msg("dispatcher could not deliver: will retry")

return retry
Expand Down
73 changes: 39 additions & 34 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"expvar"
"fmt"
"maps"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -135,36 +136,20 @@ func StartServer(config Config) error {

nc, err := nats.Connect(s.server.ClientURL())
if err != nil {
log.Warn().Err(err).Msg("nats failed Connect")
log.Err(err).Msg("nats failed Connect")
return err
}
s.ncPublisher = nc

if err := defineStream(nc); err != nil {
log.Err(err).Msg("nats failed defineStream")
return err
}

ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel

go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-s.pub:
if err := s.ncPublisher.PublishMsg(msg); err != nil {
log.Warn().Err(err).
Str("headers", fmt.Sprintf("%+v", msg.Header)).
Msg("nats failed PublishMsg: reconnecting")
if nc, err := nats.Connect(s.server.ClientURL()); err == nil {
s.ncPublisher = nc
} else {
log.Warn().Err(err).Msg("nats failed Connect")
}
s.ncPublisher = nc
}
}
}
}(ctx)

return defineStream(nc)
go handlePubchan(ctx)
return nil
}

func defineStream(nc *nats.Conn) error {
Expand Down Expand Up @@ -374,7 +359,7 @@ func StopDispatcher() error {
}

// Pub sends NATS message in buffered channel
func Pub(subj string, data []byte, headers http.Header) error {
func Pub(subj string, data []byte, header http.Header) error {
if len(data) > int(s.config.MaxPayload) {
err := fmt.Errorf("%w: %v / %v / %v",
ErrPayloadLim, subj, s.config.MaxPayload, len(data))
Expand All @@ -383,10 +368,7 @@ func Pub(subj string, data []byte, headers http.Header) error {
}
msg := nats.NewMsg(subj)
msg.Data = data
if headers != nil {
// nats.Header compatible with http.Header as type of map[string][]string
msg.Header = nats.Header(headers)
}
maps.Copy(msg.Header, header)
// use goroutine as L2 buffer
go func(msg *nats.Msg) { s.pub <- msg }(msg)
return nil
Expand All @@ -395,7 +377,7 @@ func Pub(subj string, data []byte, headers http.Header) error {
// Publish sends NATS message
//
// Deprecated: Use Pub
func Publish(subj string, data []byte, headers http.Header) error {
func Publish(subj string, data []byte, header http.Header) error {
if len(data) > int(s.config.MaxPayload) {
err := fmt.Errorf("%w: %v / %v / %v",
ErrPayloadLim, subj, s.config.MaxPayload, len(data))
Expand All @@ -422,10 +404,7 @@ func Publish(subj string, data []byte, headers http.Header) error {

msg := nats.NewMsg(subj)
msg.Data = data
if headers != nil {
// nats.Header compatible with http.Header as type of map[string][]string
msg.Header = nats.Header(headers)
}
maps.Copy(msg.Header, header)
return s.ncPublisher.PublishMsg(msg)
}

Expand All @@ -436,3 +415,29 @@ func IsStartedDispatcher() bool {
func IsStartedServer() bool {
return s != nil && s.server != nil
}

func handlePubchan(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-s.pub:
if err := s.ncPublisher.PublishMsg(msg); err != nil {
log.Warn().Err(err).
Str("header", fmt.Sprintf("%+v", msg.Header)).
Msg("nats failed PublishMsg: reconnecting")
if nc, err := nats.Connect(s.server.ClientURL()); err == nil {
s.ncPublisher = nc
if err := s.ncPublisher.PublishMsg(msg); err != nil {
log.Warn().Err(err).
Str("header", fmt.Sprintf("%+v", msg.Header)).
Msg("nats failed PublishMsg")
}
} else {
log.Warn().Err(err).Msg("nats failed reConnect")
time.Sleep(time.Second)
}
}
}
}
}
53 changes: 25 additions & 28 deletions sdk/clients/gwClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,61 +361,57 @@ func (client *GWClient) ValidateToken(appName, apiToken string) error {
// SynchronizeInventory calls API
func (client *GWClient) SynchronizeInventory(ctx context.Context, payload []byte) ([]byte, error) {
headers := []string{}
hdrCompressed := ""
if h := ctx.Value(CtxHeaders); h != nil {
if h, ok := h.(interface{ Get(string) string }); ok {
if hdrCompressed = h.Get(HdrCompressed); hdrCompressed != "" {
headers = append(headers, "Content-Encoding", hdrCompressed)
}
}
encoding := ""
if IsGZipped(payload) {
encoding = "gzip"
}
mergeParam := make(map[string]string)
mergeHosts := client.GWConnection.MergeHosts
if client.GWConnection.HTTPEncode && hdrCompressed == "" {
if client.GWConnection.HTTPEncode && encoding == "" {
var buf bytes.Buffer
var err error
ctx, err = GZIP(ctx, &buf, payload)
if err != nil {
if ctx, err = GZip(ctx, &buf, payload); err != nil {
return nil, err
}
payload = buf.Bytes()
headers = append(headers, "Content-Encoding", "gzip")
encoding = "gzip"
}
if encoding != "" {
headers = append(headers, "Content-Encoding", encoding)
}
mergeParam["merge"] = strconv.FormatBool(mergeHosts)
if client.PrefixResourceNames && client.ResourceNamePrefix != "" {
headers = append(headers, "HostNamePrefix", client.ResourceNamePrefix)
}
mergeParam := make(map[string]string)
mergeParam["merge"] = strconv.FormatBool(client.GWConnection.MergeHosts)
return client.sendRequest(ctx, http.MethodPost, GWEntrypointSynchronizeInventory, BuildQueryParams(mergeParam),
payload, headers...)
}

// SendResourcesWithMetrics calls API
func (client *GWClient) SendResourcesWithMetrics(ctx context.Context, payload []byte) ([]byte, error) {
headers := []string{}
hdrCompressed := ""
if h := ctx.Value(CtxHeaders); h != nil {
if h, ok := h.(interface{ Get(string) string }); ok {
if hdrCompressed = h.Get(HdrCompressed); hdrCompressed != "" {
headers = append(headers, "Content-Encoding", hdrCompressed)
}
}
encoding := ""
if IsGZipped(payload) {
encoding = "gzip"
}
if client.GWConnection.HTTPEncode && hdrCompressed == "" {
if client.GWConnection.HTTPEncode && encoding == "" {
var buf bytes.Buffer
var err error
if ctx, err = GZIP(ctx, &buf, payload); err != nil {
if ctx, err = GZip(ctx, &buf, payload); err != nil {
return nil, err
}
payload = buf.Bytes()
headers = append(headers, "Content-Encoding", "gzip")
encoding = "gzip"
}
entrypoint := GWEntrypointSendResourceWithMetrics
if client.IsDynamicInventory {
entrypoint = GWEntrypointSendResourceWithMetricsDyn
if encoding != "" {
headers = append(headers, "Content-Encoding", encoding)
}
if client.PrefixResourceNames && client.ResourceNamePrefix != "" {
headers = append(headers, "HostNamePrefix", client.ResourceNamePrefix)
}
entrypoint := GWEntrypointSendResourceWithMetrics
if client.IsDynamicInventory {
entrypoint = GWEntrypointSendResourceWithMetricsDyn
}
return client.sendRequest(ctx, http.MethodPost, entrypoint, "", payload, headers...)
}

Expand Down Expand Up @@ -548,11 +544,12 @@ func (client *GWClient) sendRequest(ctx context.Context, httpMethod string, entr

switch {
case err != nil:
sdklog.Logger.LogAttrs(ctx, slog.LevelError, "could not send request", req.LogAttrs()...)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) ||
tcgerr.IsErrorDNS(err) || tcgerr.IsErrorConnection(err) || tcgerr.IsErrorTimedOut(err) {
sdklog.Logger.LogAttrs(ctx, slog.LevelWarn, "could not send request", req.LogAttrs()...)
return nil, fmt.Errorf("%w: %v", tcgerr.ErrTransient, err.Error())
}
sdklog.Logger.LogAttrs(ctx, slog.LevelError, "could not send request", req.LogAttrs()...)
return nil, err

case req.Status == 401:
Expand Down
36 changes: 36 additions & 0 deletions sdk/clients/header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package clients

import (
"context"
"maps"
"net/http"
)

// Header key is canonicalized by [textproto.CanonicalMIMEHeaderKey].
// That's important for Get/Set operations on http.Header
const (
HdrCompressed = "Compressed"
HdrPayloadLen = "Payload-Lenght"
HdrPayloadType = "Payload-Type"
HdrSpanSpanID = "Span-Span-Id"
HdrSpanTraceID = "Span-Trace-Id"
HdrSpanTraceFlags = "Span-Trace-Flags"
HdrTodoTracerCtx = "Todo-Tracer-Ctx"
)

type CtxKey any

var ctxHeader = CtxKey("header")

func CtxWithHeader(ctx context.Context, header http.Header) context.Context {
if h, ok := ctx.Value(ctxHeader).(http.Header); ok {
maps.Copy(h, header)
return context.WithValue(ctx, ctxHeader, h)
}
return context.WithValue(ctx, ctxHeader, header)
}

func HeaderFromCtx(ctx context.Context) (http.Header, bool) {
h, ok := ctx.Value(ctxHeader).(http.Header)
return h, ok
}
17 changes: 0 additions & 17 deletions sdk/clients/headers.go

This file was deleted.

12 changes: 11 additions & 1 deletion sdk/clients/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"io"
"log/slog"
"maps"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -50,13 +51,19 @@ var HookRequestContext = func(ctx context.Context, req *http.Request) (context.C
return ctx, req
}

var GZIP = func(ctx context.Context, w io.Writer, p []byte) (context.Context, error) {
var GZip = func(ctx context.Context, w io.Writer, p []byte) (context.Context, error) {
gw := gzip.NewWriter(w)
_, err := gw.Write(p)
_ = gw.Close()
return ctx, err
}

// IsGZipped detects if payload was compressed with gzip
// by magic number: 1st byte is 0x1f and 2nd is 0x8b
func IsGZipped(p []byte) bool {
return len(p) > 2 && p[0] == 31 && p[1] == 139
}

// SendRequest wraps HTTP methods
func SendRequest(httpMethod string, requestURL string,
headers map[string]string, formValues map[string]string, body []byte) (int, []byte, error) {
Expand Down Expand Up @@ -146,6 +153,9 @@ func (q *Req) SendWithContext(ctx context.Context) error {
q.Status, q.Err = -1, err
return err
}
if h, ok := ctx.Value(ctxHeader).(http.Header); ok {
maps.Copy(request.Header, h)
}
request.Header.Set("Connection", "close")
for k, v := range q.Headers {
request.Header.Add(k, v)
Expand Down
2 changes: 1 addition & 1 deletion services/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (service *AgentService) initOTEL() {
ctx, req = nestedHook(ctx, req)
return tracing.HookRequestContext(ctx, req)
}
clients.GZIP = tracing.GZIP
clients.GZip = tracing.GZip
}

// initProM inits Prometheus metrics
Expand Down
Loading