diff --git a/processor/processor.go b/processor/processor.go index cbf40de16b..a4ffd3ef02 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -13,16 +13,10 @@ import ( "time" "github.com/google/uuid" - - "github.com/rudderlabs/rudder-server/enterprise/trackedusers" - - "golang.org/x/sync/errgroup" - - "github.com/rudderlabs/rudder-go-kit/stringify" - jsoniter "github.com/json-iterator/go" "github.com/samber/lo" "github.com/tidwall/gjson" + "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/config" @@ -30,9 +24,11 @@ import ( "github.com/rudderlabs/rudder-go-kit/ro" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-go-kit/stats/metric" + "github.com/rudderlabs/rudder-go-kit/stringify" kitsync "github.com/rudderlabs/rudder-go-kit/sync" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/enterprise/trackedusers" "github.com/rudderlabs/rudder-server/internal/enricher" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/processor/delayed" @@ -3296,7 +3292,10 @@ func (proc *Handle) transformSrcDest( // in case of custom transformations metadata of first event is returned along with all events in session // source_id will be same for all events belong to same user in a session metadata := response.Events[i].Metadata - + if metadata.OriginalSourceID != "" { + proc.logger.Infof("OriginalSourceID: %v", metadata.OriginalSourceID) + proc.logger.Infof("SourceID: %v", metadata.SourceID) + } sourceID := metadata.SourceID destID := metadata.DestinationID rudderID := metadata.RudderID diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index 8af9ec60e1..93706bb5c5 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -256,12 +256,17 @@ func (trans *handle) transform( return Response{} } // flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation - // flip back afterwards + // flip back afterward + fmt.Println("before swapping in clientEvents") for i := range clientEvents { if clientEvents[i].Metadata.OriginalSourceID != "" { + fmt.Println("Metadata for client events: ", clientEvents[i].Metadata) + fmt.Println("Original Source ID: ", clientEvents[i].Metadata.OriginalSourceID) + fmt.Println("Source ID: ", clientEvents[i].Metadata.SourceID) clientEvents[i].Metadata.OriginalSourceID, clientEvents[i].Metadata.SourceID = clientEvents[i].Metadata.SourceID, clientEvents[i].Metadata.OriginalSourceID } } + fmt.Println("after swapping in clientEvents") sTags := stats.Tags{ "dest_type": clientEvents[0].Destination.DestinationDefinition.Name, "dest_id": clientEvents[0].Destination.ID, @@ -316,11 +321,15 @@ func (trans *handle) transform( var outClientEvents []TransformerResponse var failedEvents []TransformerResponse + fmt.Println("before swapping in transformResponse") for _, batch := range transformResponse { // Transform is one to many mapping so returned // response for each is an array. We flatten it out for _, transformerResponse := range batch { if transformerResponse.Metadata.OriginalSourceID != "" { + fmt.Println("Metadata for transformer response: ", transformerResponse.Metadata) + fmt.Println("Original Source ID: ", transformerResponse.Metadata.OriginalSourceID) + fmt.Println("Source ID: ", transformerResponse.Metadata.SourceID) transformerResponse.Metadata.SourceID, transformerResponse.Metadata.OriginalSourceID = transformerResponse.Metadata.OriginalSourceID, transformerResponse.Metadata.SourceID } switch transformerResponse.StatusCode { @@ -331,6 +340,7 @@ func (trans *handle) transform( } } } + fmt.Println("after swapping in transformResponse") trans.sentStat.Count(len(clientEvents)) trans.receivedStat.Count(len(outClientEvents))