Skip to content

Commit

Permalink
snowpipe: remove application query param which does nothing
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Dec 16, 2024
1 parent b6eec24 commit 845be1a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 24 deletions.
2 changes: 1 addition & 1 deletion internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func newSnowflakeStreamer(
return err
}
}
restClient, err := streaming.NewRestClient(account, user, mgr.EngineVersion(), channelPrefix, rsaKey, mgr.Logger())
restClient, err := streaming.NewRestClient(account, user, mgr.EngineVersion(), rsaKey, mgr.Logger())
if err != nil {
return nil, fmt.Errorf("unable to create rest API client: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion internal/impl/snowflake/streaming/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func setup(t *testing.T) (*streaming.SnowflakeRestClient, *streaming.SnowflakeSe
clientOptions.Account,
clientOptions.User,
clientOptions.ConnectVersion,
"Redpanda_Connect_"+clientOptions.Application,
clientOptions.PrivateKey,
clientOptions.Logger,
)
Expand Down
32 changes: 13 additions & 19 deletions internal/impl/snowflake/streaming/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

Expand Down Expand Up @@ -301,10 +300,9 @@ type (
type SnowflakeRestClient struct {
account string
user string
app string
privateKey *rsa.PrivateKey
client *http.Client
userAgent string
version string
logger *service.Logger

authRefreshLoop *asyncroutine.Periodic
Expand All @@ -324,16 +322,13 @@ func NewRestClient(account, user, version, app string, privateKey *rsa.PrivateKe
// this should only show up in development, not released binaries
version = "99.0.0"
}
userAgent := fmt.Sprintf("RedpandaConnect_SnowpipeStreamingSDK/%v", version)
debugf(logger, "making snowflake HTTP requests using User-Agent: %s", userAgent)
c = &SnowflakeRestClient{
account: account,
user: user,
client: http.DefaultClient,
privateKey: privateKey,
userAgent: userAgent,
logger: logger,
app: url.QueryEscape("Redpanda_Connect_" + strings.TrimPrefix(app, "Redpanda_Connect_")),
version: version,
cachedJWT: typed.NewAtomicValue(""),
authRefreshLoop: asyncroutine.NewPeriodic(
time.Hour-(2*time.Minute),
Expand Down Expand Up @@ -385,42 +380,42 @@ func (c *SnowflakeRestClient) computeJWT() (string, error) {
// we don't have to handle async requests.
func (c *SnowflakeRestClient) RunSQL(ctx context.Context, req RunSQLRequest) (resp RunSQLResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/api/v2/statements?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/api/v2/statements?requestId=%s", c.account, requestID), req, &resp)
return
}

// configureClient configures a client for Snowpipe Streaming.
func (c *SnowflakeRestClient) configureClient(ctx context.Context, req clientConfigureRequest) (resp clientConfigureResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/client/configure?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/client/configure?requestId=%s", c.account, requestID), req, &resp)
return
}

// channelStatus returns the status of a given channel
func (c *SnowflakeRestClient) channelStatus(ctx context.Context, req batchChannelStatusRequest) (resp batchChannelStatusResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/status?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/status?requestId=%s", c.account, requestID), req, &resp)
return
}

// openChannel opens a channel for writing
func (c *SnowflakeRestClient) openChannel(ctx context.Context, req openChannelRequest) (resp openChannelResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/open?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/open?requestId=%s", c.account, requestID), req, &resp)
return
}

// dropChannel drops a channel when it's no longer in use.
func (c *SnowflakeRestClient) dropChannel(ctx context.Context, req dropChannelRequest) (resp dropChannelResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/drop?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/drop?requestId=%s", c.account, requestID), req, &resp)
return
}

// registerBlob registers a blob in object storage to be ingested into Snowflake.
func (c *SnowflakeRestClient) registerBlob(ctx context.Context, req registerBlobRequest) (resp registerBlobResponse, err error) {
requestID := uuid.NewString()
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/write/blobs?application=%s&requestId=%s", c.account, c.app, requestID), req, &resp)
err = c.doPost(ctx, fmt.Sprintf("https://%s.snowflakecomputing.com/v1/streaming/channels/write/blobs?requestId=%s", c.account, requestID), req, &resp)
return
}

Expand Down Expand Up @@ -450,12 +445,11 @@ func (c *SnowflakeRestClient) doPost(ctx context.Context, url string, req any, r
} else if err != nil {
return nil, fmt.Errorf("unable to make http request: %w", err)
}
httpReq.Header.Add("Content-Type", "application/json")
httpReq.Header.Add("Accept", "application/json")
httpReq.Header.Add("User-Agent", c.userAgent)
jwt := c.cachedJWT.Load()
httpReq.Header.Add("Authorization", "Bearer "+jwt)
httpReq.Header.Add("X-Snowflake-Authorization-Token-Type", "KEYPAIR_JWT")
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "application/json")
httpReq.Header.Set("User-Agent", fmt.Sprintf("RedpandaConnect_SnowpipeStreamingSDK/%v", c.version))
httpReq.Header.Set("X-Snowflake-Authorization-Token-Type", "KEYPAIR_JWT")
httpReq.Header.Set("Authorization", "Bearer "+c.cachedJWT.Load())
r, err := c.client.Do(httpReq)
if errors.Is(err, context.Canceled) {
return nil, backoff.Permanent(err)
Expand Down
5 changes: 2 additions & 3 deletions internal/impl/snowflake/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ type ClientOptions struct {
// Private key for the user
PrivateKey *rsa.PrivateKey
// Logger for... logging?
Logger *service.Logger
Logger *service.Logger
// Connect version for the User-Agent in Snowflake
ConnectVersion string
Application string
}

type stageUploaderResult struct {
Expand Down Expand Up @@ -77,7 +77,6 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
opts.Account,
opts.User,
opts.ConnectVersion,
opts.Application,
opts.PrivateKey,
opts.Logger,

Check failure on line 81 in internal/impl/snowflake/streaming/streaming.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to NewRestClient

Check failure on line 81 in internal/impl/snowflake/streaming/streaming.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to NewRestClient

Check failure on line 81 in internal/impl/snowflake/streaming/streaming.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to NewRestClient

Check failure on line 81 in internal/impl/snowflake/streaming/streaming.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to NewRestClient

Check failure on line 81 in internal/impl/snowflake/streaming/streaming.go

View workflow job for this annotation

GitHub Actions / test

not enough arguments in call to NewRestClient
)
Expand Down

0 comments on commit 845be1a

Please sign in to comment.