Commit

snowpipe: add metadata for SNOWPIPE_STREAMING_CLIENT_HISTORY
I think this is what was missing...
rockwotj committed Dec 16, 2024
1 parent 845be1a commit 5c43ce3
Showing 5 changed files with 22 additions and 9 deletions.
1 change: 0 additions & 1 deletion internal/impl/snowflake/output_snowflake_streaming.go
@@ -507,7 +507,6 @@ func newSnowflakeStreamer(
 		PrivateKey:     rsaKey,
 		Logger:         mgr.Logger(),
 		ConnectVersion: mgr.EngineVersion(),
-		Application:    channelPrefix,
 	})
 	if err != nil {
 		return nil, err
1 change: 0 additions & 1 deletion internal/impl/snowflake/streaming/integration_test.go
@@ -70,7 +70,6 @@ func setup(t *testing.T) (*streaming.SnowflakeRestClient, *streaming.SnowflakeSe
 		Role:           "ACCOUNTADMIN",
 		PrivateKey:     parseResult.(*rsa.PrivateKey),
 		ConnectVersion: "",
-		Application:    "development",
 	}
 	restClient, err := streaming.NewRestClient(
 		clientOptions.Account,
2 changes: 1 addition & 1 deletion internal/impl/snowflake/streaming/rest.go
@@ -310,7 +310,7 @@ type SnowflakeRestClient struct {
 }
 
 // NewRestClient creates a new REST client for the given parameters.
-func NewRestClient(account, user, version, app string, privateKey *rsa.PrivateKey, logger *service.Logger) (c *SnowflakeRestClient, err error) {
+func NewRestClient(account, user, version string, privateKey *rsa.PrivateKey, logger *service.Logger) (c *SnowflakeRestClient, err error) {
 	version = strings.TrimLeft(version, "v")
 	// Drop any -rc suffix, Snowflake doesn't like it
 	splits := strings.SplitN(version, "-", 2)
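For illustration only (not code from this commit), a call site now drops the fourth argument; every identifier below is a placeholder for values the caller already has in scope:

// Hypothetical caller sketch of the updated signature. The version string may keep
// its "v" prefix and "-rc" suffix, since NewRestClient strips both (see the context
// lines above).
restClient, err := streaming.NewRestClient(account, user, "v4.40.0-rc1", privateKey, logger)
if err != nil {
	return nil, err
}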
8 changes: 7 additions & 1 deletion internal/impl/snowflake/streaming/streaming.go
@@ -440,7 +440,13 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
 	uploader := uploaderResult.uploader
 	fullMD5Hash := md5.Sum(part.parquetFile)
 	err = backoff.Retry(func() error {
-		return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:])
+		return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
+			"ingestclientname": "RedpandaConnect_SnowpipeStreamingSDK",
+			"ingestclientkey":  c.clientPrefix,
+			// TODO(rockwood): It's not clear what this digest is used for,
+			// so we omit it so that we don't have to compute both the sha and md5
+			// "sfc-digest": string(sha256.Sum256(part.parquetFile)[:]),
+		})
 	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
 	if err != nil {
 		return insertStats, fmt.Errorf("unable to upload to storage: %w", err)
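The two keys added above are ordinary blob metadata on the uploaded Parquet file; the premise of this commit is that Snowflake reads them when populating SNOWPIPE_STREAMING_CLIENT_HISTORY. As a purely illustrative sketch (not part of this commit), the map construction could be factored into a helper, with clientPrefix standing in for the channel's c.clientPrefix:

package main

import "fmt"

// clientHistoryMetadata builds the blob metadata that InsertRows now attaches to
// every uploaded file. The key names and the client name are copied from the diff
// above; pulling them into a helper is hypothetical.
func clientHistoryMetadata(clientPrefix string) map[string]string {
	return map[string]string{
		"ingestclientname": "RedpandaConnect_SnowpipeStreamingSDK",
		"ingestclientkey":  clientPrefix,
	}
}

func main() {
	fmt.Println(clientHistoryMetadata("example-channel-prefix"))
}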
19 changes: 14 additions & 5 deletions internal/impl/snowflake/streaming/uploader.go
@@ -23,6 +23,7 @@ import (
 
 	gcs "cloud.google.com/go/storage"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
@@ -32,7 +33,7 @@ import (
 )
 
 type uploader interface {
-	upload(ctx context.Context, path string, encrypted, md5Hash []byte) error
+	upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error
 }
 
 func newUploader(fileLocationInfo fileLocationInfo) (uploader, error) {
@@ -121,9 +122,15 @@ type azureUploader struct {
 	container, pathPrefix string
 }
 
-func (u *azureUploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte) error {
+func (u *azureUploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error {
 	// We upload in multiple parts, so we have to validate ourselves post upload 😒
-	resp, err := u.client.UploadBuffer(ctx, u.container, filepath.Join(u.pathPrefix, path), encrypted, nil)
+	md := map[string]*string{}
+	for k, v := range metadata {
+		val := v
+		md[k] = &val
+	}
+	o := blockblob.UploadBufferOptions{Metadata: md}
+	resp, err := u.client.UploadBuffer(ctx, u.container, filepath.Join(u.pathPrefix, path), encrypted, &o)
 	if err != nil {
 		return err
 	}
@@ -138,11 +145,12 @@ type s3Uploader struct {
 	bucket, pathPrefix string
 }
 
-func (u *s3Uploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte) error {
+func (u *s3Uploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error {
 	input := &s3.PutObjectInput{
 		Bucket:     &u.bucket,
 		Key:        aws.String(filepath.Join(u.pathPrefix, path)),
 		Body:       bytes.NewReader(encrypted),
+		Metadata:   metadata,
 		ContentMD5: aws.String(base64.StdEncoding.EncodeToString(md5Hash)),
 	}
 	_, err := u.client.Upload(ctx, input)
@@ -154,11 +162,12 @@ type gcsUploader struct {
 	pathPrefix string
 }
 
-func (u *gcsUploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte) error {
+func (u *gcsUploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error {
 	object := u.bucket.Object(filepath.Join(u.pathPrefix, path))
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	ow := object.NewWriter(ctx)
+	ow.Metadata = metadata
 	ow.MD5 = md5Hash
 	for len(encrypted) > 0 {
 		n, err := ow.Write(encrypted)
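Since the uploader interface is unexported, a test double has to live in the same package. A minimal capturing implementation of the new five-argument signature (a sketch, not part of this commit) would let a test assert that InsertRows forwards the ingestclientname/ingestclientkey entries to whichever cloud backend is in use:

package streaming

import "context"

// capturingUploader is a hypothetical in-memory uploader for tests. It satisfies
// the updated interface and simply records what it was asked to upload.
type capturingUploader struct {
	paths    []string
	metadata []map[string]string
}

func (u *capturingUploader) upload(ctx context.Context, path string, encrypted, md5Hash []byte, metadata map[string]string) error {
	u.paths = append(u.paths, path)
	u.metadata = append(u.metadata, metadata)
	return nil // pretend the blob store accepted the object
}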
