From fa8585ef0fc93b7deb049ef8a7c31bc1bc4b48e6 Mon Sep 17 00:00:00 2001 From: Dilip Kola <33080863+koladilip@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:57:33 +0530 Subject: [PATCH 1/8] fix: struct tag for connection details reporting type (#5342) --- utils/types/reporting_types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/types/reporting_types.go b/utils/types/reporting_types.go index 2486dd1ace5..5168c4d0383 100644 --- a/utils/types/reporting_types.go +++ b/utils/types/reporting_types.go @@ -159,7 +159,7 @@ type ConnectionDetails struct { SourceJobID string `json:"sourceJobId"` SourceJobRunID string `json:"sourceJobRunId"` SourceDefinitionID string `json:"sourceDefinitionId"` - DestinationDefinitionID string `string:"destinationDefinitionId"` + DestinationDefinitionID string `json:"DestinationDefinitionId"` SourceCategory string `json:"sourceCategory"` TransformationID string `json:"transformationId"` TransformationVersionID string `json:"transformationVersionId"` From fc1684db5121bef29a6cb8870ac093398ab8dada Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:38:00 +0530 Subject: [PATCH 2/8] chore(deps): bump the go-deps group with 2 updates (#5343) --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 6de13ceb878..7bff5fdb88c 100644 --- a/go.mod +++ b/go.mod @@ -106,10 +106,10 @@ require ( go.uber.org/mock v0.5.0 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 golang.org/x/oauth2 v0.24.0 - golang.org/x/sync v0.9.0 + golang.org/x/sync v0.10.0 google.golang.org/api v0.209.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 - google.golang.org/grpc v1.68.0 + google.golang.org/grpc v1.68.1 google.golang.org/protobuf v1.35.2 ) diff --git a/go.sum b/go.sum index 7e6b9a9c9f6..913d8ee74d5 100644 --- a/go.sum +++ b/go.sum @@ -1533,8 +1533,8 @@ golang.org/x/sync 
v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1900,8 +1900,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/stats/opentelemetry v0.0.0-20240907200651-3ffb98b2c93a h1:UIpYSuWdWHSzjwcAFRLjKcPXFZVVLXGEM23W+NWqipw= google.golang.org/grpc/stats/opentelemetry v0.0.0-20240907200651-3ffb98b2c93a/go.mod h1:9i1T9n4ZinTUZGgzENMi8MDDgbGC5mqTS75JAv6xN3A= 
From 815dded7477ebd9a2c4e98038c5358d5295089df Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:35:59 +0000 Subject: [PATCH 3/8] chore(deps): bump the frequent group across 1 directory with 2 updates (#5344) --- go.mod | 10 +++++----- go.sum | 24 ++++++++++++------------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 7bff5fdb88c..8974b7b16b8 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ replace ( require ( cloud.google.com/go/bigquery v1.64.0 - cloud.google.com/go/pubsub v1.45.1 + cloud.google.com/go/pubsub v1.45.3 cloud.google.com/go/storage v1.47.0 github.com/Azure/azure-storage-blob-go v0.15.0 github.com/ClickHouse/clickhouse-go v1.5.4 @@ -107,7 +107,7 @@ require ( golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 golang.org/x/oauth2 v0.24.0 golang.org/x/sync v0.10.0 - google.golang.org/api v0.209.0 + google.golang.org/api v0.210.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 google.golang.org/grpc v1.68.1 google.golang.org/protobuf v1.35.2 @@ -123,8 +123,8 @@ require ( require ( cel.dev/expr v0.16.1 // indirect cloud.google.com/go v0.116.0 // indirect - cloud.google.com/go/auth v0.10.2 // indirect - cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect + cloud.google.com/go/auth v0.11.0 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect cloud.google.com/go/compute/metadata v0.5.2 // indirect cloud.google.com/go/iam v1.2.2 // indirect cloud.google.com/go/monitoring v1.21.2 // indirect @@ -346,7 +346,7 @@ require ( golang.org/x/time v0.8.0 // indirect golang.org/x/tools v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect - google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f // indirect + google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 // 
indirect google.golang.org/grpc/stats/opentelemetry v0.0.0-20240907200651-3ffb98b2c93a // indirect gopkg.in/alexcesaro/statsd.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 913d8ee74d5..4005fe1b4fb 100644 --- a/go.sum +++ b/go.sum @@ -34,10 +34,10 @@ cloud.google.com/go v0.100.1/go.mod h1:fs4QogzfH5n2pBXBP9vRiU+eCny7lD2vmFZy79Iuw cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A= cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= -cloud.google.com/go/auth v0.10.2 h1:oKF7rgBfSHdp/kuhXtqU/tNDr0mZqhYbEh+6SiqzkKo= -cloud.google.com/go/auth v0.10.2/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI= -cloud.google.com/go/auth/oauth2adapt v0.2.5 h1:2p29+dePqsCHPP1bqDJcKj4qxRyYCcbzKpFyKGt3MTk= -cloud.google.com/go/auth/oauth2adapt v0.2.5/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= +cloud.google.com/go/auth v0.11.0 h1:Ic5SZz2lsvbYcWT5dfjNWgw6tTlGi2Wc8hyQSC9BstA= +cloud.google.com/go/auth v0.11.0/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI= +cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= +cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -52,8 +52,8 @@ cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJW cloud.google.com/go/compute v1.5.0/go.mod h1:9SMHyhJlzhlkJqrPAc839t2BZFTSk6Jdj6mkzQJeu0M= cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= 
-cloud.google.com/go/datacatalog v1.22.2 h1:9Bi8YO+WBE0YSSQL1tX62Gy/KcdNGLufyVlEJ0eYMrc= -cloud.google.com/go/datacatalog v1.22.2/go.mod h1:9Wamq8TDfL2680Sav7q3zEhBJSPBrDxJU8WtPJ25dBM= +cloud.google.com/go/datacatalog v1.23.0 h1:9F2zIbWNNmtrSkPIyGRQNsIugG5VgVVFip6+tXSdWLg= +cloud.google.com/go/datacatalog v1.23.0/go.mod h1:9Wamq8TDfL2680Sav7q3zEhBJSPBrDxJU8WtPJ25dBM= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/firestore v1.6.1/go.mod h1:asNXNOzBdyVQmEU+ggO8UPodTkEVFW5Qx+rwHnAz+EY= @@ -79,8 +79,8 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= cloud.google.com/go/pubsub v1.19.0/go.mod h1:/O9kmSe9bb9KRnIAWkzmqhPjHo6LtzGOBYd/kr06XSs= -cloud.google.com/go/pubsub v1.45.1 h1:ZC/UzYcrmK12THWn1P72z+Pnp2vu/zCZRXyhAfP1hJY= -cloud.google.com/go/pubsub v1.45.1/go.mod h1:3bn7fTmzZFwaUjllitv1WlsNMkqBgGUb3UdMhI54eCc= +cloud.google.com/go/pubsub v1.45.3 h1:prYj8EEAAAwkp6WNoGTE4ahe0DgHoyJd5Pbop931zow= +cloud.google.com/go/pubsub v1.45.3/go.mod h1:cGyloK/hXC4at7smAtxFnXprKEFTqmMXNNd9w+bd94Q= cloud.google.com/go/secretmanager v1.3.0/go.mod h1:+oLTkouyiYiabAQNugCeTS3PAArGiMJuBqvJnJsyH+U= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= @@ -1770,8 +1770,8 @@ google.golang.org/api v0.69.0/go.mod h1:boanBiw+h5c3s+tBPgEzLDRHfFLWV0qXxRHz3ws7 google.golang.org/api v0.70.0/go.mod h1:Bs4ZM2HGifEvXwd50TtW70ovgJffJYw2oRCOFU/SkfA= google.golang.org/api v0.71.0/go.mod h1:4PyU6e6JogV1f9eA4voyrTY2batOLdgZ5qZ5HOCc4j8= google.golang.org/api v0.74.0/go.mod h1:ZpfMZOVRMywNyvJFeqL9HRWBgAuRfSjJFpe9QtRRyDs= 
-google.golang.org/api v0.209.0 h1:Ja2OXNlyRlWCWu8o+GgI4yUn/wz9h/5ZfFbKz+dQX+w= -google.golang.org/api v0.209.0/go.mod h1:I53S168Yr/PNDNMi5yPnDc0/LGRZO6o7PoEbl/HY3CM= +google.golang.org/api v0.210.0 h1:HMNffZ57OoZCRYSbdWVRoqOa8V8NIHLL0CzdBPLztWk= +google.golang.org/api v0.210.0/go.mod h1:B9XDZGnx2NtyjzVkOVTGrFSAVZgPcbedzKg/gTLwqBs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1865,8 +1865,8 @@ google.golang.org/genproto v0.0.0-20220304144024-325a89244dc8/go.mod h1:kGP+zUP2 google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E= google.golang.org/genproto v0.0.0-20220401170504-314d38edb7de/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= -google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f h1:zDoHYmMzMacIdjNe+P2XiTmPsLawi/pCbSPfxt6lTfw= -google.golang.org/genproto v0.0.0-20241113202542-65e8d215514f/go.mod h1:Q5m6g8b5KaFFzsQFIGdJkSJDGeJiybVenoYFMMa3ohI= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 h1:pgr/4QbFyktUv9CtQ/Fq4gzEE6/Xs7iCXbktaGzLHbQ= google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697/go.mod h1:+D9ySVjN8nY8YCVjc5O7PZDIdZporIDY3KaGfJunh88= google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= From 4ac2de1f97b9c1e45b30a1ec288bdd22d104cd6e Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Thu, 
5 Dec 2024 17:39:19 +0530 Subject: [PATCH 4/8] chore: cleanup staging repo to use upload_id for queries (#5340) --- warehouse/internal/repo/staging.go | 88 ++++++------------- warehouse/internal/repo/staging_test.go | 49 ++--------- warehouse/router/router.go | 2 +- warehouse/router/state_generate_load_files.go | 2 +- warehouse/router/testdata/sql/upload_test.sql | 12 +-- warehouse/router/upload.go | 4 +- warehouse/router/upload_stats.go | 8 +- warehouse/router/upload_stats_test.go | 3 +- 8 files changed, 51 insertions(+), 117 deletions(-) diff --git a/warehouse/internal/repo/staging.go b/warehouse/internal/repo/staging.go index dfffb7747b7..e4db9fb4680 100644 --- a/warehouse/internal/repo/staging.go +++ b/warehouse/internal/repo/staging.go @@ -291,27 +291,18 @@ func (sf *StagingFiles) GetSchemasByIDs(ctx context.Context, ids []int64) ([]mod return schemas, nil } -// GetForUploadID returns all the staging files for that uploadID -func (sf *StagingFiles) GetForUploadID(ctx context.Context, sourceID, destinationID string, uploadId int64) ([]*model.StagingFile, error) { - query := `SELECT ` + stagingTableColumns + ` FROM ` + stagingTableName + ` ST - WHERE - upload_id = $1 - AND source_id = $2 - AND destination_id = $3 - ORDER BY - id ASC;` - rows, err := sf.db.QueryContext(ctx, query, uploadId, sourceID, destinationID) +// GetForUploadID retrieves all the staging files associated with a specific upload ID. 
+func (sf *StagingFiles) GetForUploadID(ctx context.Context, uploadID int64) ([]*model.StagingFile, error) { + query := `SELECT ` + stagingTableColumns + ` FROM ` + stagingTableName + ` WHERE upload_id = $1 ORDER BY id ASC;` + + rows, err := sf.db.QueryContext(ctx, query, uploadID) if err != nil { - return nil, fmt.Errorf("querying staging files: %w", err) + return nil, fmt.Errorf("querying staging files for upload %d: %w", uploadID, err) } return parseStagingFiles(rows) } -func (sf *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]*model.StagingFile, error) { - return sf.GetForUploadID(ctx, upload.SourceID, upload.DestinationID, upload.ID) -} - func (sf *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error) { var ( uploadID int64 @@ -375,27 +366,13 @@ func (sf *StagingFiles) countPending(ctx context.Context, query string, value in return count, nil } -func (sf *StagingFiles) TotalEventsForUpload(ctx context.Context, upload model.Upload) (int64, error) { +func (sf *StagingFiles) TotalEventsForUploadID(ctx context.Context, uploadID int64) (int64, error) { var total sql.NullInt64 - err := sf.db.QueryRowContext(ctx, ` - SELECT - sum(total_events) - FROM - `+stagingTableName+` - WHERE - id >= $1 - AND id <= $2 - AND source_id = $3 - AND destination_id = $4; - `, - upload.StagingFileStartID, - upload.StagingFileEndID, - upload.SourceID, - upload.DestinationID, - ).Scan(&total) + query := `SELECT SUM(total_events) FROM ` + stagingTableName + ` WHERE upload_id = $1` + err := sf.db.QueryRowContext(ctx, query, uploadID).Scan(&total) if err != nil { - return 0, fmt.Errorf("querying total rows for upload: %w", err) + return 0, fmt.Errorf("querying total events for upload %d: %w", uploadID, err) } return total.Int64, nil @@ -438,47 +415,36 @@ func (sf *StagingFiles) GetEventTimeRangesByUploadID(ctx context.Context, upload return eventTimeRanges, nil } -func (sf *StagingFiles) DestinationRevisionIDs(ctx 
context.Context, upload model.Upload) ([]string, error) { - sqlStatement := ` - SELECT - DISTINCT metadata ->> 'destination_revision_id' AS destination_revision_id - FROM - ` + stagingTableName + ` - WHERE - id >= $1 - AND id <= $2 - AND source_id = $3 - AND destination_id = $4 +func (sf *StagingFiles) DestinationRevisionIDsForUploadID(ctx context.Context, uploadID int64) ([]string, error) { + query := ` + SELECT DISTINCT metadata ->> 'destination_revision_id' AS destination_revision_id + FROM ` + stagingTableName + ` + WHERE upload_id = $1 AND metadata ->> 'destination_revision_id' <> ''; ` - rows, err := sf.db.QueryContext(ctx, sqlStatement, - upload.StagingFileStartID, - upload.StagingFileEndID, - upload.SourceID, - upload.DestinationID, - ) - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } + + rows, err := sf.db.QueryContext(ctx, query, uploadID) if err != nil { - return nil, fmt.Errorf("query destination revisionID: %w", err) + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("querying destination revision IDs for upload %d: %w", uploadID, err) } defer func() { _ = rows.Close() }() - var revisionIDs []string + var destinationRevisionIDs []string for rows.Next() { var revisionID string - err = rows.Scan(&revisionID) - if err != nil { - return nil, fmt.Errorf("scan destination revisionID: %w", err) + if err := rows.Scan(&revisionID); err != nil { + return nil, fmt.Errorf("scanning destination revision IDs for upload %d: %w", uploadID, err) } - revisionIDs = append(revisionIDs, revisionID) + destinationRevisionIDs = append(destinationRevisionIDs, revisionID) } if err = rows.Err(); err != nil { - return nil, fmt.Errorf("iterate destination revisionID: %w", err) + return nil, fmt.Errorf("iterating rows for destination revision IDs for upload %d: %w", uploadID, err) } - return revisionIDs, nil + return destinationRevisionIDs, nil } func (sf *StagingFiles) SetStatuses(ctx context.Context, ids []int64, status string) error { 
diff --git a/warehouse/internal/repo/staging_test.go b/warehouse/internal/repo/staging_test.go index d422b51705a..858a6dee86d 100644 --- a/warehouse/internal/repo/staging_test.go +++ b/warehouse/internal/repo/staging_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" @@ -188,44 +189,10 @@ func TestStagingFileRepo_Many(t *testing.T) { u := repo.NewUploads(db) uploadId, err := u.CreateWithStagingFiles(ctx, model.Upload{}, stagingFiles) require.NoError(t, err) - testcases := []struct { - name string - sourceID string - destinationID string - uploadId int64 - expected []*model.StagingFile - }{ - { - name: "get all", - sourceID: "source_id", - destinationID: "destination_id", - uploadId: uploadId, - expected: stagingFiles, - }, - { - name: "missing source id", - sourceID: "bad_source_id", - destinationID: "destination_id", - uploadId: uploadId, - expected: []*model.StagingFile(nil), - }, - { - name: "missing destination id", - sourceID: "source_id", - destinationID: "bad_destination_id", - uploadId: uploadId, - expected: []*model.StagingFile(nil), - }, - } - for _, tc := range testcases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - retrieved, err := r.GetForUploadID(ctx, tc.sourceID, tc.destinationID, tc.uploadId) - require.NoError(t, err) - require.Equal(t, tc.expected, retrieved) - }) - } + + retrieved, err := r.GetForUploadID(ctx, uploadId) + require.NoError(t, err) + require.Equal(t, stagingFiles, retrieved) }) t.Run("GetSchemasByIDs", func(t *testing.T) { @@ -400,11 +367,11 @@ func TestStagingFileRepo_Pending(t *testing.T) { upload, err := uploadRepo.Get(ctx, uploadID) require.NoError(t, err) - events, err := 
r.TotalEventsForUpload(ctx, upload) + events, err := r.TotalEventsForUploadID(ctx, upload.ID) require.NoError(t, err) require.Equal(t, int64(input.Files)*100, events) - revisionIDs, err := r.DestinationRevisionIDs(ctx, upload) + revisionIDs, err := r.DestinationRevisionIDsForUploadID(ctx, upload.ID) require.NoError(t, err) require.Equal(t, []string{"destination_revision_id"}, revisionIDs) }) @@ -461,7 +428,7 @@ func TestStagingFileRepo_Status(t *testing.T) { ) require.NoError(t, err) - files, err := r.GetForUploadID(ctx, "source_id", "destination_id", 1) + files, err := r.GetForUploadID(ctx, 1) require.NoError(t, err) for _, file := range files { diff --git a/warehouse/router/router.go b/warehouse/router/router.go index 3fdf03cf7de..7c021554777 100644 --- a/warehouse/router/router.go +++ b/warehouse/router/router.go @@ -441,7 +441,7 @@ func (r *Router) uploadsToProcess(ctx context.Context, availableWorkers int, ski continue } - stagingFilesList, err := r.stagingRepo.GetForUpload(ctx, upload) + stagingFilesList, err := r.stagingRepo.GetForUploadID(ctx, upload.ID) if err != nil { return nil, err } diff --git a/warehouse/router/state_generate_load_files.go b/warehouse/router/state_generate_load_files.go index 7ac0546f9c9..06fec2335b2 100644 --- a/warehouse/router/state_generate_load_files.go +++ b/warehouse/router/state_generate_load_files.go @@ -57,7 +57,7 @@ func (job *UploadJob) setLoadFileIDs(startLoadFileID, endLoadFileID int64) error } func (job *UploadJob) matchRowsInStagingAndLoadFiles(ctx context.Context) error { - rowsInStagingFiles, err := job.stagingFileRepo.TotalEventsForUpload(ctx, job.upload) + rowsInStagingFiles, err := job.stagingFileRepo.TotalEventsForUploadID(ctx, job.upload.ID) if err != nil { return fmt.Errorf("total rows: %w", err) } diff --git a/warehouse/router/testdata/sql/upload_test.sql b/warehouse/router/testdata/sql/upload_test.sql index 1fb6686a26a..05fb38adca7 100644 --- a/warehouse/router/testdata/sql/upload_test.sql +++ 
b/warehouse/router/testdata/sql/upload_test.sql @@ -44,38 +44,38 @@ INSERT INTO wh_staging_files ( id, location, schema, source_id, destination_id, status, total_events, first_event_at, last_event_at, created_at, updated_at, - metadata + metadata,upload_id ) VALUES ( 1, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), NOW(), - NOW(), '{}' + NOW(), '{}',1 ), ( 2, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), NOW(), - NOW(), '{}' + NOW(), '{}',1 ), ( 3, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), NOW(), - NOW(), '{}' + NOW(), '{}',1 ), ( 4, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), NOW(), - NOW(), '{}' + NOW(), '{}',1 ), ( 5, 'rudder/rudder-warehouse-staging-logs/2EUralUySYUs7hgsdU1lFXRSm/2022-09-20/1663650685.2EUralsdsDyZjOKU1lFXRSm.eeadsb4-a066-42f4-a90b-460161378e1b.json.gz', '{}', 'test-sourceID', 'test-destinationID', 'succeeded', 1, NOW(), NOW(), NOW(), - NOW(), '{}' + NOW(), '{}',1 ); INSERT INTO wh_load_files ( id, staging_file_id, location, source_id, diff --git a/warehouse/router/upload.go b/warehouse/router/upload.go index 3ed6af1711b..4fc9ca18860 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -383,7 +383,7 @@ func (job *UploadJob) run() (err error) { uploadStatusOpts := UploadStatusOpts{Status: 
newStatus} if newStatus == model.ExportedData { - rowCount, _ := job.stagingFileRepo.TotalEventsForUpload(job.ctx, job.upload) + rowCount, _ := job.stagingFileRepo.TotalEventsForUploadID(job.ctx, job.upload.ID) reportingMetric := types.PUReportedMetric{ ConnectionDetails: types.ConnectionDetails{ @@ -662,7 +662,7 @@ func (job *UploadJob) setUploadError(statusError error, state string) (string, e return "", fmt.Errorf("changing upload columns: %w", err) } - inputCount, _ := job.stagingFileRepo.TotalEventsForUpload(job.ctx, upload) + inputCount, _ := job.stagingFileRepo.TotalEventsForUploadID(job.ctx, upload.ID) outputCount, _ := job.tableUploadsRepo.TotalExportedEvents(job.ctx, job.upload.ID, []string{ whutils.ToProviderCase(job.warehouse.Type, whutils.DiscardsTable), }) diff --git a/warehouse/router/upload_stats.go b/warehouse/router/upload_stats.go index c11fea32832..73f95faf24c 100644 --- a/warehouse/router/upload_stats.go +++ b/warehouse/router/upload_stats.go @@ -60,9 +60,9 @@ func (job *UploadJob) generateUploadSuccessMetrics() { return } - numStagedEvents, err = job.stagingFileRepo.TotalEventsForUpload( + numStagedEvents, err = job.stagingFileRepo.TotalEventsForUploadID( job.ctx, - job.upload, + job.upload.ID, ) if err != nil { job.logger.Warnw("total events for upload", logfield.Error, err.Error()) @@ -101,9 +101,9 @@ func (job *UploadJob) generateUploadAbortedMetrics() { return } - numStagedEvents, err = job.stagingFileRepo.TotalEventsForUpload( + numStagedEvents, err = job.stagingFileRepo.TotalEventsForUploadID( job.ctx, - job.upload, + job.upload.ID, ) if err != nil { job.logger.Warnw("total events for upload", logfield.Error, err.Error()) diff --git a/warehouse/router/upload_stats_test.go b/warehouse/router/upload_stats_test.go index be424191785..fd67039baf5 100644 --- a/warehouse/router/upload_stats_test.go +++ b/warehouse/router/upload_stats_test.go @@ -16,6 +16,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats" 
"github.com/rudderlabs/rudder-go-kit/stats/mock_stats" "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" @@ -243,7 +244,7 @@ func TestUploadJob_MatchRows(t *testing.T) { }, }, nil) - count, err := repo.NewStagingFiles(sqlmiddleware.New(db)).TotalEventsForUpload(context.Background(), job.upload) + count, err := repo.NewStagingFiles(sqlmiddleware.New(db)).TotalEventsForUploadID(context.Background(), job.upload.ID) require.NoError(t, err) require.EqualValues(t, 5, count) }) From bfbe9f834f39590567d2b67c41d11eaf2f532ad1 Mon Sep 17 00:00:00 2001 From: Vamsi Krishna Kandi Date: Thu, 5 Dec 2024 18:36:04 +0530 Subject: [PATCH 5/8] chore: send multiple reports in a single request to reporting (#5264) --- app/apphandlers/embeddedAppHandler.go | 2 +- app/apphandlers/processorAppHandler.go | 2 +- app/features.go | 2 +- enterprise/reporting/mediator.go | 4 +- enterprise/reporting/reporting.go | 13 +- enterprise/reporting/reporting_test.go | 303 ++++++++++++++++++++++--- enterprise/reporting/setup.go | 5 +- enterprise/reporting/setup_test.go | 14 +- warehouse/app.go | 2 +- warehouse/app_test.go | 3 +- 10 files changed, 294 insertions(+), 56 deletions(-) diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index e4d128df306..68141dfd3fa 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -97,7 +97,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) if err != nil { return fmt.Errorf("could not run tracked users database migration: %w", err) } - reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig) + reporting := a.app.Features().Reporting.Setup(ctx, config, 
backendconfig.DefaultBackendConfig) defer reporting.Stop() syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")}) g.Go(func() error { diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index 0e97da041d0..92faedb9405 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -110,7 +110,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return fmt.Errorf("could not run tracked users database migration: %w", err) } - reporting := a.app.Features().Reporting.Setup(ctx, backendconfig.DefaultBackendConfig) + reporting := a.app.Features().Reporting.Setup(ctx, config, backendconfig.DefaultBackendConfig) defer reporting.Stop() syncer := reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: misc.GetConnectionString(config, "reporting")}) g.Go(crash.Wrapper(func() error { diff --git a/app/features.go b/app/features.go index c5f7b0be4d2..ae7c3006183 100644 --- a/app/features.go +++ b/app/features.go @@ -32,7 +32,7 @@ Reporting Feature // ReportingFeature handles reporting statuses / errors to reporting service type ReportingFeature interface { - Setup(cxt context.Context, backendConfig backendconfig.BackendConfig) types.Reporting + Setup(cxt context.Context, conf *config.Config, backendConfig backendconfig.BackendConfig) types.Reporting } // Features contains optional implementations of Enterprise only features. 
diff --git a/enterprise/reporting/mediator.go b/enterprise/reporting/mediator.go index 8146dcb1853..599b16b4ee8 100644 --- a/enterprise/reporting/mediator.go +++ b/enterprise/reporting/mediator.go @@ -34,7 +34,7 @@ type Mediator struct { cronRunners []flusher.Runner } -func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator { +func NewReportingMediator(ctx context.Context, conf *config.Config, log logger.Logger, enterpriseToken string, backendConfig backendconfig.BackendConfig) *Mediator { ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) @@ -58,7 +58,7 @@ func NewReportingMediator(ctx context.Context, log logger.Logger, enterpriseToke }) // default reporting implementation - defaultReporter := NewDefaultReporter(rm.ctx, rm.log, configSubscriber, rm.stats) + defaultReporter := NewDefaultReporter(rm.ctx, conf, rm.log, configSubscriber, rm.stats) rm.reporters = append(rm.reporters, defaultReporter) // error reporting implementation diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 2214c025662..e411c1079fa 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -74,9 +74,10 @@ type DefaultReporter struct { getReportsQueryTime stats.Measurement requestLatency stats.Measurement stats stats.Stats + maxReportsCountInARequest config.ValueLoader[int] } -func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter { +func NewDefaultReporter(ctx context.Context, conf *config.Config, log logger.Logger, configSubscriber *configSubscriber, stats stats.Stats) *DefaultReporter { var dbQueryTimeout *config.Reloadable[time.Duration] reportingServiceURL := config.GetString("REPORTING_URL", "https://reporting.rudderstack.com/") @@ -88,6 +89,7 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber 
maxConcurrentRequests := config.GetReloadableIntVar(32, 1, "Reporting.maxConcurrentRequests") maxOpenConnections := config.GetIntVar(32, 1, "Reporting.maxOpenConnections") dbQueryTimeout = config.GetReloadableDurationVar(60, time.Second, "Reporting.dbQueryTimeout") + maxReportsCountInARequest := conf.GetReloadableIntVar(10, 1, "Reporting.maxReportsCountInARequest") // only send reports for wh actions sources if whActionsOnly is configured whActionsOnly := config.GetBool("REPORTING_WH_ACTIONS_ONLY", false) if whActionsOnly { @@ -114,6 +116,7 @@ func NewDefaultReporter(ctx context.Context, log logger.Logger, configSubscriber maxOpenConnections: maxOpenConnections, maxConcurrentRequests: maxConcurrentRequests, dbQueryTimeout: dbQueryTimeout, + maxReportsCountInARequest: maxReportsCountInARequest, stats: stats, } } @@ -265,8 +268,9 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports return metricReports, queryMin.Int64, err } -func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric { +func (r *DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []*types.Metric { metricsByGroup := map[string]*types.Metric{} + maxReportsCountInARequest := r.maxReportsCountInARequest.Load() var values []*types.Metric reportIdentifier := func(report *types.ReportByStatus) string { @@ -282,16 +286,13 @@ func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) [] report.ConnectionDetails.TrackingPlanID, strconv.Itoa(report.ConnectionDetails.TrackingPlanVersion), report.PUDetails.InPU, report.PUDetails.PU, - report.StatusDetail.Status, - strconv.Itoa(report.StatusDetail.StatusCode), - report.StatusDetail.EventName, report.StatusDetail.EventType, } return strings.Join(groupingIdentifiers, `::`) } for _, report := range reports { identifier := reportIdentifier(report) - if _, ok := metricsByGroup[identifier]; !ok { + if _, ok := metricsByGroup[identifier]; !ok || 
len(metricsByGroup[identifier].StatusDetails) >= maxReportsCountInARequest { metricsByGroup[identifier] = &types.Metric{ InstanceDetails: types.InstanceDetails{ WorkspaceID: report.WorkspaceID, diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index a26b1148cff..8bfbf7b3b79 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -10,6 +10,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/rudderlabs/rudder-go-kit/logger" @@ -74,9 +75,100 @@ var _ = Describe("Reporting", func() { assertReportMetric(expectedResponse, transformedMetric) }) }) +}) - Context("getAggregatedReports Tests", func() { - inputReports := []*types.ReportByStatus{ +func TestGetAggregatedReports(t *testing.T) { + inputReports := []*types.ReportByStatus{ + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 3, + ViolationCount: 5, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "", + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, 
+ StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id-2", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 3, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + } + conf := config.New() + configSubscriber := newConfigSubscriber(logger.NOP) + reportHandle := NewDefaultReporter(context.Background(), conf, logger.NOP, configSubscriber, stats.NOP) + + t.Run("Should provide aggregated reports when batch size is 1", func(t *testing.T) { + conf.Set("Reporting.maxReportsCountInARequest", 1) + assert.Equal(t, 1, reportHandle.maxReportsCountInARequest.Load()) + expectedResponse := []*types.Metric{ { InstanceDetails: types.InstanceDetails{ WorkspaceID: "some-workspace-id", @@ -92,16 +184,18 @@ var _ = Describe("Reporting", func() { PU: "some-pu", }, ReportMetadata: types.ReportMetadata{ - ReportedAt: 28017690, + ReportedAt: 28017690 * 60 * 1000, }, - StatusDetail: &types.StatusDetail{ - Status: "some-status", - Count: 3, - ViolationCount: 5, - StatusCode: 200, - SampleResponse: "", - SampleEvent: []byte(`{}`), - ErrorType: "", + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 3, + ViolationCount: 5, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "", + }, }, }, { @@ -119,16 +213,18 @@ var _ = Describe("Reporting", func() { PU: 
"some-pu", }, ReportMetadata: types.ReportMetadata{ - ReportedAt: 28017690, + ReportedAt: 28017690 * 60 * 1000, }, - StatusDetail: &types.StatusDetail{ - Status: "some-status", - Count: 2, - ViolationCount: 10, - StatusCode: 200, - SampleResponse: "", - SampleEvent: []byte(`{}`), - ErrorType: "some-error-type", + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, }, }, { @@ -146,20 +242,29 @@ var _ = Describe("Reporting", func() { PU: "some-pu", }, ReportMetadata: types.ReportMetadata{ - ReportedAt: 28017690, + ReportedAt: 28017690 * 60 * 1000, }, - StatusDetail: &types.StatusDetail{ - Status: "some-status", - Count: 3, - ViolationCount: 10, - StatusCode: 200, - SampleResponse: "", - SampleEvent: []byte(`{}`), - ErrorType: "some-error-type", + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 3, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, }, }, } + aggregatedMetrics := reportHandle.getAggregatedReports(inputReports) + assert.Equal(t, expectedResponse, aggregatedMetrics) + }) + + t.Run("Should provide aggregated reports when batch size more than 1", func(t *testing.T) { + conf.Set("Reporting.maxReportsCountInARequest", 10) + assert.Equal(t, 10, reportHandle.maxReportsCountInARequest.Load()) expectedResponse := []*types.Metric{ { InstanceDetails: types.InstanceDetails{ @@ -229,13 +334,145 @@ var _ = Describe("Reporting", func() { }, }, } - configSubscriber := newConfigSubscriber(logger.NOP) - reportHandle := NewDefaultReporter(context.Background(), logger.NOP, configSubscriber, stats.NOP) aggregatedMetrics := reportHandle.getAggregatedReports(inputReports) - Expect(aggregatedMetrics).To(Equal(expectedResponse)) + assert.Equal(t, expectedResponse, aggregatedMetrics) }) -}) + + t.Run("Should provide 
aggregated reports when batch size is more than 1 and reports with same identifier are more then batch size", func(t *testing.T) { + conf.Set("Reporting.maxReportsCountInARequest", 2) + assert.Equal(t, 2, reportHandle.maxReportsCountInARequest.Load()) + extraReport := &types.ReportByStatus{ + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690, + }, + StatusDetail: &types.StatusDetail{ + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "another-error-type", + }, + } + newInputReports := append(inputReports, extraReport) + expectedResponse := []*types.Metric{ + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690 * 60 * 1000, + }, + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 3, + ViolationCount: 5, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "", + }, + { + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + 
SourceID: "some-source-id-2", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690 * 60 * 1000, + }, + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 3, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "some-error-type", + }, + }, + }, + { + InstanceDetails: types.InstanceDetails{ + WorkspaceID: "some-workspace-id", + }, + ConnectionDetails: types.ConnectionDetails{ + SourceID: "some-source-id", + DestinationID: "some-destination-id", + TransformationID: "some-transformation-id", + TrackingPlanID: "some-tracking-plan-id", + }, + PUDetails: types.PUDetails{ + InPU: "some-in-pu", + PU: "some-pu", + }, + ReportMetadata: types.ReportMetadata{ + ReportedAt: 28017690 * 60 * 1000, + }, + StatusDetails: []*types.StatusDetail{ + { + Status: "some-status", + Count: 2, + ViolationCount: 10, + StatusCode: 200, + SampleResponse: "", + SampleEvent: []byte(`{}`), + ErrorType: "another-error-type", + }, + }, + }, + } + + aggregatedMetrics := reportHandle.getAggregatedReports(newInputReports) + assert.Equal(t, expectedResponse, aggregatedMetrics) + }) +} func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { Expect(expectedMetric.ConnectionDetails.SourceID).To(Equal(actualMetric.ConnectionDetails.SourceID)) diff --git a/enterprise/reporting/setup.go b/enterprise/reporting/setup.go index e0deccb7d2f..74e2b8aad0a 100644 --- a/enterprise/reporting/setup.go +++ b/enterprise/reporting/setup.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/utils/types" @@ -18,12 +19,12 @@ type 
Factory struct { } // Setup initializes Suppress User feature -func (m *Factory) Setup(ctx context.Context, backendConfig backendconfig.BackendConfig) types.Reporting { +func (m *Factory) Setup(ctx context.Context, conf *config.Config, backendConfig backendconfig.BackendConfig) types.Reporting { m.oneInstance.Do(func() { if m.Log == nil { m.Log = logger.NewLogger().Child("enterprise").Child("reporting") } - m.instance = NewReportingMediator(ctx, m.Log, m.EnterpriseToken, backendConfig) + m.instance = NewReportingMediator(ctx, conf, m.Log, m.EnterpriseToken, backendConfig) }) return m.instance } diff --git a/enterprise/reporting/setup_test.go b/enterprise/reporting/setup_test.go index c1c390f29ce..a77e5905593 100644 --- a/enterprise/reporting/setup_test.go +++ b/enterprise/reporting/setup_test.go @@ -16,16 +16,16 @@ import ( ) func TestFeatureSetup(t *testing.T) { - config.Reset() logger.Reset() + config := config.New() f := &Factory{ EnterpriseToken: "dummy-token", } - instanceA := f.Setup(context.Background(), &backendconfig.NOOP{}) + instanceA := f.Setup(context.Background(), config, &backendconfig.NOOP{}) instanceB := f.instance - instanceC := f.Setup(context.Background(), &backendconfig.NOOP{}) + instanceC := f.Setup(context.Background(), config, &backendconfig.NOOP{}) instanceD := f.instance require.Equal(t, instanceA, instanceB) @@ -33,7 +33,7 @@ func TestFeatureSetup(t *testing.T) { require.Equal(t, instanceC, instanceD) f = &Factory{} - instanceE := f.Setup(context.Background(), &backendconfig.NOOP{}) + instanceE := f.Setup(context.Background(), config, &backendconfig.NOOP{}) instanceF := f.instance require.Equal(t, instanceE, instanceF) require.NotEqual(t, instanceE, backendconfig.NOOP{}) @@ -41,9 +41,7 @@ func TestFeatureSetup(t *testing.T) { func TestSetupForDelegates(t *testing.T) { logger.Reset() - - config.Reset() - defer config.Reset() + config := config.New() pool, err := dockertest.NewPool("") require.NoError(t, err) @@ -119,7 +117,7 @@ func 
TestSetupForDelegates(t *testing.T) { EnterpriseToken: "dummy-token", } } - med := NewReportingMediator(context.Background(), logger.NOP, f.EnterpriseToken, &backendconfig.NOOP{}) + med := NewReportingMediator(context.Background(), config, logger.NOP, f.EnterpriseToken, &backendconfig.NOOP{}) require.Len(t, med.reporters, tc.expectedDelegates) }) diff --git a/warehouse/app.go b/warehouse/app.go index 9380e5dfc12..01fe48f5ebc 100644 --- a/warehouse/app.go +++ b/warehouse/app.go @@ -350,7 +350,7 @@ func (a *App) Run(ctx context.Context) error { } if !mode.IsStandAloneSlave(a.config.mode) { - a.reporting = a.app.Features().Reporting.Setup(gCtx, a.bcConfig) + a.reporting = a.app.Features().Reporting.Setup(gCtx, a.conf, a.bcConfig) defer a.reporting.Stop() syncer := a.reporting.DatabaseSyncer(types.SyncerConfig{ConnInfo: a.connectionString("reporting"), Label: types.WarehouseReportingLabel}) g.Go(crash.NotifyWarehouse(func() error { diff --git a/warehouse/app_test.go b/warehouse/app_test.go index 88ef9090c00..af220bb3f44 100644 --- a/warehouse/app_test.go +++ b/warehouse/app_test.go @@ -43,6 +43,7 @@ import ( func TestApp(t *testing.T) { admin.Init() misc.Init() + conf := config.New() const ( workspaceID = "test_workspace_id" @@ -57,7 +58,7 @@ func TestApp(t *testing.T) { require.NoError(t, err) report := &reporting.Factory{} - report.Setup(context.Background(), &bcConfig.NOOP{}) + report.Setup(context.Background(), conf, &bcConfig.NOOP{}) mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() From 33e9915931f15d20bcebff0e42984362f1571791 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Thu, 5 Dec 2024 18:40:47 +0530 Subject: [PATCH 6/8] chore: poll caching for snowpipe streaming (#5335) --- .../snowpipestreaming/snowpipestreaming.go | 157 +++++++++++------- .../snowpipestreaming_test.go | 81 ++++++++- .../snowpipestreaming/types.go | 20 ++- 3 files changed, 187 insertions(+), 71 deletions(-) diff --git 
a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go index 11759a54546..f16353222f9 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go @@ -17,12 +17,11 @@ import ( "github.com/samber/lo" "github.com/tidwall/gjson" - "github.com/rudderlabs/rudder-go-kit/stringify" - "github.com/rudderlabs/rudder-go-kit/bytesize" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stringify" obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" backendconfig "github.com/rudderlabs/rudder-server/backend-config" @@ -50,10 +49,11 @@ func New( obskit.DestinationID(destination.ID), obskit.DestinationType(destination.DestinationDefinition.Name), ), - statsFactory: statsFactory, - destination: destination, - now: timeutil.Now, - channelCache: sync.Map{}, + statsFactory: statsFactory, + destination: destination, + now: timeutil.Now, + channelCache: sync.Map{}, + polledImportInfoMap: make(map[string]*importInfo), } m.config.client.url = conf.GetString("SnowpipeStreaming.Client.URL", "http://localhost:9078") @@ -88,6 +88,7 @@ func New( })) m.stats.discards = statsFactory.NewTaggedStat("snowpipe_streaming_discards", stats.CountType, tags) + m.stats.pollingInProgress = statsFactory.NewTaggedStat("snowpipe_streaming_polling_in_progress", stats.CountType, tags) if m.requestDoer == nil { m.requestDoer = m.retryableClient().StandardClient() @@ -362,13 +363,14 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReaso } // Poll checks the status of multiple imports using the import ID from pollInput. 
-// It returns a PollStatusResponse indicating if any imports are still in progress or if any have failed or succeeded. -// If any imports have failed, it deletes the channels for those imports. +// For the ones that have reached a terminal state (success or failure), it caches the import infos in polledImportInfoMap, so that if Poll is called again later it does not need to repeat the status check for them. +// Once all the imports have reached the terminal state, if any imports have failed, it deletes the channels for those imports. +// It returns a PollStatusResponse indicating if any imports are still in progress or if any have failed or succeeded. func (m *Manager) Poll(pollInput common.AsyncPoll) common.PollStatusResponse { m.logger.Infon("Polling started") - var infos []*importInfo - err := json.Unmarshal([]byte(pollInput.ImportId), &infos) + var importInfos []*importInfo + err := json.Unmarshal([]byte(pollInput.ImportId), &importInfos) if err != nil { return common.PollStatusResponse{ InProgress: false, @@ -382,67 +384,50 @@ func (m *Manager) Poll(pollInput common.AsyncPoll) common.PollStatusResponse { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var anyoneInProgress bool - for _, info := range infos { - inProgress, err := m.pollForImportInfo(ctx, info) - if err != nil { - info.Failed = true - info.Reason = err.Error() - continue - } - anyoneInProgress = anyoneInProgress || inProgress - } - if anyoneInProgress { + if anyInProgress := m.processPollImportInfos(ctx, importInfos); anyInProgress { + m.stats.pollingInProgress.Increment() return common.PollStatusResponse{InProgress: true} } - failedInfos := lo.Filter(infos, func(info *importInfo, index int) bool { + updatedImportInfos := lo.Map(importInfos, func(item *importInfo, index int) *importInfo { + return m.polledImportInfoMap[item.ChannelID] + }) + failedImports := lo.Filter(updatedImportInfos, func(info *importInfo, index int) bool { return info.Failed }) - for _, info := range
failedInfos { - m.logger.Warnn("Failed to poll channel offset", - logger.NewStringField("channelId", info.ChannelID), - logger.NewStringField("offset", info.Offset), - logger.NewStringField("table", info.Table), - logger.NewStringField("reason", info.Reason), - ) + m.cleanupFailedImports(ctx, failedImports) + m.updateJobStatistics(updatedImportInfos) + m.polledImportInfoMap = make(map[string]*importInfo) - if deleteErr := m.deleteChannel(ctx, info.Table, info.ChannelID); deleteErr != nil { - m.logger.Warnn("Failed to delete channel", - logger.NewStringField("channelId", info.ChannelID), - logger.NewStringField("table", info.Table), - obskit.Error(deleteErr), - ) + return m.buildPollStatusResponse(updatedImportInfos, failedImports) +} + +func (m *Manager) processPollImportInfos(ctx context.Context, infos []*importInfo) bool { + var anyInProgress bool + for i := range infos { + info := infos[i] + + if _, alreadyProcessed := m.polledImportInfoMap[info.ChannelID]; alreadyProcessed { + continue } - } - var successJobsCount, failedJobsCount int - for _, info := range infos { - if info.Failed { - failedJobsCount += info.Count - } else { - successJobsCount += info.Count + inProgress, err := m.getImportStatus(ctx, info) + if err != nil { + info.Failed = true + info.Reason = err.Error() + m.polledImportInfoMap[info.ChannelID] = info + continue + } + if !inProgress { + m.polledImportInfoMap[info.ChannelID] = info } - } - m.stats.jobs.failed.Count(failedJobsCount) - m.stats.jobs.succeeded.Count(successJobsCount) - statusResponse := common.PollStatusResponse{ - InProgress: false, - StatusCode: http.StatusOK, - Complete: true, - } - if len(failedInfos) > 0 { - statusResponse.HasFailed = true - statusResponse.FailedJobParameters = stringify.Any(infos) - } else { - statusResponse.HasFailed = false - statusResponse.HasWarning = false + anyInProgress = anyInProgress || inProgress } - return statusResponse + return anyInProgress } -func (m *Manager) pollForImportInfo(ctx 
context.Context, info *importInfo) (bool, error) { +func (m *Manager) getImportStatus(ctx context.Context, info *importInfo) (bool, error) { log := m.logger.Withn( logger.NewStringField("channelId", info.ChannelID), logger.NewStringField("offset", info.Offset), @@ -466,14 +451,64 @@ func (m *Manager) pollForImportInfo(ctx context.Context, info *importInfo) (bool return statusRes.Offset != info.Offset, nil } +func (m *Manager) cleanupFailedImports(ctx context.Context, failedInfos []*importInfo) { + for _, info := range failedInfos { + m.logger.Warnn("Failed to poll channel offset", + logger.NewStringField("channelId", info.ChannelID), + logger.NewStringField("offset", info.Offset), + logger.NewStringField("table", info.Table), + logger.NewStringField("reason", info.Reason), + ) + + if err := m.deleteChannel(ctx, info.Table, info.ChannelID); err != nil { + m.logger.Warnn("Failed to delete channel", + logger.NewStringField("channelId", info.ChannelID), + logger.NewStringField("table", info.Table), + obskit.Error(err), + ) + } + } +} + +func (m *Manager) updateJobStatistics(importInfos []*importInfo) { + var successfulCount, failedCount int + + for _, info := range importInfos { + if info.Failed { + failedCount += info.Count + } else { + successfulCount += info.Count + } + } + m.stats.jobs.failed.Count(failedCount) + m.stats.jobs.succeeded.Count(successfulCount) +} + +func (m *Manager) buildPollStatusResponse(importInfos, failedImports []*importInfo) common.PollStatusResponse { + response := common.PollStatusResponse{ + InProgress: false, + StatusCode: http.StatusOK, + Complete: true, + } + + if len(failedImports) > 0 { + response.HasFailed = true + response.FailedJobParameters = stringify.Any(importInfos) + } else { + response.HasFailed = false + response.HasWarning = false + } + return response +} + // GetUploadStats returns the status of the uploads for the snowpipe streaming destination. // It returns the status of the uploads for the given job IDs. 
// If any of the uploads have failed, it returns the reason for the failure. func (m *Manager) GetUploadStats(input common.GetUploadStatsInput) common.GetUploadStatsResponse { m.logger.Infon("Getting import stats for snowpipe streaming destination") - var infos []*importInfo - err := json.Unmarshal([]byte(input.FailedJobParameters), &infos) + var importInfos []*importInfo + err := json.Unmarshal([]byte(input.FailedJobParameters), &importInfos) if err != nil { return common.GetUploadStatsResponse{ StatusCode: http.StatusBadRequest, @@ -482,7 +517,7 @@ func (m *Manager) GetUploadStats(input common.GetUploadStatsInput) common.GetUpl } succeededTables, failedTables := make(map[string]struct{}), make(map[string]*importInfo) - for _, info := range infos { + for _, info := range importInfos { if info.Failed { failedTables[info.Table] = info } else { diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go index 4508500bfa4..912a546cb11 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go @@ -948,7 +948,7 @@ func TestSnowpipeStreaming(t *testing.T) { require.Equal(t, http.StatusOK, output.StatusCode) require.True(t, output.Complete) require.True(t, output.HasFailed) - require.Equal(t, `[{"channelId":"test-products-channel","offset":"1003","table":"PRODUCTS","failed":true,"reason":"getting status: assert.AnError general error for testing","count":2},{"channelId":"test-users-channel","offset":"1004","table":"USERS","failed":true,"reason":"getting status: assert.AnError general error for testing","count":2}]`, output.FailedJobParameters) + require.JSONEq(t, `[{"channelId":"test-products-channel","offset":"1003","table":"PRODUCTS","failed":true,"reason":"getting status: assert.AnError general error for 
testing","count":2},{"channelId":"test-users-channel","offset":"1004","table":"USERS","failed":true,"reason":"getting status: assert.AnError general error for testing","count":2}]`, output.FailedJobParameters) require.EqualValues(t, 4, statsStore.Get("snowpipe_streaming_jobs", stats.Tags{ "module": "batch_router", "workspaceId": "test-workspace", @@ -998,6 +998,85 @@ func TestSnowpipeStreaming(t *testing.T) { "status": "failed", }).LastValue()) }) + t.Run("Poll caching", func(t *testing.T) { + importID := `[{"channelId":"test-channel-1","offset":"1","table":"1","failed":false,"reason":"","count":1},{"channelId":"test-channel-2","offset":"2","table":"2","failed":false,"reason":"","count":2},{"channelId":"test-channel-3","offset":"3","table":"3","failed":false,"reason":"","count":3},{"channelId":"test-channel-4","offset":"4","table":"4","failed":false,"reason":"","count":4}]` + statsStore, err := memstats.New() + require.NoError(t, err) + + statusCalls := 0 + sm := New(config.New(), logger.NOP, statsStore, destination) + sm.api = &mockAPI{ + getStatusOutputMap: map[string]func() (*model.StatusResponse, error){ + "test-channel-1": func() (*model.StatusResponse, error) { + statusCalls += 1 + return &model.StatusResponse{Valid: false, Success: true}, nil + }, + "test-channel-2": func() (*model.StatusResponse, error) { + statusCalls += 1 + return &model.StatusResponse{Valid: true, Success: false}, nil + }, + "test-channel-3": func() (*model.StatusResponse, error) { + statusCalls += 1 + return &model.StatusResponse{Valid: true, Success: true, Offset: "0"}, nil + }, + "test-channel-4": func() (*model.StatusResponse, error) { + statusCalls += 1 + return &model.StatusResponse{Valid: true, Success: true, Offset: "4"}, nil + }, + }, + } + output := sm.Poll(common.AsyncPoll{ + ImportId: importID, + }) + require.True(t, output.InProgress) + + t.Log("Polling again should not call getStatus for channels 1, 2 and 4 since they already reached the terminal state") + sm.api = 
&mockAPI{ + getStatusOutputMap: map[string]func() (*model.StatusResponse, error){ + "test-channel-3": func() (*model.StatusResponse, error) { + statusCalls += 1 + return &model.StatusResponse{Valid: true, Success: true, Offset: "3"}, nil + }, + }, + deleteChannelOutputMap: map[string]func() error{ + "test-channel-1": func() error { + return nil + }, + "test-channel-2": func() error { + return nil + }, + }, + } + output = sm.Poll(common.AsyncPoll{ + ImportId: importID, + }) + require.False(t, output.InProgress) + require.Equal(t, http.StatusOK, output.StatusCode) + require.True(t, output.Complete) + require.True(t, output.HasFailed) + require.Equal(t, `[{"channelId":"test-channel-1","offset":"1","table":"1","failed":true,"reason":"invalid status response with valid: false, success: true","count":1},{"channelId":"test-channel-2","offset":"2","table":"2","failed":true,"reason":"invalid status response with valid: true, success: false","count":2},{"channelId":"test-channel-3","offset":"3","table":"3","failed":false,"reason":"","count":3},{"channelId":"test-channel-4","offset":"4","table":"4","failed":false,"reason":"","count":4}]`, output.FailedJobParameters) + require.EqualValues(t, 3, statsStore.Get("snowpipe_streaming_jobs", stats.Tags{ + "module": "batch_router", + "workspaceId": "test-workspace", + "destType": "SNOWPIPE_STREAMING", + "destinationId": "test-destination", + "status": "failed", + }).LastValue()) + require.EqualValues(t, 7, statsStore.Get("snowpipe_streaming_jobs", stats.Tags{ + "module": "batch_router", + "workspaceId": "test-workspace", + "destType": "SNOWPIPE_STREAMING", + "destinationId": "test-destination", + "status": "succeeded", + }).LastValue()) + require.EqualValues(t, 1, statsStore.Get("snowpipe_streaming_polling_in_progress", stats.Tags{ + "module": "batch_router", + "workspaceId": "test-workspace", + "destType": "SNOWPIPE_STREAMING", + "destinationId": "test-destination", + }).LastValue()) + require.Equal(t, 5, statusCalls) // 4 channels 
+ 1 for polling in progress + }) t.Run("GetUploadStats with invalid importInfo", func(t *testing.T) { statsStore, err := memstats.New() diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go index c3211dcf475..3376f6104a1 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/types.go @@ -20,14 +20,15 @@ import ( type ( Manager struct { - appConfig *config.Config - logger logger.Logger - statsFactory stats.Stats - destination *backendconfig.DestinationT - requestDoer requestDoer - now func() time.Time - api api - channelCache sync.Map + appConfig *config.Config + logger logger.Logger + statsFactory stats.Stats + destination *backendconfig.DestinationT + requestDoer requestDoer + now func() time.Time + api api + channelCache sync.Map + polledImportInfoMap map[string]*importInfo config struct { client struct { @@ -52,7 +53,8 @@ type ( failed stats.Counter aborted stats.Counter } - discards stats.Counter + discards stats.Counter + pollingInProgress stats.Counter } } From 232b107e01f01a0dd29ed5968521048bba93780c Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Thu, 5 Dec 2024 19:19:34 +0530 Subject: [PATCH 7/8] feat: snowpipe http compression for insert request (#5337) --- .../snowpipestreaming/internal/api/api.go | 56 +++++- .../internal/api/api_integration_test.go | 174 +++++++++++------- .../internal/api/api_test.go | 54 +++--- .../snowpipestreaming/snowpipestreaming.go | 2 +- runner/buckets.go | 5 + 5 files changed, 194 insertions(+), 97 deletions(-) diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go index 16482a8afbe..006542f031f 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go +++ 
b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api.go @@ -2,6 +2,7 @@ package api import ( "bytes" + "compress/gzip" "context" "fmt" "io" @@ -10,6 +11,9 @@ import ( jsoniter "github.com/json-iterator/go" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model" "github.com/rudderlabs/rudder-server/utils/httputil" ) @@ -18,6 +22,12 @@ type ( API struct { clientURL string requestDoer requestDoer + config struct { + enableCompression config.ValueLoader[bool] + } + stats struct { + insertRequestBodySize stats.Histogram + } } requestDoer interface { @@ -27,11 +37,17 @@ type ( var json = jsoniter.ConfigCompatibleWithStandardLibrary -func New(clientURL string, requestDoer requestDoer) *API { - return &API{ +func New(conf *config.Config, statsFactory stats.Stats, clientURL string, requestDoer requestDoer) *API { + a := &API{ clientURL: clientURL, requestDoer: requestDoer, } + a.config.enableCompression = conf.GetReloadableBoolVar(true, "SnowpipeStreaming.enableCompression") + a.stats.insertRequestBodySize = statsFactory.NewTaggedStat("snowpipe_streaming_request_body_size", stats.HistogramType, stats.Tags{ + "api": "insert", + }) + + return a } func mustRead(r io.Reader) []byte { @@ -135,12 +151,34 @@ func (a *API) Insert(ctx context.Context, channelID string, insertRequest *model return nil, fmt.Errorf("marshalling insert request: %w", err) } + enableCompression := a.config.enableCompression.Load() + + var ( + r io.Reader + payloadSize int + ) + + if enableCompression { + r, payloadSize, err = gzippedReader(reqJSON) + if err != nil { + return nil, fmt.Errorf("creating gzip reader: %w", err) + } + } else { + r = bytes.NewBuffer(reqJSON) + payloadSize = len(reqJSON) + } + + a.stats.insertRequestBodySize.Observe(float64(payloadSize)) + insertURL := a.clientURL + "/channels/" + channelID + "/insert" 
- req, err := http.NewRequestWithContext(ctx, http.MethodPost, insertURL, bytes.NewBuffer(reqJSON)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, insertURL, r) if err != nil { return nil, fmt.Errorf("creating insert request: %w", err) } req.Header.Set("Content-Type", "application/json") + if enableCompression { + req.Header.Set("Content-Encoding", "gzip") + } resp, reqErr := a.requestDoer.Do(req) if reqErr != nil { @@ -184,3 +222,15 @@ func (a *API) GetStatus(ctx context.Context, channelID string) (*model.StatusRes } return &res, nil } + +func gzippedReader(reqJSON []byte) (io.Reader, int, error) { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + if _, err := gz.Write(reqJSON); err != nil { + return nil, 0, fmt.Errorf("writing to gzip writer: %w", err) + } + if err := gz.Close(); err != nil { + return nil, 0, fmt.Errorf("closing gzip writer: %w", err) + } + return &b, b.Len(), nil +} diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go index 1f057a9f0f2..f485a2757f0 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_integration_test.go @@ -13,6 +13,8 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" + "github.com/rudderlabs/compose-test/compose" "github.com/rudderlabs/compose-test/testcompose" "github.com/rudderlabs/rudder-go-kit/config" @@ -33,7 +35,7 @@ type integrationTestConfig struct { db *sql.DB namespace string tableName string - snowpipeAPI *api.API + clientURL string } func TestAPIIntegration(t *testing.T) { @@ -49,78 +51,111 @@ func TestAPIIntegration(t *testing.T) { } t.Run("Create channel + Get channel + Insert data + Status", func(t *testing.T) { - ctx := 
context.Background() - testConfig := setupIntegrationTestConfig(t, ctx) - - t.Log("Creating channel") - createChannelRes, err := testConfig.snowpipeAPI.CreateChannel(ctx, &model.CreateChannelRequest{ - RudderIdentifier: "1", - Partition: "1", - AccountConfig: model.AccountConfig{ - Account: testConfig.credentials.Account, - User: testConfig.credentials.User, - Role: testConfig.credentials.Role, - PrivateKey: strings.ReplaceAll(testConfig.credentials.PrivateKey, "\n", "\\\\\n"), - PrivateKeyPassphrase: testConfig.credentials.PrivateKeyPassphrase, + testCases := []struct { + name string + enableCompression bool + payloadSize int + }{ + { + name: "Compression enabled", + enableCompression: true, + payloadSize: 378, }, - TableConfig: model.TableConfig{ - Database: testConfig.credentials.Database, - Schema: testConfig.namespace, - Table: testConfig.tableName, + { + name: "Compression disabled", + enableCompression: false, + payloadSize: 839, }, - }) - require.NoError(t, err) - require.NotEmpty(t, createChannelRes.ChannelID) - require.True(t, createChannelRes.Valid) - require.False(t, createChannelRes.Deleted) - require.EqualValues(t, whutils.ModelTableSchema{"ACTIVE": "boolean", "AGE": "int", "DOB": "datetime", "EMAIL": "string", "ID": "string", "NAME": "string"}, - createChannelRes.SnowpipeSchema, - ) - - t.Log("Getting channel") - getChannelRes, err := testConfig.snowpipeAPI.GetChannel(ctx, createChannelRes.ChannelID) - require.NoError(t, err) - require.Equal(t, createChannelRes, getChannelRes) - - rows := []model.Row{ - {"ID": "ID1", "NAME": "Alice Johnson", "EMAIL": "alice.johnson@example.com", "AGE": 28, "ACTIVE": true, "DOB": "1995-06-15T12:30:00Z"}, - {"ID": "ID2", "NAME": "Bob Smith", "EMAIL": "bob.smith@example.com", "AGE": 35, "ACTIVE": true, "DOB": "1988-01-20T09:30:00Z"}, - {"ID": "ID3", "NAME": "Charlie Brown", "EMAIL": "charlie.brown@example.com", "AGE": 22, "ACTIVE": false, "DOB": "2001-11-05T14:45:00Z"}, - {"ID": "ID4", "NAME": "Diana Prince", "EMAIL": 
"diana.prince@example.com", "AGE": 30, "ACTIVE": true, "DOB": "1993-08-18T08:15:00Z"}, - {"ID": "ID5", "NAME": "Eve Adams", "AGE": 45, "ACTIVE": true, "DOB": "1978-03-22T16:50:00Z"}, // -- No email - {"ID": "ID6", "NAME": "Frank Castle", "EMAIL": "frank.castle@example.com", "AGE": 38, "ACTIVE": false, "DOB": "1985-09-14T10:10:00Z"}, - {"ID": "ID7", "NAME": "Grace Hopper", "EMAIL": "grace.hopper@example.com", "AGE": 85, "ACTIVE": true, "DOB": "1936-12-09T11:30:00Z"}, } - t.Log("Inserting records") - insertRes, err := testConfig.snowpipeAPI.Insert(ctx, createChannelRes.ChannelID, &model.InsertRequest{ - Rows: rows, - Offset: "8", - }) - require.NoError(t, err) - require.Equal(t, &model.InsertResponse{Success: true, Errors: nil}, insertRes) - - t.Log("Checking status") - require.Eventually(t, func() bool { - statusRes, err := testConfig.snowpipeAPI.GetStatus(ctx, createChannelRes.ChannelID) - if err != nil { - t.Log("Error getting status:", err) - return false - } - return statusRes.Offset == "8" - }, - 30*time.Second, - 300*time.Millisecond, - ) - - t.Log("Checking records in warehouse") - records := whth.RetrieveRecordsFromWarehouse(t, testConfig.db, fmt.Sprintf(`SELECT ID, NAME, EMAIL, AGE, ACTIVE, DOB FROM %q.%q ORDER BY ID;`, testConfig.namespace, testConfig.tableName)) - require.ElementsMatch(t, convertRowsToRecord(rows, []string{"ID", "NAME", "EMAIL", "AGE", "ACTIVE", "DOB"}), records) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + testConfig := setupIntegrationTestConfig(t, ctx) + + c := config.New() + c.Set("SnowpipeStreaming.enableCompression", tc.enableCompression) + + statsStore, err := memstats.New() + require.NoError(t, err) + + snowpipeAPI := api.New(c, statsStore, testConfig.clientURL, http.DefaultClient) + + t.Log("Creating channel") + createChannelRes, err := snowpipeAPI.CreateChannel(ctx, &model.CreateChannelRequest{ + RudderIdentifier: "1", + Partition: "1", + AccountConfig: 
model.AccountConfig{ + Account: testConfig.credentials.Account, + User: testConfig.credentials.User, + Role: testConfig.credentials.Role, + PrivateKey: strings.ReplaceAll(testConfig.credentials.PrivateKey, "\n", "\\\\\n"), + PrivateKeyPassphrase: testConfig.credentials.PrivateKeyPassphrase, + }, + TableConfig: model.TableConfig{ + Database: testConfig.credentials.Database, + Schema: testConfig.namespace, + Table: testConfig.tableName, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, createChannelRes.ChannelID) + require.True(t, createChannelRes.Valid) + require.False(t, createChannelRes.Deleted) + require.EqualValues(t, whutils.ModelTableSchema{"ACTIVE": "boolean", "AGE": "int", "DOB": "datetime", "EMAIL": "string", "ID": "string", "NAME": "string"}, + createChannelRes.SnowpipeSchema, + ) + + t.Log("Getting channel") + getChannelRes, err := snowpipeAPI.GetChannel(ctx, createChannelRes.ChannelID) + require.NoError(t, err) + require.Equal(t, createChannelRes, getChannelRes) + + rows := []model.Row{ + {"ID": "ID1", "NAME": "Alice Johnson", "EMAIL": "alice.johnson@example.com", "AGE": 28, "ACTIVE": true, "DOB": "1995-06-15T12:30:00Z"}, + {"ID": "ID2", "NAME": "Bob Smith", "EMAIL": "bob.smith@example.com", "AGE": 35, "ACTIVE": true, "DOB": "1988-01-20T09:30:00Z"}, + {"ID": "ID3", "NAME": "Charlie Brown", "EMAIL": "charlie.brown@example.com", "AGE": 22, "ACTIVE": false, "DOB": "2001-11-05T14:45:00Z"}, + {"ID": "ID4", "NAME": "Diana Prince", "EMAIL": "diana.prince@example.com", "AGE": 30, "ACTIVE": true, "DOB": "1993-08-18T08:15:00Z"}, + {"ID": "ID5", "NAME": "Eve Adams", "AGE": 45, "ACTIVE": true, "DOB": "1978-03-22T16:50:00Z"}, // -- No email + {"ID": "ID6", "NAME": "Frank Castle", "EMAIL": "frank.castle@example.com", "AGE": 38, "ACTIVE": false, "DOB": "1985-09-14T10:10:00Z"}, + {"ID": "ID7", "NAME": "Grace Hopper", "EMAIL": "grace.hopper@example.com", "AGE": 85, "ACTIVE": true, "DOB": "1936-12-09T11:30:00Z"}, + } + + t.Log("Inserting records") + insertRes, err 
:= snowpipeAPI.Insert(ctx, createChannelRes.ChannelID, &model.InsertRequest{ + Rows: rows, + Offset: "8", + }) + require.NoError(t, err) + require.Equal(t, &model.InsertResponse{Success: true, Errors: nil}, insertRes) + require.EqualValues(t, tc.payloadSize, statsStore.Get("snowpipe_streaming_request_body_size", stats.Tags{ + "api": "insert", + }).LastValue()) + + t.Log("Checking status") + require.Eventually(t, func() bool { + statusRes, err := snowpipeAPI.GetStatus(ctx, createChannelRes.ChannelID) + if err != nil { + t.Log("Error getting status:", err) + return false + } + return statusRes.Offset == "8" + }, + 30*time.Second, + 300*time.Millisecond, + ) + + t.Log("Checking records in warehouse") + records := whth.RetrieveRecordsFromWarehouse(t, testConfig.db, fmt.Sprintf(`SELECT ID, NAME, EMAIL, AGE, ACTIVE, DOB FROM %q.%q ORDER BY ID;`, testConfig.namespace, testConfig.tableName)) + require.ElementsMatch(t, convertRowsToRecord(rows, []string{"ID", "NAME", "EMAIL", "AGE", "ACTIVE", "DOB"}), records) + }) + } }) t.Run("Create + Delete channel", func(t *testing.T) { ctx := context.Background() testConfig := setupIntegrationTestConfig(t, ctx) + snowpipeAPI := api.New(config.New(), stats.NOP, testConfig.clientURL, http.DefaultClient) t.Log("Creating channel") createChannelReq := &model.CreateChannelRequest{ @@ -139,22 +174,22 @@ func TestAPIIntegration(t *testing.T) { Table: testConfig.tableName, }, } - createChannelRes1, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq) + createChannelRes1, err := snowpipeAPI.CreateChannel(ctx, createChannelReq) require.NoError(t, err) require.True(t, createChannelRes1.Valid) t.Log("Creating channel again, should return the same channel id") - createChannelRes2, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq) + createChannelRes2, err := snowpipeAPI.CreateChannel(ctx, createChannelReq) require.NoError(t, err) require.True(t, createChannelRes2.Valid) require.Equal(t, createChannelRes1, 
createChannelRes2) t.Log("Deleting channel") - err = testConfig.snowpipeAPI.DeleteChannel(ctx, createChannelRes1.ChannelID, true) + err = snowpipeAPI.DeleteChannel(ctx, createChannelRes1.ChannelID, true) require.NoError(t, err) t.Log("Creating channel again, should return a new channel id") - createChannelRes3, err := testConfig.snowpipeAPI.CreateChannel(ctx, createChannelReq) + createChannelRes3, err := snowpipeAPI.CreateChannel(ctx, createChannelReq) require.NoError(t, err) require.NotEqual(t, createChannelRes1.ChannelID, createChannelRes3.ChannelID) }) @@ -199,14 +234,13 @@ func setupIntegrationTestConfig(t *testing.T, ctx context.Context) *integrationT require.NoError(t, sm.CreateTable(ctx, table, tableSchema)) snowpipeClientsURL := fmt.Sprintf("http://localhost:%d", c.Port("rudder-snowpipe-clients", 9078)) - a := api.New(snowpipeClientsURL, http.DefaultClient) return &integrationTestConfig{ credentials: credentials, db: sm.DB.DB, namespace: namespace, tableName: table, - snowpipeAPI: a, + clientURL: snowpipeClientsURL, } } diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go index a2a5204dc1b..3eb8264ac54 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api/api_test.go @@ -2,6 +2,7 @@ package api_test import ( "bytes" + "compress/gzip" "context" "encoding/json" "errors" @@ -12,6 +13,10 @@ import ( "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/stats" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api" "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model" whutils 
"github.com/rudderlabs/rudder-server/warehouse/utils" @@ -102,7 +107,7 @@ func TestAPI(t *testing.T) { ctx := context.Background() t.Run("Status=200(success=true)", func(t *testing.T) { - manager := api.New(successSnowpipeServer.URL, successSnowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, successSnowpipeServer.URL, successSnowpipeServer.Client()) res, err := manager.CreateChannel(ctx, ccr) require.NoError(t, err) require.EqualValues(t, &model.ChannelResponse{ @@ -118,7 +123,7 @@ func TestAPI(t *testing.T) { ) }) t.Run("Status=200(success=false)", func(t *testing.T) { - manager := api.New(failureSnowpipeServer.URL, failureSnowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, failureSnowpipeServer.URL, failureSnowpipeServer.Client()) res, err := manager.CreateChannel(ctx, ccr) require.NoError(t, err) require.EqualValues(t, &model.ChannelResponse{ @@ -134,7 +139,7 @@ func TestAPI(t *testing.T) { ) }) t.Run("Request failure", func(t *testing.T) { - manager := api.New(successSnowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, successSnowpipeServer.URL, &mockRequestDoer{ err: errors.New("bad client"), }) res, err := manager.CreateChannel(ctx, ccr) @@ -142,7 +147,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (non 200's status code)", func(t *testing.T) { - manager := api.New(successSnowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, successSnowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusBadRequest, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))}, @@ -153,7 +158,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (invalid response)", func(t *testing.T) { - manager := api.New(successSnowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, successSnowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusOK, 
Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{abd}`))}, @@ -184,25 +189,25 @@ func TestAPI(t *testing.T) { t.Run("Success", func(t *testing.T) { t.Run("sync=true", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) err := manager.DeleteChannel(ctx, channelID, true) require.NoError(t, err) }) t.Run("sync=false", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) err := manager.DeleteChannel(ctx, channelID, false) require.NoError(t, err) }) }) t.Run("Request failure", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ err: errors.New("bad client"), }) err := manager.DeleteChannel(ctx, channelID, true) require.Error(t, err) }) t.Run("Request failure (non 200's status code)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusBadRequest, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))}, @@ -230,7 +235,7 @@ func TestAPI(t *testing.T) { ctx := context.Background() t.Run("Success", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) res, err := manager.GetChannel(ctx, channelID) require.NoError(t, err) require.EqualValues(t, &model.ChannelResponse{ @@ -245,7 +250,7 @@ func TestAPI(t *testing.T) { ) }) t.Run("Request failure", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ err: errors.New("bad 
client"), }) res, err := manager.GetChannel(ctx, channelID) @@ -253,7 +258,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (non 200's status code)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusBadRequest, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))}, @@ -264,7 +269,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (invalid response)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusOK, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{abd}`))}, @@ -287,7 +292,10 @@ func TestAPI(t *testing.T) { irJSON, err := json.Marshal(ir) require.NoError(t, err) - body, err := io.ReadAll(r.Body) + gz, err := gzip.NewReader(r.Body) + require.NoError(t, err) + + body, err := io.ReadAll(gz) require.NoError(t, err) require.NoError(t, r.Body.Close()) require.JSONEq(t, string(irJSON), string(body)) @@ -308,13 +316,13 @@ func TestAPI(t *testing.T) { ctx := context.Background() t.Run("Insert success", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) res, err := manager.Insert(ctx, successChannelID, ir) require.NoError(t, err) require.Equal(t, &model.InsertResponse{Success: true, Errors: nil}, res) }) t.Run("Insert failure", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) res, err := manager.Insert(ctx, failureChannelID, ir) require.NoError(t, err) require.Equal(t, &model.InsertResponse{ @@ -340,7 +348,7 @@ func TestAPI(t 
*testing.T) { ) }) t.Run("Request failure", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ err: errors.New("bad client"), response: &http.Response{ StatusCode: http.StatusOK, @@ -351,7 +359,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (non 200's status code)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusBadRequest, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))}, @@ -362,7 +370,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (invalid response)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusOK, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{abd}`))}, @@ -391,7 +399,7 @@ func TestAPI(t *testing.T) { ctx := context.Background() t.Run("Success", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, snowpipeServer.Client()) + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, snowpipeServer.Client()) res, err := manager.GetStatus(ctx, channelID) require.NoError(t, err) require.Equal(t, &model.StatusResponse{ @@ -403,7 +411,7 @@ func TestAPI(t *testing.T) { ) }) t.Run("Request failure", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ err: errors.New("bad client"), response: &http.Response{ StatusCode: http.StatusOK, @@ -414,7 +422,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (non 200's status code)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, 
&mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusBadRequest, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{}`))}, @@ -425,7 +433,7 @@ func TestAPI(t *testing.T) { require.Nil(t, res) }) t.Run("Request failure (invalid response)", func(t *testing.T) { - manager := api.New(snowpipeServer.URL, &mockRequestDoer{ + manager := api.New(config.New(), stats.NOP, snowpipeServer.URL, &mockRequestDoer{ response: &http.Response{ StatusCode: http.StatusOK, Body: nopReadCloser{Reader: bytes.NewReader([]byte(`{abd}`))}, diff --git a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go index f16353222f9..fef146dfdb0 100644 --- a/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go +++ b/router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go @@ -97,7 +97,7 @@ func New( m.api = newApiAdapter( m.logger, statsFactory, - snowpipeapi.New(m.config.client.url, m.requestDoer), + snowpipeapi.New(m.appConfig, m.statsFactory, m.config.client.url, m.requestDoer), destination, ) return m diff --git a/runner/buckets.go b/runner/buckets.go index f6e5ba51aed..7e2272c24f2 100644 --- a/runner/buckets.go +++ b/runner/buckets.go @@ -132,5 +132,10 @@ var ( float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB), float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB), }, + "snowpipe_streaming_request_body_size": { + float64(10 * bytesize.B), float64(100 * bytesize.B), + float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB), + float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB), + }, } ) From e46ce86ad08e224644bf37b51628b5cf105c7123 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Fri, 
6 Dec 2024 08:32:52 +0530 Subject: [PATCH 8/8] chore: drop upload id from wh_schema (#5336) --- .../000026_drop_wh_upload_id_column.up.sql | 2 ++ warehouse/api/http_test.go | 1 - warehouse/bcm/testdata/sql/namespace_test.sql | 4 ++-- warehouse/internal/model/schema.go | 1 - warehouse/internal/repo/schema.go | 9 +++------ warehouse/internal/repo/schema_test.go | 2 -- warehouse/router/state_export_data.go | 6 +++--- warehouse/router/upload.go | 2 +- warehouse/schema/schema.go | 13 ++++++------- warehouse/schema/schema_test.go | 5 ++--- 10 files changed, 19 insertions(+), 26 deletions(-) create mode 100644 sql/migrations/warehouse/000026_drop_wh_upload_id_column.up.sql diff --git a/sql/migrations/warehouse/000026_drop_wh_upload_id_column.up.sql b/sql/migrations/warehouse/000026_drop_wh_upload_id_column.up.sql new file mode 100644 index 00000000000..e7801586b0b --- /dev/null +++ b/sql/migrations/warehouse/000026_drop_wh_upload_id_column.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE wh_schemas ALTER COLUMN wh_upload_id DROP NOT NULL; + diff --git a/warehouse/api/http_test.go b/warehouse/api/http_test.go index cf796f3901d..cbb2cef601e 100644 --- a/warehouse/api/http_test.go +++ b/warehouse/api/http_test.go @@ -295,7 +295,6 @@ func TestHTTPApi(t *testing.T) { schemaRepo := repo.NewWHSchemas(db) _, err = schemaRepo.Insert(ctx, &model.WHSchema{ - UploadID: 1, SourceID: sourceID, Namespace: namespace, DestinationID: destinationID, diff --git a/warehouse/bcm/testdata/sql/namespace_test.sql b/warehouse/bcm/testdata/sql/namespace_test.sql index ba45a9fadad..1fb523c33be 100644 --- a/warehouse/bcm/testdata/sql/namespace_test.sql +++ b/warehouse/bcm/testdata/sql/namespace_test.sql @@ -1,6 +1,6 @@ BEGIN; -INSERT INTO wh_schemas(id,wh_upload_id, source_id, namespace, destination_id, destination_type, +INSERT INTO wh_schemas(id, source_id, namespace, destination_id, destination_type, schema, error, created_at, updated_at) -VALUES (1,1, 'test-sourceID', 'test-namespace', 'test-destinationID', 
'POSTGRES','{}', NULL, +VALUES (1, 'test-sourceID', 'test-namespace', 'test-destinationID', 'POSTGRES','{}', NULL, '2022-12-06 15:23:37.100685', '2022-12-06 15:23:37.100685'); COMMIT; diff --git a/warehouse/internal/model/schema.go b/warehouse/internal/model/schema.go index 4855ab54a3c..d25ab0890e9 100644 --- a/warehouse/internal/model/schema.go +++ b/warehouse/internal/model/schema.go @@ -22,7 +22,6 @@ const ( type WHSchema struct { ID int64 - UploadID int64 SourceID string Namespace string DestinationID string diff --git a/warehouse/internal/repo/schema.go b/warehouse/internal/repo/schema.go index 1aad4673cd8..e65e8fe9724 100644 --- a/warehouse/internal/repo/schema.go +++ b/warehouse/internal/repo/schema.go @@ -17,7 +17,6 @@ const whSchemaTableName = warehouseutils.WarehouseSchemasTable const whSchemaTableColumns = ` id, - wh_upload_id, source_id, namespace, destination_id, @@ -53,20 +52,19 @@ func (sh *WHSchema) Insert(ctx context.Context, whSchema *model.WHSchema) (int64 err = sh.db.QueryRowContext(ctx, ` INSERT INTO `+whSchemaTableName+` ( - wh_upload_id, source_id, namespace, destination_id, + source_id, namespace, destination_id, destination_type, schema, created_at, updated_at ) VALUES - ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT ( + ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ( source_id, destination_id, namespace ) DO UPDATE SET - schema = $6, + schema = $5, updated_at = $7 RETURNING id; `, - whSchema.UploadID, whSchema.SourceID, whSchema.Namespace, whSchema.DestinationID, @@ -125,7 +123,6 @@ func parseWHSchemas(rows *sqlmiddleware.Rows) ([]*model.WHSchema, error) { ) err := rows.Scan( &whSchema.ID, - &whSchema.UploadID, &whSchema.SourceID, &whSchema.Namespace, &whSchema.DestinationID, diff --git a/warehouse/internal/repo/schema_test.go b/warehouse/internal/repo/schema_test.go index 373f9296d06..b9478602bbd 100644 --- a/warehouse/internal/repo/schema_test.go +++ b/warehouse/internal/repo/schema_test.go @@ -57,7 +57,6 @@ func TestWHSchemasRepo(t *testing.T) 
{ }, } schema = model.WHSchema{ - UploadID: 1, SourceID: sourceID, Namespace: namespace, DestinationID: destinationID, @@ -144,7 +143,6 @@ func TestWHSchemasRepo(t *testing.T) { t.Log("multiple") latestNamespace := "latest_namespace" schemaLatest := model.WHSchema{ - UploadID: 2, SourceID: sourceID, Namespace: latestNamespace, DestinationID: destinationID, diff --git a/warehouse/router/state_export_data.go b/warehouse/router/state_export_data.go index eae7785dec9..8a6777055b3 100644 --- a/warehouse/router/state_export_data.go +++ b/warehouse/router/state_export_data.go @@ -282,7 +282,7 @@ func (job *UploadJob) loadUserTables(loadFilesTableMap map[tableNameT]bool) ([]e if alteredIdentitySchema || alteredUserSchema { job.logger.Infof("loadUserTables: schema changed - updating local schema for %s", job.warehouse.Identifier) - _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx, job.upload.ID) + _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx) } return job.processLoadTableResponse(errorMap) } @@ -555,7 +555,7 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE if alteredSchema { job.logger.Infof("loadIdentityTables: schema changed - updating local schema for %s", job.warehouse.Identifier) - _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx, job.upload.ID) // TODO check error + _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx) // TODO check error } return job.processLoadTableResponse(errorMap) @@ -698,7 +698,7 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT if alteredSchemaInAtLeastOneTable.Load() { job.logger.Infof("loadAllTablesExcept: schema changed - updating local schema for %s", job.warehouse.Identifier) - _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx, job.upload.ID) // TODO check error + _ = job.schemaHandle.UpdateLocalSchemaWithWarehouse(job.ctx) // TODO check error } return loadErrors diff --git a/warehouse/router/upload.go 
b/warehouse/router/upload.go index 4fc9ca18860..bdc66af74c6 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -904,5 +904,5 @@ func (job *UploadJob) GetLocalSchema(ctx context.Context) (model.Schema, error) } func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema) error { - return job.schemaHandle.UpdateLocalSchema(ctx, job.upload.ID, schema) + return job.schemaHandle.UpdateLocalSchema(ctx, schema) } diff --git a/warehouse/schema/schema.go b/warehouse/schema/schema.go index 3caa6e41377..a65f3d82eb5 100644 --- a/warehouse/schema/schema.go +++ b/warehouse/schema/schema.go @@ -248,20 +248,20 @@ func (sh *Schema) isIDResolutionEnabled() bool { return sh.enableIDResolution && slices.Contains(whutils.IdentityEnabledWarehouses, sh.warehouse.Type) } -func (sh *Schema) UpdateLocalSchemaWithWarehouse(ctx context.Context, uploadID int64) error { +func (sh *Schema) UpdateLocalSchemaWithWarehouse(ctx context.Context) error { sh.schemaInWarehouseMu.RLock() defer sh.schemaInWarehouseMu.RUnlock() - return sh.updateLocalSchema(ctx, uploadID, sh.schemaInWarehouse) + return sh.updateLocalSchema(ctx, sh.schemaInWarehouse) } -func (sh *Schema) UpdateLocalSchema(ctx context.Context, uploadID int64, updatedSchema model.Schema) error { - return sh.updateLocalSchema(ctx, uploadID, updatedSchema) +func (sh *Schema) UpdateLocalSchema(ctx context.Context, updatedSchema model.Schema) error { + return sh.updateLocalSchema(ctx, updatedSchema) } // updateLocalSchema // 1. Inserts the updated schema into the local schema table // 2. 
Updates the local schema instance -func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error { +func (sh *Schema) updateLocalSchema(ctx context.Context, updatedSchema model.Schema) error { updatedSchemaInBytes, err := json.Marshal(updatedSchema) if err != nil { return fmt.Errorf("marshaling schema: %w", err) @@ -269,7 +269,6 @@ func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updated sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes))) _, err = sh.schemaRepo.Insert(ctx, &model.WHSchema{ - UploadID: uploadId, SourceID: sh.warehouse.Source.ID, Namespace: sh.warehouse.Namespace, DestinationID: sh.warehouse.Destination.ID, @@ -312,7 +311,7 @@ func (sh *Schema) SyncRemoteSchema(ctx context.Context, fetchSchemaRepo fetchSch schemaChanged := sh.hasSchemaChanged(localSchema) if schemaChanged { - err := sh.updateLocalSchema(ctx, uploadID, sh.schemaInWarehouse) + err := sh.updateLocalSchema(ctx, sh.schemaInWarehouse) if err != nil { return false, fmt.Errorf("updating local schema: %w", err) } diff --git a/warehouse/schema/schema_test.go b/warehouse/schema/schema_test.go index d03fc89ed7c..c4996c9b990 100644 --- a/warehouse/schema/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -78,7 +78,6 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { destinationID := "test_destination_id" namespace := "test_namespace" warehouseType := warehouseutils.RS - uploadID := int64(1) tableName := "test_table" schemaInWarehouse := model.Schema{ @@ -181,7 +180,7 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { ctx := context.Background() - err = s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema) + err = s.UpdateLocalSchema(ctx, tc.mockSchema.Schema) if tc.wantError == nil { require.NoError(t, err) require.Equal(t, tc.wantSchema, s.localSchema) @@ -195,7 +194,7 @@ func TestSchema_UpdateLocalSchema(t *testing.T) { require.NoError(t, err) require.EqualValues(t, float64(len(marshalledSchema)), 
statsStore.Get("warehouse_schema_size", tags).LastValue()) - err = s.UpdateLocalSchemaWithWarehouse(ctx, uploadID) + err = s.UpdateLocalSchemaWithWarehouse(ctx) if tc.wantError == nil { require.NoError(t, err) require.Equal(t, schemaInWarehouse, s.localSchema)