Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements TRANSACTION_TAG and STATEMENT_TAG system variables #76

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 44 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,10 @@ and `{}` for a mutually exclusive keyword.
| Show Query Result Shape | `DESCRIBE SELECT ...;` | |
| Show DML Result Shape | `DESCRIBE {INSERT\|UPDATE\|DELETE} ... THEN RETURN ...;` | |
| Start a new query optimizer statistics package construction | `ANALYZE;` | |
| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG <tag>];` | See [Request Priority](#request-priority) for details on the priority. The tag you set is used as both transaction tag and request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).|
| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | See [Request Priority](#request-priority) for details on the priority. The tag you set is used as both transaction tag and request tag.|
| Commit Read-Write Transaction | `COMMIT;` | |
| Rollback Read-Write Transaction | `ROLLBACK;` | |
| Start Read-Only Transaction | `BEGIN RO [{<seconds>\|<RFC3339-formatted time>}] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG <tag>];` | `<seconds>` and `<RFC3339-formatted time>` is used for stale read. See [Request Priority](#request-priority) for details on the priority. The tag you set is used as request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).|
| Start Read-Only Transaction | `BEGIN RO [{<seconds>\|<RFC3339-formatted time>}] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | `<seconds>` and `<RFC3339-formatted time>` is used for stale read. See [Request Priority](#request-priority) for details on the priority.|
| End Read-Only Transaction | `CLOSE;` | |
| Test root-partitionable | `TRY PARTITIONED QUERY <sql>` ||
| Show partition tokens of partition query | `PARTITION <sql>` ||
Expand Down Expand Up @@ -431,45 +431,28 @@ Note that transaction-level priority takes precedence over command-level priorit

## Transaction Tags and Request Tags

In a read-write transaction, you can add a tag following `BEGIN RW TAG <tag>`.
spanner-mycli adds the tag set in `BEGIN RW TAG` as a transaction tag.
The tag will also be used as request tags within the transaction.
You can set transaction tag using `SET TRANSACTION_TAG = "<tag>"`, and request tag using `SET STATEMENT_TAG = "<tag>"`.

```
# Read-write transaction
# transaction_tag = tx1
+--------------------+
| BEGIN RW TAG tx1; |
| |
| SELECT val |
| FROM tab1 +-----request_tag = tx1
| WHERE id = 1; |
| |
| UPDATE tab1 |
| SET val = 10 +-----request_tag = tx1
| WHERE id = 1; |
| |
| COMMIT; |
+--------------------+
+------------------------------+
| SET TRANSACTION_TAG = "tx1"; |
| BEGIN RW; |
| |
| SET STATEMENT_TAG = "req1"; |
| SELECT val |
| FROM tab1 +--------------transaction_tag = tx1, request_tag = req1
| WHERE id = 1; |
| |
| SET STATEMENT_TAG = "req2"; |
| |
| UPDATE tab1 |
| SET val = 10 +--------------transaction_tag = tx1, request_tag = req2
| WHERE id = 1; |
| |
| COMMIT; +--------------transaction_tag = tx1
+------------------------------+
```

In a read-only transaction, you can add a tag following `BEGIN RO TAG <tag>`.
Since read-only transaction doesn't support transaction tag, spanner-mycli adds the tag set in `BEGIN RO TAG` as request tags.
```
# Read-only transaction
# transaction_tag = N/A
+--------------------+
| BEGIN RO TAG tx2; |
| |
| SELECT SUM(val) |
| FROM tab1 +-----request_tag = tx2
| WHERE id = 1; |
| |
| CLOSE; |
+--------------------+
```


## Using with the Cloud Spanner Emulator

This tool supports the [Cloud Spanner Emulator](https://cloud.google.com/spanner/docs/emulator) via the [`SPANNER_EMULATOR_HOST` environment variable](https://cloud.google.com/spanner/docs/emulator#client-libraries).
Expand All @@ -496,14 +479,16 @@ This section describes some notable features of spanner-mycli, they are not appe

They have almost same semantics with [Spanner JDBC properties](https://cloud.google.com/spanner/docs/jdbc-session-mgmt-commands?hl=en)

| Name | Type | Example |
|------------------------------|------------|--------------------------------------|
| READ_ONLY_STALENESS | READ_WRITE | `"analyze_20241017_15_59_17UTC"` |
| OPTIMIZER_VERSION | READ_WRITE | `"7"` |
| OPTIMIZER_STATISTICS_PACKAGE | READ_WRITE | `"7"` |
| RPC_PRIORITY | READ_WRITE | `"MEDIUM"` |
| READ_TIMESTAMP | READ_ONLY | `"2024-11-01T05:28:58.943332+09:00"` |
| COMMIT_RESPONSE | READ_ONLY | `"2024-11-01T05:31:11.311894+09:00"` |
| Name | Type | Example |
|------------------------------|------------|-----------------------------------------------------|
| READ_ONLY_STALENESS | READ_WRITE | `"analyze_20241017_15_59_17UTC"` |
| OPTIMIZER_VERSION | READ_WRITE | `"7"` |
| OPTIMIZER_STATISTICS_PACKAGE | READ_WRITE | `"7"` |
| RPC_PRIORITY | READ_WRITE | `"MEDIUM"` |
| READ_TIMESTAMP | READ_ONLY | `"2024-11-01T05:28:58.943332+09:00"` |
| COMMIT_RESPONSE | READ_ONLY | `"2024-11-01T05:31:11.311894+09:00"` |
| TRANSACTION_TAG | READ_WRITE | `"app=concert,env=dev,action=update"` |
| STATEMENT_TAG | READ_WRITE | `"app=concert,env=dev,action=update,request=fetch"` |

#### spanner-mycli original variables

Expand Down Expand Up @@ -1043,6 +1028,20 @@ Or run test except integration tests.
$ make fasttest
```

## Incompatibilities from spanner-cli

In principle, spanner-mycli accepts the same input as spanner-cli, but some compatibility is intentionally not maintained.

- `BEGIN RW TAG <tag>` and `BEGIN RO TAG <tag>` are no longer supported.
- Use `SET TRANSACTION TAG = "<tag>"` and `SET STATEMENT_TAG = "<tag>"`.
- Rationale: spanner-cli are broken. https://github.com/cloudspannerecosystem/spanner-cli/issues/132
- `\G` is no longer supported.
- Use `SET CLI_FORMAT = "VERTICAL"`.
- Rationale: `\G` is not compatible with [GoogleSQL lexical structure](https://cloud.google.com/spanner/docs/reference/standard-sql/lexical) and [memefish](https://github.com/cloudspannerecosystem/memefish).
- `\` is no longer used for prompt expansions.
- Use `%` instead.
- Rationale: `\` is needed to be escaped in ini files of [jassevdk/go-flags](https://github.com/jessevdk/go-flags).

## TODO

* Show secondary index by "SHOW CREATE TABLE"
19 changes: 12 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func NewSession(ctx context.Context, sysVars *systemVariables, opts ...option.Cl
adminClient: adminClient,
systemVariables: sysVars,
}
sysVars.CurrentSession = session
go session.startHeartbeat()

return session, nil
Expand All @@ -146,7 +147,7 @@ func (s *Session) InReadOnlyTransaction() bool {
}

// BeginReadWriteTransaction starts read-write transaction.
func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.RequestOptions_Priority, tag string) error {
func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.RequestOptions_Priority) error {
if s.InReadWriteTransaction() {
return errors.New("read-write transaction is already running")
}
Expand All @@ -160,11 +161,13 @@ func (s *Session) BeginReadWriteTransaction(ctx context.Context, priority sppb.R
priority = s.systemVariables.RPCPriority
}

tag := s.systemVariables.TransactionTag
opts := spanner.TransactionOptions{
CommitOptions: spanner.CommitOptions{ReturnCommitStats: true},
CommitPriority: priority,
TransactionTag: tag,
}

txn, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(ctx, s.client, opts)
if err != nil {
return err
Expand Down Expand Up @@ -206,7 +209,7 @@ func (s *Session) RollbackReadWriteTransaction(ctx context.Context) error {
}

// BeginReadOnlyTransaction starts read-only transaction and returns the snapshot timestamp for the transaction if successful.
func (s *Session) BeginReadOnlyTransaction(ctx context.Context, typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority sppb.RequestOptions_Priority, tag string) (time.Time, error) {
func (s *Session) BeginReadOnlyTransaction(ctx context.Context, typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority sppb.RequestOptions_Priority) (time.Time, error) {
if s.InReadOnlyTransaction() {
return time.Time{}, errors.New("read-only transaction is already running")
}
Expand Down Expand Up @@ -234,7 +237,6 @@ func (s *Session) BeginReadOnlyTransaction(ctx context.Context, typ timestampBou
}

s.tc = &transactionContext{
tag: tag,
priority: priority,
roTxn: txn,
}
Expand Down Expand Up @@ -295,18 +297,17 @@ func (s *Session) runQueryWithOptions(ctx context.Context, stmt spanner.Statemen

opts.Options.OptimizerVersion = s.systemVariables.OptimizerVersion
opts.Options.OptimizerStatisticsPackage = s.systemVariables.OptimizerStatisticsPackage
opts.RequestTag = s.systemVariables.RequestTag

switch {
case s.InReadWriteTransaction():
// The current Go Spanner client library does not apply client-level directed read options to read-write transactions.
// Therefore, we explicitly set query-level options here to fail the query during a read-write transaction.
opts.DirectedReadOptions = s.clientConfig.DirectedReadOptions
opts.RequestTag = s.tc.tag
iter := s.tc.rwTxn.QueryWithOptions(ctx, stmt, opts)
s.tc.sendHeartbeat = true
return iter, nil
case s.InReadOnlyTransaction():
opts.RequestTag = s.tc.tag
return s.tc.roTxn.QueryWithOptions(ctx, stmt, opts), s.tc.roTxn
default:
txn := s.client.Single()
Expand All @@ -329,7 +330,11 @@ func (s *Session) RunUpdate(ctx context.Context, stmt spanner.Statement, useUpda

opts := spanner.QueryOptions{
Priority: s.currentPriority(),
RequestTag: s.tc.tag,
RequestTag: s.systemVariables.RequestTag,
Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: s.systemVariables.OptimizerVersion,
OptimizerStatisticsPackage: s.systemVariables.OptimizerStatisticsPackage,
},
}

// Workaround: Usually, we can execute DMLs using Query(ExecuteStreamingSql RPC),
Expand Down Expand Up @@ -486,7 +491,7 @@ func (s *Session) RunInNewOrExistRwTx(ctx context.Context,
var implicitRWTx bool
if !s.InReadWriteTransaction() {
// Start implicit transaction.
if err := s.BeginReadWriteTransaction(ctx, s.currentPriority(), ""); err != nil {
if err := s.BeginReadWriteTransaction(ctx, s.currentPriority()); err != nil {
return 0, spanner.CommitResponse{}, nil, nil, err
}
implicitRWTx = true
Expand Down
4 changes: 2 additions & 2 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestRequestPriority(t *testing.T) {
}

// Read-Write Transaction.
if err := session.BeginReadWriteTransaction(ctx, test.transactionPriority, ""); err != nil {
if err := session.BeginReadWriteTransaction(ctx, test.transactionPriority); err != nil {
t.Fatalf("failed to begin read write transaction: %v", err)
}
iter, _ := session.RunQuery(ctx, spanner.NewStatement("SELECT * FROM t1"))
Expand All @@ -97,7 +97,7 @@ func TestRequestPriority(t *testing.T) {
}

// Read-Only Transaction.
if _, err := session.BeginReadOnlyTransaction(ctx, strong, 0, time.Now(), test.transactionPriority, ""); err != nil {
if _, err := session.BeginReadOnlyTransaction(ctx, strong, 0, time.Now(), test.transactionPriority); err != nil {
t.Fatalf("failed to begin read only transaction: %v", err)
}
iter, _ = session.RunQueryWithStats(ctx, spanner.NewStatement("SELECT * FROM t1"))
Expand Down
19 changes: 5 additions & 14 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ var (
truncateTableRe = regexp.MustCompile(`(?is)^TRUNCATE\s+TABLE\s+(.+)$`)

// Transaction
beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`)
beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`)
beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`)
beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`)
commitRe = regexp.MustCompile(`(?is)^COMMIT$`)
rollbackRe = regexp.MustCompile(`(?is)^ROLLBACK$`)
closeRe = regexp.MustCompile(`(?is)^CLOSE$`)
Expand Down Expand Up @@ -544,6 +544,7 @@ func (s *ShowVariablesStatement) Execute(ctx context.Context, session *Session)
if v.Getter == nil {
continue
}

value, err := v.Getter(session.systemVariables, k)
if errors.Is(err, errIgnored) {
continue
Expand Down Expand Up @@ -953,7 +954,6 @@ func (s *ExplainAnalyzeDmlStatement) Execute(ctx context.Context, session *Sessi

type BeginRwStatement struct {
Priority sppb.RequestOptions_Priority
Tag string
}

func newBeginRwStatement(input string) (*BeginRwStatement, error) {
Expand All @@ -968,10 +968,6 @@ func newBeginRwStatement(input string) (*BeginRwStatement, error) {
stmt.Priority = priority
}

if matched[2] != "" {
stmt.Tag = matched[2]
}

return stmt, nil
}

Expand All @@ -984,7 +980,7 @@ func (s *BeginRwStatement) Execute(ctx context.Context, session *Session) (*Resu
return nil, errors.New("you're in read-only transaction. Please finish the transaction by 'CLOSE;'")
}

if err := session.BeginReadWriteTransaction(ctx, s.Priority, s.Tag); err != nil {
if err := session.BeginReadWriteTransaction(ctx, s.Priority); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1046,7 +1042,6 @@ type BeginRoStatement struct {
Staleness time.Duration
Timestamp time.Time
Priority sppb.RequestOptions_Priority
Tag string
}

func newBeginRoStatement(input string) (*BeginRoStatement, error) {
Expand Down Expand Up @@ -1078,10 +1073,6 @@ func newBeginRoStatement(input string) (*BeginRoStatement, error) {
stmt.Priority = priority
}

if matched[3] != "" {
stmt.Tag = matched[3]
}

return stmt, nil
}

Expand All @@ -1096,7 +1087,7 @@ func (s *BeginRoStatement) Execute(ctx context.Context, session *Session) (*Resu
}
}

ts, err := session.BeginReadOnlyTransaction(ctx, s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority, s.Tag)
ts, err := session.BeginReadOnlyTransaction(ctx, s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading