Skip to content

Commit

Permalink
[AC-243] UntilQueryLambdaTagPropagated (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund authored Nov 9, 2023
1 parent 881c3e0 commit 4d2f53a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
22 changes: 19 additions & 3 deletions query_lambdas.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,16 @@ func (rc *RockClient) UpdateQueryLambda(ctx context.Context, workspace, name, sq
return resp.GetData(), nil
}

// CreateQueryLambdaTag creates a new tag for the query lambda version.
// CreateQueryLambdaTag creates a new tag for a specific query lambda version, or update the tag if it already exists.
// Note that the tag propagation takes time, and the wait.UntilQueryLambdaTagPropagated() method should be called
// after updating an existing tag, to avoid eventual consistency issues.
//
// https://docs.rockset.com/rest-api/#createquerylambdatag
// If strong consistency of the version is required when executing a query lambda,
// option.WithVersion() must be used, e.g.
//
// r, err := rc.ExecuteQueryLambda(ctx, ws, ql, option.WithVersion("f79fc3eae5c823bb"))
//
// https://docs.rockset.com/documentation/reference/createquerylambdatag
func (rc *RockClient) CreateQueryLambdaTag(ctx context.Context, workspace, name, version,
tag string) (openapi.QueryLambdaTag, error) {
var err error
Expand Down Expand Up @@ -190,7 +197,16 @@ func (rc *RockClient) DeleteQueryLambdaTag(ctx context.Context, workspace, name,
return nil
}

// ExecuteQueryLambda executes a query lambda with optional query options.
// ExecuteQueryLambda executes a query lambda with optional query options. If neither option.WithTag() nor
// option.WithVersion() is used, it makes the call using option.WithTag(LatestTag)
//
// If strong consistency of the version is required when executing a query lambda,
// option.WithVersion() must be used, e.g.
//
// r, err := rc.ExecuteQueryLambda(ctx, ws, ql, option.WithVersion("f79fc3eae5c823bb"))
//
// https://docs.rockset.com/documentation/reference/executequerylambdabytag
// https://docs.rockset.com/documentation/reference/executequerylambda
func (rc *RockClient) ExecuteQueryLambda(ctx context.Context, workspace, name string,
options ...option.QueryLambdaOption) (openapi.QueryResponse, error) {
req := option.ExecuteQueryLambdaRequest{
Expand Down
19 changes: 18 additions & 1 deletion wait/query_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wait

import (
"context"
"time"

"github.com/rockset/rockset-go-client/option"
)
Expand All @@ -14,7 +15,8 @@ func (w *Waiter) UntilQueryLambdaVersionGone(ctx context.Context, workspace, nam
}))
}

// UntilQueryLambdaVersionActive waits until the Virtual Instance is active.
// UntilQueryLambdaVersionActive waits until the query lambda version is active. Returns an error if the query lambda
// contains invalid SQL.
func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, name, version string) error {
return w.rc.RetryWithCheck(ctx,
ResourceHasState(ctx,
Expand All @@ -24,3 +26,18 @@ func (w *Waiter) UntilQueryLambdaVersionActive(ctx context.Context, workspace, n
return option.QueryLambdaState(ql.GetState()), err
}))
}

// UntilQueryLambdaTagPropagated waits until the query lambda tag has been propagated throughout the system.
func (w *Waiter) UntilQueryLambdaTagPropagated(ctx context.Context, workspace, name, tag string) error {
t := time.NewTimer(queryLambdaTagPropagation) // eww
defer t.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
return nil
}
}

const queryLambdaTagPropagation = 2 * time.Minute

0 comments on commit 4d2f53a

Please sign in to comment.