Skip to content

Commit

Permalink
chore: debug replay for ticketswap
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC authored and cisse21 committed Dec 18, 2024
1 parent 7425659 commit 8fce887
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
15 changes: 7 additions & 8 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,22 @@ import (
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/stringify"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/ro"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/metric"
"github.com/rudderlabs/rudder-go-kit/stringify"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/delayed"
Expand Down Expand Up @@ -3296,7 +3292,10 @@ func (proc *Handle) transformSrcDest(
// in case of custom transformations metadata of first event is returned along with all events in session
// source_id will be same for all events belong to same user in a session
metadata := response.Events[i].Metadata

if metadata.OriginalSourceID != "" {
proc.logger.Infof("OriginalSourceID: %v", metadata.OriginalSourceID)
proc.logger.Infof("SourceID: %v", metadata.SourceID)
}

Check warning on line 3298 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L3296-L3298

Added lines #L3296 - L3298 were not covered by tests
sourceID := metadata.SourceID
destID := metadata.DestinationID
rudderID := metadata.RudderID
Expand Down
12 changes: 11 additions & 1 deletion processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,17 @@ func (trans *handle) transform(
return Response{}
}
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterwards
// flip back afterward
fmt.Println("before swapping in clientEvents")
for i := range clientEvents {
if clientEvents[i].Metadata.OriginalSourceID != "" {
fmt.Println("Metadata for client events: ", clientEvents[i].Metadata)
fmt.Println("Original Source ID: ", clientEvents[i].Metadata.OriginalSourceID)
fmt.Println("Source ID: ", clientEvents[i].Metadata.SourceID)

Check warning on line 265 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L263-L265

Added lines #L263 - L265 were not covered by tests
clientEvents[i].Metadata.OriginalSourceID, clientEvents[i].Metadata.SourceID = clientEvents[i].Metadata.SourceID, clientEvents[i].Metadata.OriginalSourceID
}
}
fmt.Println("after swapping in clientEvents")
sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
Expand Down Expand Up @@ -316,11 +321,15 @@ func (trans *handle) transform(
var outClientEvents []TransformerResponse
var failedEvents []TransformerResponse

fmt.Println("before swapping in transformResponse")
for _, batch := range transformResponse {
// Transform is one to many mapping so returned
// response for each is an array. We flatten it out
for _, transformerResponse := range batch {
if transformerResponse.Metadata.OriginalSourceID != "" {
fmt.Println("Metadata for transformer response: ", transformerResponse.Metadata)
fmt.Println("Original Source ID: ", transformerResponse.Metadata.OriginalSourceID)
fmt.Println("Source ID: ", transformerResponse.Metadata.SourceID)

Check warning on line 332 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L330-L332

Added lines #L330 - L332 were not covered by tests
transformerResponse.Metadata.SourceID, transformerResponse.Metadata.OriginalSourceID = transformerResponse.Metadata.OriginalSourceID, transformerResponse.Metadata.SourceID
}
switch transformerResponse.StatusCode {
Expand All @@ -331,6 +340,7 @@ func (trans *handle) transform(
}
}
}
fmt.Println("after swapping in transformResponse")

trans.sentStat.Count(len(clientEvents))
trans.receivedStat.Count(len(outClientEvents))
Expand Down

0 comments on commit 8fce887

Please sign in to comment.