Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: debug replay for ticketswap #5380

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,22 @@
"time"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/stringify"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/ro"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/metric"
"github.com/rudderlabs/rudder-go-kit/stringify"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/delayed"
Expand Down Expand Up @@ -3296,7 +3292,10 @@
// in case of custom transformations metadata of first event is returned along with all events in session
// source_id will be same for all events belong to same user in a session
metadata := response.Events[i].Metadata

if metadata.OriginalSourceID != "" {
proc.logger.Infof("OriginalSourceID: %v", metadata.OriginalSourceID)
proc.logger.Infof("SourceID: %v", metadata.SourceID)
}

Check warning on line 3298 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L3296-L3298

Added lines #L3296 - L3298 were not covered by tests
sourceID := metadata.SourceID
destID := metadata.DestinationID
rudderID := metadata.RudderID
Expand Down
12 changes: 11 additions & 1 deletion processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,17 @@
return Response{}
}
// flip sourceID and originalSourceID if it's a replay source for the purpose of any user transformation
// flip back afterwards
// flip back afterward
fmt.Println("before swapping in clientEvents")
for i := range clientEvents {
if clientEvents[i].Metadata.OriginalSourceID != "" {
fmt.Println("Metadata for client events: ", clientEvents[i].Metadata)
fmt.Println("Original Source ID: ", clientEvents[i].Metadata.OriginalSourceID)
fmt.Println("Source ID: ", clientEvents[i].Metadata.SourceID)

Check warning on line 265 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L263-L265

Added lines #L263 - L265 were not covered by tests
clientEvents[i].Metadata.OriginalSourceID, clientEvents[i].Metadata.SourceID = clientEvents[i].Metadata.SourceID, clientEvents[i].Metadata.OriginalSourceID
}
}
fmt.Println("after swapping in clientEvents")
sTags := stats.Tags{
"dest_type": clientEvents[0].Destination.DestinationDefinition.Name,
"dest_id": clientEvents[0].Destination.ID,
Expand Down Expand Up @@ -316,11 +321,15 @@
var outClientEvents []TransformerResponse
var failedEvents []TransformerResponse

fmt.Println("before swapping in transformResponse")
for _, batch := range transformResponse {
// Transform is one to many mapping so returned
// response for each is an array. We flatten it out
for _, transformerResponse := range batch {
if transformerResponse.Metadata.OriginalSourceID != "" {
fmt.Println("Metadata for transformer response: ", transformerResponse.Metadata)
fmt.Println("Original Source ID: ", transformerResponse.Metadata.OriginalSourceID)
fmt.Println("Source ID: ", transformerResponse.Metadata.SourceID)

Check warning on line 332 in processor/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

processor/transformer/transformer.go#L330-L332

Added lines #L330 - L332 were not covered by tests
transformerResponse.Metadata.SourceID, transformerResponse.Metadata.OriginalSourceID = transformerResponse.Metadata.OriginalSourceID, transformerResponse.Metadata.SourceID
}
switch transformerResponse.StatusCode {
Expand All @@ -331,6 +340,7 @@
}
}
}
fmt.Println("after swapping in transformResponse")

trans.sentStat.Count(len(clientEvents))
trans.receivedStat.Count(len(outClientEvents))
Expand Down
Loading