From a21d184ca42f40b20279fd776bfc6a44e1e4e53c Mon Sep 17 00:00:00 2001 From: Aaron Harper Date: Wed, 11 Sep 2024 16:04:16 -0400 Subject: [PATCH 1/3] Add in-band sync --- consts.go | 4 + env.go | 24 +- go.mod | 2 +- go.sum | 4 +- handler.go | 325 ++++++++++++++---- headers.go | 1 + internal/types/structs.go | 21 ++ .../github.com/inngest/inngest/pkg/sdk/sdk.go | 2 + .../telemetry/exporters/batch_processor.go | 246 +++++++++++++ .../inngest/pkg/telemetry/exporters/nats.go | 15 +- .../inngest/pkg/telemetry/metrics/counter.go | 22 +- .../inngest/pkg/telemetry/metrics/gauge.go | 18 + .../inngest/pkg/telemetry/trace/tracer.go | 36 +- vendor/modules.txt | 2 +- 14 files changed, 644 insertions(+), 78 deletions(-) create mode 100644 internal/types/structs.go create mode 100644 vendor/github.com/inngest/inngest/pkg/telemetry/exporters/batch_processor.go diff --git a/consts.go b/consts.go index 8a51078d..2c6bba43 100644 --- a/consts.go +++ b/consts.go @@ -1,6 +1,10 @@ package inngestgo const ( + SDKAuthor = "inngest" SDKLanguage = "go" SDKVersion = "0.7.3" + + SyncKindInBand = "in_band" + SyncKindOutOfBand = "out_of_band" ) diff --git a/env.go b/env.go index 52468b2d..b3128f0d 100644 --- a/env.go +++ b/env.go @@ -3,6 +3,7 @@ package inngestgo import ( "net/url" "os" + "strings" ) const ( @@ -15,8 +16,8 @@ const ( // To use the dev server, set INNGEST_DEV to any non-empty value OR the URL of the development // server, eg: // -// INNGEST_DEV=1 -// INNGEST_DEV=http://192.168.1.254:8288 +// INNGEST_DEV=1 +// INNGEST_DEV=http://192.168.1.254:8288 func IsDev() bool { return os.Getenv("INNGEST_DEV") != "" } @@ -32,3 +33,22 @@ func DevServerURL() string { } return devServerURL } + +func allowInBandSync() bool { + val := os.Getenv("INNGEST_ALLOW_IN_BAND_SYNC") + if val == "" { + // TODO: Default to true once in-band syncing is stable + return false + } + + return isTruthy(val) +} + +func isTruthy(val string) bool { + val = strings.ToLower(val) + if val == "false" || val == "0" || val == "" { + return false + } + + return true +} diff --git a/go.mod b/go.mod index 957a2691..399373c3 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.21.5 require ( github.com/gosimple/slug v1.12.0 github.com/gowebpki/jcs v1.0.0 - github.com/inngest/inngest v0.30.0-beta-2.0.20240905105759-4df2bfd0dd29 + github.com/inngest/inngest v0.30.0-beta-2.0.20240912142400-4c207d8fb0ce github.com/stretchr/testify v1.9.0 github.com/xhit/go-str2duration/v2 v2.1.0 golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f diff --git a/go.sum b/go.sum index fa786559..968b86b1 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inngest/expr v0.0.0-20240717151033-03e4378c436c h1:9XaJS0BV+5wYqv3pVh9OxowUPArRdGMRa8xFdV7eTAI= github.com/inngest/expr v0.0.0-20240717151033-03e4378c436c/go.mod h1:Dq8dNC1Q/cZBjl6ltxCoAeNn8fstrBJWWjeSisqTL94= -github.com/inngest/inngest v0.30.0-beta-2.0.20240905105759-4df2bfd0dd29 h1:411jiRzzlz604f+76s+cFqCSMbCdT8J1uDbz4tMbhIM= -github.com/inngest/inngest v0.30.0-beta-2.0.20240905105759-4df2bfd0dd29/go.mod h1:AM/9t/guHM0r4oZFqXBWYyvMJABQ/fP3qkrxZOiFFLI= +github.com/inngest/inngest v0.30.0-beta-2.0.20240912142400-4c207d8fb0ce h1:E1EGAXuWyrH4lHrx0/BxBVpA7JEH2y27M0brtRLArNg= +github.com/inngest/inngest v0.30.0-beta-2.0.20240912142400-4c207d8fb0ce/go.mod h1:AM/9t/guHM0r4oZFqXBWYyvMJABQ/fP3qkrxZOiFFLI= github.com/karlseguin/ccache/v2 v2.0.8 h1:lT38cE//uyf6KcFok0rlgXtGFBWxkI6h/qg4tbFyDnA= github.com/karlseguin/ccache/v2 v2.0.8/go.mod h1:2BDThcfQMf/c0jnZowt16eW405XIqZPavt+HoYEtcxQ= github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLMY72r5J4sEfkuE7AFbixEP2qRbEcum/wA= diff --git a/handler.go b/handler.go index 61a3d8eb..870cb979 100644 --- a/handler.go +++ b/handler.go @@ -20,6 +20,7 @@ import ( "github.com/inngest/inngest/pkg/sdk" "github.com/inngest/inngestgo/errors" "github.com/inngest/inngestgo/internal/sdkrequest" + "github.com/inngest/inngestgo/internal/types" "github.com/inngest/inngestgo/step" "golang.org/x/exp/slog" ) @@ -40,6 +41,7 @@ var ( DefaultMaxBodySize = 1024 * 1024 * 100 capabilities = sdk.Capabilities{ + InBandSync: sdk.InBandSyncV1, TrustProbe: sdk.TrustProbeV1, } ) @@ -266,39 +268,185 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // all functions and automatically allows all functions to immediately be triggered // by incoming events or schedules. func (h *handler) register(w http.ResponseWriter, r *http.Request) error { - h.l.Lock() - defer h.l.Unlock() + var err error + if r.Header.Get(HeaderKeySyncKind) == SyncKindInBand && allowInBandSync() { + err = h.inBandSync(w, r) + } else { + err = h.outOfBandSync(w, r) + } - scheme := "http" - if r.TLS != nil { - scheme = "https" + if err != nil { + h.Logger.Error("out-of-band sync error", "error", err) } - host := r.Host + return err +} - // Get the sync ID from the URL and then remove it, since we don't want the - // sync ID to show in the function URLs (that would affect the checksum and - // is ugly in the UI) - qp := r.URL.Query() - syncID := qp.Get("deployId") - qp.Del("deployId") - r.URL.RawQuery = qp.Encode() +type inBandSynchronizeRequest struct { + URL string `json:"url"` +} - pathAndParams := r.URL.String() +func (i inBandSynchronizeRequest) Validate() error { + if i.URL == "" { + return fmt.Errorf("missing URL") + } + return nil +} - config := sdk.RegisterRequest{ - URL: fmt.Sprintf("%s://%s%s", scheme, host, pathAndParams), - V: "1", - DeployType: "ping", - SDK: HeaderValueSDK, - AppName: h.appName, - Headers: sdk.Headers{ - Env: h.GetEnv(), - Platform: platform(), - }, - Capabilities: capabilities, +type inBandSynchronizeResponse struct { + AppID string `json:"app_id"` + Env *string `json:"env"` + Framework *string `json:"framework"` + Functions []sdk.SDKFunction `json:"functions"` + Inspection map[string]any `json:"inspection"` + Platform *string `json:"platform"` + SDKAuthor string `json:"sdk_author"` + SDKLanguage string `json:"sdk_language"` + SDKVersion string `json:"sdk_version"` + URL string `json:"url"` +} + +func (h *handler) inBandSync( + w http.ResponseWriter, + r *http.Request, +) error { + ctx := r.Context() + defer r.Body.Close() + + var sig string + if !IsDev() { + if sig = r.Header.Get(HeaderKeySignature); sig == "" { + return publicerr.Error{ + Message: fmt.Sprintf("missing %s header", HeaderKeySignature), + Status: 401, + } + } + } + + max := h.HandlerOpts.MaxBodySize + if max == 0 { + max = DefaultMaxBodySize + } + reqByt, err := io.ReadAll(http.MaxBytesReader(w, r.Body, int64(max))) + if err != nil { + return publicerr.Error{ + Message: "error reading request body", + Status: 500, + } } - for _, fn := range h.funcs { + valid, skey, err := ValidateSignature( + ctx, + sig, + h.GetSigningKey(), + h.GetSigningKeyFallback(), + reqByt, + ) + if err != nil { + return publicerr.Error{ + Message: "error validating signature", + Status: 401, + } + } + if !valid { + return publicerr.Error{ + Message: "invalid signature", + Status: 401, + } + } + + var reqBody inBandSynchronizeRequest + err = json.Unmarshal(reqByt, &reqBody) + if err != nil { + return publicerr.Error{ + Message: fmt.Errorf("malformed input: %w", err).Error(), + Status: 400, + } + } + err = reqBody.Validate() + if err != nil { + return publicerr.Error{ + Message: fmt.Errorf("malformed input: %w", err).Error(), + Status: 400, + } + } + + appURL, err := url.Parse(reqBody.URL) + if err != nil { + return publicerr.Error{ + Message: fmt.Errorf("malformed input: %w", err).Error(), + Status: 400, + } + } + if h.URL != nil { + appURL = h.URL + } + + fns, err := createFunctionConfigs(h.appName, h.funcs, *appURL) + if err != nil { + return fmt.Errorf("error creating function configs: %w", err) + } + + var env *string + if h.GetEnv() != "" { + val := h.GetEnv() + env = &val + } + + inspection, err := h.createSecureInspection() + if err != nil { + return fmt.Errorf("error creating inspection: %w", err) + } + inspectionMap, err := types.StructToMap(inspection) + if err != nil { + return fmt.Errorf("error converting inspection to map: %w", err) + } + + respBody := inBandSynchronizeResponse{ + AppID: h.appName, + Env: env, + Functions: fns, + Inspection: inspectionMap, + SDKAuthor: SDKAuthor, + SDKLanguage: SDKLanguage, + SDKVersion: SDKVersion, + URL: appURL.String(), + } + + respByt, err := json.Marshal(respBody) + if err != nil { + return fmt.Errorf("error marshalling response: %w", err) + } + + resSig, err := Sign(ctx, time.Now(), []byte(skey), respByt) + if err != nil { + return fmt.Errorf("error signing response: %w", err) + } + w.Header().Add(HeaderKeySignature, resSig) + w.Header().Add(HeaderKeyContentType, "application/json") + w.Header().Add(HeaderKeySyncKind, SyncKindInBand) + + err = json.NewEncoder(w).Encode(respBody) + if err != nil { + return fmt.Errorf("error writing response: %w", err) + } + + return nil +} + +func createFunctionConfigs( + appName string, + fns []ServableFunction, + appURL url.URL, +) ([]sdk.SDKFunction, error) { + if appName == "" { + return nil, fmt.Errorf("missing app name") + } + if appURL == (url.URL{}) { + return nil, fmt.Errorf("missing URL") + } + + fnConfigs := make([]sdk.SDKFunction, len(fns)) + for i, fn := range fns { c := fn.Config() var retries *sdk.StepRetries @@ -309,15 +457,14 @@ func (h *handler) register(w http.ResponseWriter, r *http.Request) error { } // Modify URL to contain fn ID, step params - url := h.url(r) - values := url.Query() + values := appURL.Query() values.Set("fnId", fn.Slug()) values.Set("step", "step") - url.RawQuery = values.Encode() + appURL.RawQuery = values.Encode() f := sdk.SDKFunction{ Name: fn.Name(), - Slug: h.appName + "-" + fn.Slug(), + Slug: appName + "-" + fn.Slug(), Idempotency: c.Idempotency, Priority: fn.Config().Priority, Triggers: inngest.MultipleTriggers{}, @@ -331,7 +478,7 @@ func (h *handler) register(w http.ResponseWriter, r *http.Request) error { Name: fn.Name(), Retries: retries, Runtime: map[string]any{ - "url": url.String(), + "url": appURL.String(), }, }, }, @@ -379,9 +526,51 @@ func (h *handler) register(w http.ResponseWriter, r *http.Request) error { } } - config.Functions = append(config.Functions, f) + fnConfigs[i] = f + } + + return fnConfigs, nil +} + +func (h *handler) outOfBandSync(w http.ResponseWriter, r *http.Request) error { + h.l.Lock() + defer h.l.Unlock() + + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + host := r.Host + + // Get the sync ID from the URL and then remove it, since we don't want the + // sync ID to show in the function URLs (that would affect the checksum and + // is ugly in the UI) + qp := r.URL.Query() + syncID := qp.Get("deployId") + qp.Del("deployId") + r.URL.RawQuery = qp.Encode() + + pathAndParams := r.URL.String() + + config := sdk.RegisterRequest{ + URL: fmt.Sprintf("%s://%s%s", scheme, host, pathAndParams), + V: "1", + DeployType: "ping", + SDK: HeaderValueSDK, + AppName: h.appName, + Headers: sdk.Headers{ + Env: h.GetEnv(), + Platform: platform(), + }, + Capabilities: capabilities, } + fns, err := createFunctionConfigs(h.appName, h.funcs, *h.url(r)) + if err != nil { + return fmt.Errorf("error creating function configs: %w", err) + } + config.Functions = fns + registerURL := defaultRegisterURL if IsDev() { // TODO: Check if dev server is up. If not, error. We can't deploy to production. @@ -650,6 +839,45 @@ type secureIntrospection struct { SigningKeyHash *string `json:"signing_key_hash"` } +func (h *handler) createSecureInspection() (*secureIntrospection, error) { + mode := "cloud" + if IsDev() { + mode = "dev" + } + + var signingKeyHash *string + if h.GetSigningKey() != "" { + key, err := hashedSigningKey([]byte(h.GetSigningKey())) + if err != nil { + return nil, fmt.Errorf("error hashing signing key: %w", err) + } + hash := string(key) + signingKeyHash = &hash + } + + var signingKeyFallbackHash *string + if h.GetSigningKeyFallback() != "" { + key, err := hashedSigningKey([]byte(h.GetSigningKeyFallback())) + if err != nil { + return nil, fmt.Errorf("error hashing signing key fallback: %w", err) + } + hash := string(key) + signingKeyFallbackHash = &hash + } + + return &secureIntrospection{ + insecureIntrospection: insecureIntrospection{ + FunctionCount: len(h.funcs), + HasEventKey: os.Getenv("INNGEST_EVENT_KEY") != "", + HasSigningKey: h.GetSigningKey() != "", + Mode: mode, + }, + Capabilities: capabilities, + SigningKeyFallbackHash: signingKeyFallbackHash, + SigningKeyHash: signingKeyHash, + }, nil +} + func (h *handler) introspect(w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() @@ -667,36 +895,9 @@ func (h *handler) introspect(w http.ResponseWriter, r *http.Request) error { []byte{}, ) if valid { - var signingKeyHash *string - if h.GetSigningKey() != "" { - key, err := hashedSigningKey([]byte(h.GetSigningKey())) - if err != nil { - return fmt.Errorf("error hashing signing key: %w", err) - } - hash := string(key) - signingKeyHash = &hash - } - - var signingKeyFallbackHash *string - if h.GetSigningKeyFallback() != "" { - key, err := hashedSigningKey([]byte(h.GetSigningKeyFallback())) - if err != nil { - return fmt.Errorf("error hashing signing key fallback: %w", err) - } - hash := string(key) - signingKeyFallbackHash = &hash - } - - introspection := secureIntrospection{ - insecureIntrospection: insecureIntrospection{ - FunctionCount: len(h.funcs), - HasEventKey: os.Getenv("INNGEST_EVENT_KEY") != "", - HasSigningKey: h.GetSigningKey() != "", - Mode: mode, - }, - Capabilities: capabilities, - SigningKeyFallbackHash: signingKeyFallbackHash, - SigningKeyHash: signingKeyHash, + introspection, err := h.createSecureInspection() + if err != nil { + return err } w.Header().Set(HeaderKeyContentType, "application/json") diff --git a/headers.go b/headers.go index 0e64755c..239dcf8f 100644 --- a/headers.go +++ b/headers.go @@ -15,6 +15,7 @@ const ( HeaderKeySDK = "X-Inngest-SDK" HeaderKeyServerKind = "X-Inngest-Server-Kind" HeaderKeySignature = "X-Inngest-Signature" + HeaderKeySyncKind = "x-inngest-sync-kind" HeaderKeyUserAgent = "User-Agent" ) diff --git a/internal/types/structs.go b/internal/types/structs.go new file mode 100644 index 00000000..940120b2 --- /dev/null +++ b/internal/types/structs.go @@ -0,0 +1,21 @@ +package types + +import ( + "encoding/json" + "fmt" +) + +func StructToMap(v any) (map[string]any, error) { + byt, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to marshal: %w", err) + } + + var out = make(map[string]any) + err = json.Unmarshal(byt, &out) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal: %w", err) + } + + return out, nil +} diff --git a/vendor/github.com/inngest/inngest/pkg/sdk/sdk.go b/vendor/github.com/inngest/inngest/pkg/sdk/sdk.go index a7689cec..5717c669 100644 --- a/vendor/github.com/inngest/inngest/pkg/sdk/sdk.go +++ b/vendor/github.com/inngest/inngest/pkg/sdk/sdk.go @@ -84,10 +84,12 @@ type RegisterRequest struct { } const ( + InBandSyncV1 string = "v1" TrustProbeV1 string = "v1" ) type Capabilities struct { + InBandSync string `json:"in_band_sync"` TrustProbe string `json:"trust_probe"` } diff --git a/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/batch_processor.go b/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/batch_processor.go new file mode 100644 index 00000000..e6ab32ff --- /dev/null +++ b/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/batch_processor.go @@ -0,0 +1,246 @@ +package exporters + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/go-multierror" + "github.com/inngest/inngest/pkg/logger" + "github.com/inngest/inngest/pkg/telemetry/metrics" + "go.opentelemetry.io/otel/sdk/trace" +) + +const ( + defaultBatchMaxSize = 10_000 + defaultConcurrency = 100 + defaultBatchTimeout = 200 * time.Millisecond +) + +type BatchSpanProcessorOpt func(b *batchSpanProcessor) + +func WithBatchProcessorBufferSize(size int) BatchSpanProcessorOpt { + return func(b *batchSpanProcessor) { + if size > 0 { + b.maxSize = size + } + } +} + +func WithBatchProcessorInterval(timeout time.Duration) BatchSpanProcessorOpt { + return func(b *batchSpanProcessor) { + if timeout > 0 { + b.timeout = timeout + } + } +} + +func WithBatchProcessorConcurrency(c int) BatchSpanProcessorOpt { + return func(b *batchSpanProcessor) { + if c > 0 { + b.concurrency = c + } + } +} + +type batchSpanProcessor struct { + mt sync.RWMutex + exporter trace.SpanExporter + maxSize int + concurrency int + timeout time.Duration + buffer map[string][]trace.ReadOnlySpan + pointer uuid.UUID + in chan *trace.ReadOnlySpan + out chan string +} + +func NewBatchSpanProcessor(ctx context.Context, exporter trace.SpanExporter, opts ...BatchSpanProcessorOpt) trace.SpanProcessor { + p := &batchSpanProcessor{ + mt: sync.RWMutex{}, + exporter: exporter, + maxSize: defaultBatchMaxSize, + timeout: defaultBatchTimeout, + concurrency: defaultConcurrency, + buffer: map[string][]trace.ReadOnlySpan{}, + pointer: uuid.New(), + } + + for _, apply := range opts { + apply(p) + } + p.in = make(chan *trace.ReadOnlySpan, p.maxSize) + p.out = make(chan string, p.maxSize) + + // start process loop + for i := 0; i < p.concurrency; i++ { + go p.run(ctx) + } + go p.instrument(ctx) + + return p +} + +// No op +func (b *batchSpanProcessor) OnStart(ctx context.Context, s trace.ReadWriteSpan) {} + +func (b *batchSpanProcessor) OnEnd(s trace.ReadOnlySpan) { + // pass span into the channel + b.in <- &s + metrics.IncrSpanBatchProcessorEnqueuedCounter(context.TODO(), metrics.CounterOpt{PkgName: pkgName}) +} + +func (b *batchSpanProcessor) Shutdown(ctx context.Context) error { + if err := b.flush(ctx); err != nil { + logger.StdlibLogger(ctx).Error("error flushing spans on shutdown", "error", err) + } + return b.exporter.Shutdown(ctx) +} + +func (b *batchSpanProcessor) ForceFlush(ctx context.Context) error { + return b.flush(ctx) +} + +func (b *batchSpanProcessor) run(ctx context.Context) { + for { + select { + case id := <-b.out: + if err := b.send(ctx, id); err != nil { + logger.StdlibLogger(ctx).Error("error sending out batched spans", "error", err, "batch_id", id) + } + + case span := <-b.in: + b.append(ctx, span) + + case <-ctx.Done(): + if err := b.flush(ctx); err != nil { + logger.StdlibLogger(ctx).Error("error flushing spans on completion", "error", err) + } + return + } + } +} + +// append add the span into the buffer the pointer is currently pointing to +func (b *batchSpanProcessor) append(ctx context.Context, span *trace.ReadOnlySpan) { + b.mt.Lock() + defer b.mt.Unlock() + + p := b.pointer + buf, ok := b.buffer[p.String()] + if !ok { + buf = []trace.ReadOnlySpan{} + } + + buf = append(buf, *span) + b.buffer[p.String()] = buf + + switch len(buf) { + case 1: + // attempt to send the spans on timeout if this is a new batch + go b.sendLater(ctx, p.String()) + + case b.maxSize: + // reset buffer + newPointer := uuid.New() + b.pointer = newPointer + + // start execution + b.out <- p.String() + } +} + +// sendLater defers the sending after the timeout +func (b *batchSpanProcessor) sendLater(ctx context.Context, id string) { + <-time.After(b.timeout) + + // update the pointer to something else so it doesn't attempt to update the same buffer + b.mt.Lock() + defer b.mt.Unlock() + + // only update if the pointer value is still the same + if b.pointer.String() == id { + b.pointer = uuid.New() + } + + _, ok := b.buffer[id] + if !ok { + // already processed, not need to deal with it + return + } + + b.out <- id +} + +// send attempts to process the buffer of spans identified by id +func (b *batchSpanProcessor) send(ctx context.Context, id string) error { + b.mt.Lock() + // if the pointer and the id is still the same, change it so nothing can append to the same buffer + if b.pointer.String() == id { + b.pointer = uuid.New() + } + + spans, ok := b.buffer[id] + b.mt.Unlock() + + if !ok { + // likely already processed + return nil + } + + count := len(spans) + metrics.IncrSpanBatchProcessorAttemptCounter(ctx, int64(count), metrics.CounterOpt{PkgName: pkgName}) + + err := b.exporter.ExportSpans(ctx, spans) + if err != nil { + logger.StdlibLogger(ctx).Error("error batch exporting spans", "error", err, "id", id) + } + + // remove the buffer from the map so it doesn't build up memory + b.mt.Lock() + delete(b.buffer, id) + b.mt.Unlock() + + return err +} + +// flush attempts to send out all spans in the buffer +func (b *batchSpanProcessor) flush(ctx context.Context) error { + var errs error + + for id := range b.buffer { + if err := b.send(ctx, id); err != nil { + errs = multierror.Append(err, errs) + } + } + + return errs +} + +// instrument checks on the size of the buffer and keys used +// neither should be increasing over time +func (b *batchSpanProcessor) instrument(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + + default: + <-time.After(20 * time.Second) + + var keys, total int64 + + // count things very quickly + b.mt.Lock() + for _, spans := range b.buffer { + keys += 1 + total += int64(len(spans)) + } + b.mt.Unlock() + + metrics.GaugeSpanBatchProcessorBufferKeys(ctx, keys, metrics.GaugeOpt{PkgName: pkgName}) + metrics.GaugeSpanBatchProcessorBufferSize(ctx, total, metrics.GaugeOpt{PkgName: pkgName}) + } + } +} diff --git a/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/nats.go b/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/nats.go index 2f941f7b..707925f0 100644 --- a/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/nats.go +++ b/vendor/github.com/inngest/inngest/pkg/telemetry/exporters/nats.go @@ -92,12 +92,10 @@ func (e *natsSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOn } // publish to all subjects defined for _, subj := range e.subjects { - sub := subj - for _, sp := range spans { wg.Add(1) - go func(ctx context.Context, sp trace.ReadOnlySpan) { + go func(ctx context.Context, sub string, sp trace.ReadOnlySpan) { defer wg.Done() ts := sp.StartTime() @@ -112,7 +110,7 @@ func (e *natsSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOn id, status, kind, attr, err := e.parseSpanAttributes(sp.Attributes()) if err != nil { - logger.StdlibLogger(ctx).Error("error parsing span attribures", + logger.StdlibLogger(ctx).Error("error parsing span attributes", "error", err, "spanAttr", sp.Attributes(), ) @@ -140,6 +138,10 @@ func (e *natsSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOn logger.StdlibLogger(ctx).Error("error parsing span events", "error", err, "spanEvents", sp.Events(), + "acctID", id.AccountId, + "wsID", id.EnvId, + "wfID", id.FunctionId, + "runID", id.RunId, ) } @@ -187,6 +189,7 @@ func (e *natsSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOn "wfID", id.FunctionId, "runID", id.RunId, ) + return } pstatus := "unknown" @@ -205,14 +208,14 @@ func (e *natsSpanExporter) ExportSpans(ctx context.Context, spans []trace.ReadOn ) } - metrics.IncrExportedSpansCounter(ctx, metrics.CounterOpt{ + metrics.IncrSpanExportedCounter(ctx, metrics.CounterOpt{ PkgName: pkgName, Tags: map[string]any{ "subject": sub, "status": pstatus, }, }) - }(ctx, sp) + }(ctx, subj, sp) } } diff --git a/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/counter.go b/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/counter.go index 27fd3e9e..77c1db1c 100644 --- a/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/counter.go +++ b/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/counter.go @@ -128,11 +128,29 @@ func IncrInstrumentationLeaseClaimsCounter(ctx context.Context, opts CounterOpt) }) } -func IncrExportedSpansCounter(ctx context.Context, opts CounterOpt) { +func IncrSpanExportedCounter(ctx context.Context, opts CounterOpt) { RecordCounterMetric(ctx, 1, CounterOpt{ PkgName: opts.PkgName, - MetricName: "exported_spans_total", + MetricName: "span_exported_total", Description: "Total number of run spans exported", Tags: opts.Tags, }) } + +func IncrSpanBatchProcessorEnqueuedCounter(ctx context.Context, opts CounterOpt) { + RecordCounterMetric(ctx, 1, CounterOpt{ + PkgName: opts.PkgName, + MetricName: "span_batch_processor_enqueued_total", + Description: "Total number of spans enqueued for batch processing", + Tags: opts.Tags, + }) +} + +func IncrSpanBatchProcessorAttemptCounter(ctx context.Context, incr int64, opts CounterOpt) { + RecordCounterMetric(ctx, incr, CounterOpt{ + PkgName: opts.PkgName, + MetricName: "span_batch_processor_attempt_total", + Description: "Total number of spans attempted to export", + Tags: opts.Tags, + }) +} diff --git a/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/gauge.go b/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/gauge.go index 626d98a5..5c712925 100644 --- a/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/gauge.go +++ b/vendor/github.com/inngest/inngest/pkg/telemetry/metrics/gauge.go @@ -74,3 +74,21 @@ func GaugeQueueShardPartitionAvailableCount(ctx context.Context, val int64, opts Tags: opts.Tags, }) } + +func GaugeSpanBatchProcessorBufferSize(ctx context.Context, val int64, opts GaugeOpt) { + RecordGaugeMetric(ctx, val, GaugeOpt{ + PkgName: opts.PkgName, + MetricName: "span_batch_processor_buffer_size", + Description: "The number of items in buffer point in time", + Tags: opts.Tags, + }) +} + +func GaugeSpanBatchProcessorBufferKeys(ctx context.Context, val int64, opts GaugeOpt) { + RecordGaugeMetric(ctx, val, GaugeOpt{ + PkgName: opts.PkgName, + MetricName: "span_batch_processor_buffer_keys", + Description: "The number of keys used in buffer point in time", + Tags: opts.Tags, + }) +} diff --git a/vendor/github.com/inngest/inngest/pkg/telemetry/trace/tracer.go b/vendor/github.com/inngest/inngest/pkg/telemetry/trace/tracer.go index 1e868a55..daba43b8 100644 --- a/vendor/github.com/inngest/inngest/pkg/telemetry/trace/tracer.go +++ b/vendor/github.com/inngest/inngest/pkg/telemetry/trace/tracer.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "sync" + "time" "github.com/inngest/inngest/pkg/consts" "github.com/inngest/inngest/pkg/inngest/log" @@ -290,7 +291,7 @@ func newOTLPHTTPTraceProvider(ctx context.Context, opts TracerOpts) (Tracer, err return nil, fmt.Errorf("error create otlp http trace client: %w", err) } - sp := trace.NewBatchSpanProcessor(exp) + sp := trace.NewBatchSpanProcessor(exp, trace.WithBatchTimeout(100*time.Millisecond)) tp := trace.NewTracerProvider( trace.WithSpanProcessor(sp), trace.WithResource(resource.NewWithAttributes( @@ -385,7 +386,38 @@ func newNatsTraceProvider(ctx context.Context, opts TracerOpts) (Tracer, error) return nil, fmt.Errorf("error creating NATS trace client: %w", err) } - sp := trace.NewBatchSpanProcessor(exp) + // configure options + bopts := []exporters.BatchSpanProcessorOpt{} + { + val := os.Getenv("SPAN_BATCH_PROCESSOR_BUFFER_SIZE") + if val != "" { + bufferSize, err := strconv.Atoi(val) + if err == nil && bufferSize > 0 { + bopts = append(bopts, exporters.WithBatchProcessorBufferSize(bufferSize)) + } + } + } + + { + val := os.Getenv("SPAN_BATCH_PROCESSOR_INTERVAL") + if val != "" { + if dur, err := time.ParseDuration(val); err == nil { + bopts = append(bopts, exporters.WithBatchProcessorInterval(dur)) + } + } + } + + { + val := os.Getenv("SPAN_BATCH_PROCESSOR_CONCURRENCY") + if val != "" { + c, err := strconv.Atoi(val) + if err == nil && c > 0 { + bopts = append(bopts, exporters.WithBatchProcessorConcurrency(c)) + } + } + } + + sp := exporters.NewBatchSpanProcessor(ctx, exp, bopts...) tp := trace.NewTracerProvider( trace.WithSpanProcessor(sp), trace.WithResource(resource.NewWithAttributes( diff --git a/vendor/modules.txt b/vendor/modules.txt index 5538f9b3..fdd8fd97 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -89,7 +89,7 @@ github.com/hashicorp/hcl/json/token # github.com/inngest/expr v0.0.0-20240717151033-03e4378c436c ## explicit; go 1.21.0 github.com/inngest/expr -# github.com/inngest/inngest v0.30.0-beta-2.0.20240905105759-4df2bfd0dd29 +# github.com/inngest/inngest v0.30.0-beta-2.0.20240912142400-4c207d8fb0ce ## explicit; go 1.21.0 github.com/inngest/inngest/pkg/consts github.com/inngest/inngest/pkg/dateutil From 8a67f8e22ae531d29fb3f543138ebcdeabf434a7 Mon Sep 17 00:00:00 2001 From: Aaron Harper Date: Thu, 12 Sep 2024 12:39:13 -0400 Subject: [PATCH 2/3] Add tests --- env.go | 4 +- handler.go | 236 +++++++++++++++++++++++++----------------------- handler_test.go | 199 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 324 insertions(+), 115 deletions(-) diff --git a/env.go b/env.go index b3128f0d..d35f1822 100644 --- a/env.go +++ b/env.go @@ -8,6 +8,8 @@ import ( const ( devServerURL = "http://127.0.0.1:8288" + + envKeyAllowInBandSync = "INNGEST_ALLOW_IN_BAND_SYNC" ) // IsDev returns whether to use the dev server, by checking the presence of the INNGEST_DEV @@ -35,7 +37,7 @@ func DevServerURL() string { } func allowInBandSync() bool { - val := os.Getenv("INNGEST_ALLOW_IN_BAND_SYNC") + val := os.Getenv(envKeyAllowInBandSync) if val == "" { // TODO: Default to true once in-band syncing is stable return false diff --git a/handler.go b/handler.go index 870cb979..8658ab82 100644 --- a/handler.go +++ b/handler.go @@ -254,7 +254,12 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err := h.register(w, r); err != nil { h.Logger.Error("error registering functions", "error", err.Error()) - w.WriteHeader(500) + status := http.StatusInternalServerError + if err, ok := err.(publicerr.Error); ok { + status = err.Status + } + w.WriteHeader(status) + w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]string{ "message": err.Error(), @@ -316,8 +321,8 @@ func (h *handler) inBandSync( if !IsDev() { if sig = r.Header.Get(HeaderKeySignature); sig == "" { return publicerr.Error{ - Message: fmt.Sprintf("missing %s header", HeaderKeySignature), - Status: 401, + Err: fmt.Errorf("missing %s header", HeaderKeySignature), + Status: 401, } } } @@ -329,8 +334,8 @@ func (h *handler) inBandSync( reqByt, err := io.ReadAll(http.MaxBytesReader(w, r.Body, int64(max))) if err != nil { return publicerr.Error{ - Message: "error reading request body", - Status: 500, + Err: fmt.Errorf("error reading request body"), + Status: 500, } } @@ -343,14 +348,14 @@ func (h *handler) inBandSync( ) if err != nil { return publicerr.Error{ - Message: "error validating signature", - Status: 401, + Err: fmt.Errorf("error validating signature"), + Status: 401, } } if !valid { return publicerr.Error{ - Message: "invalid signature", - Status: 401, + Err: fmt.Errorf("invalid signature"), + Status: 401, } } @@ -358,23 +363,23 @@ func (h *handler) inBandSync( err = json.Unmarshal(reqByt, &reqBody) if err != nil { return publicerr.Error{ - Message: fmt.Errorf("malformed input: %w", err).Error(), - Status: 400, + Err: fmt.Errorf("malformed input: %w", err), + Status: 400, } } err = reqBody.Validate() if err != nil { return publicerr.Error{ - Message: fmt.Errorf("malformed input: %w", err).Error(), - Status: 400, + Err: fmt.Errorf("malformed input: %w", err), + Status: 400, } } appURL, err := url.Parse(reqBody.URL) if err != nil { return publicerr.Error{ - Message: fmt.Errorf("malformed input: %w", err).Error(), - Status: 400, + Err: fmt.Errorf("malformed input: %w", err), + Status: 400, } } if h.URL != nil { @@ -433,105 +438,6 @@ func (h *handler) inBandSync( return nil } -func createFunctionConfigs( - appName string, - fns []ServableFunction, - appURL url.URL, -) ([]sdk.SDKFunction, error) { - if appName == "" { - return nil, fmt.Errorf("missing app name") - } - if appURL == (url.URL{}) { - return nil, fmt.Errorf("missing URL") - } - - fnConfigs := make([]sdk.SDKFunction, len(fns)) - for i, fn := range fns { - c := fn.Config() - - var retries *sdk.StepRetries - if c.Retries != nil { - retries = &sdk.StepRetries{ - Attempts: *c.Retries, - } - } - - // Modify URL to contain fn ID, step params - values := appURL.Query() - values.Set("fnId", fn.Slug()) - values.Set("step", "step") - appURL.RawQuery = values.Encode() - - f := sdk.SDKFunction{ - Name: fn.Name(), - Slug: appName + "-" + fn.Slug(), - Idempotency: c.Idempotency, - Priority: fn.Config().Priority, - Triggers: inngest.MultipleTriggers{}, - RateLimit: fn.Config().GetRateLimit(), - Cancel: fn.Config().Cancel, - Timeouts: (*inngest.Timeouts)(fn.Config().Timeouts), - Throttle: (*inngest.Throttle)(fn.Config().Throttle), - Steps: map[string]sdk.SDKStep{ - "step": { - ID: "step", - Name: fn.Name(), - Retries: retries, - Runtime: map[string]any{ - "url": appURL.String(), - }, - }, - }, - } - - if c.Debounce != nil { - f.Debounce = &inngest.Debounce{ - Key: &c.Debounce.Key, - Period: c.Debounce.Period.String(), - } - if c.Debounce.Timeout != nil { - str := c.Debounce.Timeout.String() - f.Debounce.Timeout = &str - } - } - - if c.BatchEvents != nil { - f.EventBatch = map[string]any{ - "maxSize": c.BatchEvents.MaxSize, - "timeout": c.BatchEvents.Timeout, - "key": c.BatchEvents.Key, - } - } - - if len(c.Concurrency) > 0 { - // Marshal as an array, as the sdk/handler unmarshals correctly. - f.Concurrency = &inngest.ConcurrencyLimits{Limits: c.Concurrency} - } - - triggers := fn.Trigger().Triggers() - for _, trigger := range triggers { - if trigger.EventTrigger != nil { - f.Triggers = append(f.Triggers, inngest.Trigger{ - EventTrigger: &inngest.EventTrigger{ - Event: trigger.Event, - Expression: trigger.Expression, - }, - }) - } else { - f.Triggers = append(f.Triggers, inngest.Trigger{ - CronTrigger: &inngest.CronTrigger{ - Cron: trigger.Cron, - }, - }) - } - } - - fnConfigs[i] = f - } - - return fnConfigs, nil -} - func (h *handler) outOfBandSync(w http.ResponseWriter, r *http.Request) error { h.l.Lock() defer h.l.Unlock() @@ -630,6 +536,9 @@ func (h *handler) outOfBandSync(w http.ResponseWriter, r *http.Request) error { } return fmt.Errorf("Error registering functions: %s", body["error"]) } + + w.Header().Add(HeaderKeySyncKind, SyncKindOutOfBand) + return nil } @@ -647,6 +556,105 @@ func (h *handler) url(r *http.Request) *url.URL { return u } +func createFunctionConfigs( + appName string, + fns []ServableFunction, + appURL url.URL, +) ([]sdk.SDKFunction, error) { + if appName == "" { + return nil, fmt.Errorf("missing app name") + } + if appURL == (url.URL{}) { + return nil, fmt.Errorf("missing URL") + } + + fnConfigs := make([]sdk.SDKFunction, len(fns)) + for i, fn := range fns { + c := fn.Config() + + var retries *sdk.StepRetries + if c.Retries != nil { + retries = &sdk.StepRetries{ + Attempts: *c.Retries, + } + } + + // Modify URL to contain fn ID, step params + values := appURL.Query() + values.Set("fnId", fn.Slug()) + values.Set("step", "step") + appURL.RawQuery = values.Encode() + + f := sdk.SDKFunction{ + Name: fn.Name(), + Slug: appName + "-" + fn.Slug(), + Idempotency: c.Idempotency, + Priority: fn.Config().Priority, + Triggers: inngest.MultipleTriggers{}, + RateLimit: fn.Config().GetRateLimit(), + Cancel: fn.Config().Cancel, + Timeouts: (*inngest.Timeouts)(fn.Config().Timeouts), + Throttle: (*inngest.Throttle)(fn.Config().Throttle), + Steps: map[string]sdk.SDKStep{ + "step": { + ID: "step", + Name: fn.Name(), + Retries: retries, + Runtime: map[string]any{ + "url": appURL.String(), + }, + }, + }, + } + + if c.Debounce != nil { + f.Debounce = &inngest.Debounce{ + Key: &c.Debounce.Key, + Period: c.Debounce.Period.String(), + } + if c.Debounce.Timeout != nil { + str := c.Debounce.Timeout.String() + f.Debounce.Timeout = &str + } + } + + if c.BatchEvents != nil { + f.EventBatch = map[string]any{ + "maxSize": c.BatchEvents.MaxSize, + "timeout": c.BatchEvents.Timeout, + "key": c.BatchEvents.Key, + } + } + + if len(c.Concurrency) > 0 { + // Marshal as an array, as the sdk/handler unmarshals correctly. + f.Concurrency = &inngest.ConcurrencyLimits{Limits: c.Concurrency} + } + + triggers := fn.Trigger().Triggers() + for _, trigger := range triggers { + if trigger.EventTrigger != nil { + f.Triggers = append(f.Triggers, inngest.Trigger{ + EventTrigger: &inngest.EventTrigger{ + Event: trigger.Event, + Expression: trigger.Expression, + }, + }) + } else { + f.Triggers = append(f.Triggers, inngest.Trigger{ + CronTrigger: &inngest.CronTrigger{ + Cron: trigger.Cron, + }, + }) + } + } + + fnConfigs[i] = f + } + + return fnConfigs, nil +} + // invoke handles incoming POST calls to invoke a function, delegating to invoke() after validating // the request. func (h *handler) invoke(w http.ResponseWriter, r *http.Request) error { diff --git a/handler_test.go b/handler_test.go index dbf8e12a..99679c2c 100644 --- a/handler_test.go +++ b/handler_test.go @@ -18,6 +18,7 @@ import ( "github.com/inngest/inngest/pkg/enums" "github.com/inngest/inngest/pkg/execution/state" "github.com/inngest/inngest/pkg/inngest" + "github.com/inngest/inngest/pkg/sdk" "github.com/inngest/inngestgo/internal/sdkrequest" "github.com/inngest/inngestgo/step" "github.com/stretchr/testify/require" @@ -552,6 +553,200 @@ func TestIntrospection(t *testing.T) { }) } +func TestInBandSync(t *testing.T) { + os.Setenv(envKeyAllowInBandSync, "1") + defer os.Unsetenv(envKeyAllowInBandSync) + + appID := "test-in-band-sync" + + fn := CreateFunction( + FunctionOpts{Name: "my-fn"}, + EventTrigger("my-event", nil), + func(ctx context.Context, input Input[any]) (any, error) { + return nil, nil + }, + ) + h := NewHandler(appID, HandlerOpts{ + Env: toPtr("my-env"), + }) + h.Register(fn) + server := httptest.NewServer(h) + defer server.Close() + + reqBodyByt, _ := json.Marshal(inBandSynchronizeRequest{ + URL: "http://test.local", + }) + + t.Run("success", func(t *testing.T) { + // SDK responds with sync data when receiving a valid in-band sync + // request + + r := require.New(t) + ctx := context.Background() + + sig, _ := Sign(ctx, time.Now(), []byte(testKey), reqBodyByt) + req, err := http.NewRequest( + http.MethodPut, + server.URL, + bytes.NewReader(reqBodyByt), + ) + r.NoError(err) + req.Header.Set("x-inngest-signature", sig) + req.Header.Set("x-inngest-sync-kind", "in_band") + resp, err := http.DefaultClient.Do(req) + r.NoError(err) + r.Equal(http.StatusOK, resp.StatusCode) + r.Equal(resp.Header.Get("x-inngest-sync-kind"), "in_band") + + var respBody inBandSynchronizeResponse + err = json.NewDecoder(resp.Body).Decode(&respBody) + r.NoError(err) + + r.Equal( + inBandSynchronizeResponse{ + AppID: appID, + Env: toPtr("my-env"), + Functions: []sdk.SDKFunction{{ + Name: "my-fn", + Slug: fmt.Sprintf("%s-my-fn", appID), + Steps: map[string]sdk.SDKStep{ + "step": { + ID: "step", + Name: "my-fn", + Runtime: map[string]any{ + "url": "http://test.local?fnId=my-fn&step=step", + }, + }, + }, + Triggers: []inngest.Trigger{EventTrigger("my-event", nil)}, + }}, + Inspection: map[string]any{ + "capabilities": map[string]any{ + "in_band_sync": "v1", + "trust_probe": "v1", + }, + "function_count": float64(1), + "has_event_key": false, + "has_signing_key": true, + "mode": "cloud", + "signing_key_fallback_hash": "signkey-test-df3f619804a92fdb4057192dc43dd748ea778adc52bc498ce80524c014b81119", + "signing_key_hash": "signkey-test-b2ed992186a5cb19f6668aade821f502c1d00970dfd0e35128d51bac4649916c", + }, + SDKAuthor: "inngest", + SDKLanguage: "go", + SDKVersion: SDKVersion, + URL: "http://test.local", + }, + respBody, + ) + }) + + t.Run("invalid signature", func(t *testing.T) { + // SDK responds with an error when receiving an in-band sync request + // with an invalid signature + + r := require.New(t) + ctx := context.Background() + + invalidKey := "deadbeef" + sig, _ := Sign(ctx, time.Now(), []byte(invalidKey), reqBodyByt) + req, err := http.NewRequest( + http.MethodPut, + server.URL, + bytes.NewReader(reqBodyByt), + ) + r.NoError(err) + req.Header.Set("x-inngest-signature", sig) + req.Header.Set("x-inngest-sync-kind", "in_band") + resp, err := http.DefaultClient.Do(req) + r.NoError(err) + r.Equal(http.StatusUnauthorized, resp.StatusCode) + r.Equal(resp.Header.Get("x-inngest-sync-kind"), "") + + var respBody map[string]any + err = json.NewDecoder(resp.Body).Decode(&respBody) + r.NoError(err) + + r.Equal(map[string]any{ + "message": "error validating signature", + }, respBody) + }) + + t.Run("missing signature", func(t *testing.T) { + // SDK responds with an error when receiving an in-band sync request + // with a missing signature + + r := require.New(t) + + req, err := http.NewRequest( + http.MethodPut, + server.URL, + bytes.NewReader(reqBodyByt), + ) + r.NoError(err) + req.Header.Set("x-inngest-sync-kind", "in_band") + resp, err := http.DefaultClient.Do(req) + r.NoError(err) + r.Equal(http.StatusUnauthorized, resp.StatusCode) + r.Equal(resp.Header.Get("x-inngest-sync-kind"), "") + + var respBody map[string]any + err = json.NewDecoder(resp.Body).Decode(&respBody) + r.NoError(err) + + r.Equal(map[string]any{ + "message": "missing X-Inngest-Signature header", + }, respBody) + }) + + t.Run("missing sync kind header", func(t *testing.T) { + // SDK attempts an out-of-band sync when the sync kind header is missing + + r := require.New(t) + ctx := context.Background() + + // Create a simple Go HTTP mockCloud that responds with hello world + mockCloud := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true,"modified":true}`)) + })) + defer mockCloud.Close() + + appID := "test-in-band-sync-missing-header" + fn := CreateFunction( + FunctionOpts{Name: "my-fn"}, + EventTrigger("my-event", nil), + func(ctx context.Context, input Input[any]) (any, error) { + return nil, nil + }, + ) + h := NewHandler(appID, HandlerOpts{ + Env: toPtr("my-env"), + RegisterURL: &mockCloud.URL, + }) + h.Register(fn) + server := httptest.NewServer(h) + defer server.Close() + + sig, _ := Sign(ctx, time.Now(), []byte(testKey), reqBodyByt) + req, err := http.NewRequest( + http.MethodPut, + server.URL, + bytes.NewReader(reqBodyByt), + ) + r.NoError(err) + req.Header.Set("x-inngest-signature", sig) + resp, err := http.DefaultClient.Do(req) + r.NoError(err) + r.Equal(http.StatusOK, resp.StatusCode) + r.Equal("out_of_band", resp.Header.Get("x-inngest-sync-kind")) + + respByt, err := io.ReadAll(resp.Body) + r.NoError(err) + r.Equal("", string(respByt)) + }) +} + func createRequest(t *testing.T, evt any) *sdkrequest.Request { t.Helper() @@ -611,3 +806,7 @@ func marshalRequest(t *testing.T, r *sdkrequest.Request) []byte { require.NoError(t, err) return byt } + +func toPtr[T any](v T) *T { + return &v +} From ee526dc1466acf96c153cc88bb64babe246bf46a Mon Sep 17 00:00:00 2001 From: Aaron Harper Date: Thu, 12 Sep 2024 12:49:33 -0400 Subject: [PATCH 3/3] Fix failing test --- handler_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/handler_test.go b/handler_test.go index 99679c2c..8c37807f 100644 --- a/handler_test.go +++ b/handler_test.go @@ -513,7 +513,8 @@ func TestIntrospection(t *testing.T) { r.NoError(err) r.Equal(map[string]any{ "capabilities": map[string]any{ - "trust_probe": "v1", + "in_band_sync": "v1", + "trust_probe": "v1", }, "function_count": float64(1), "has_event_key": false,