Skip to content

Commit

Permalink
implement schema watch for mysql
Browse files Browse the repository at this point in the history
Signed-off-by: Kartikay <[email protected]>
  • Loading branch information
kartikaysaxena committed Jan 31, 2025
1 parent 542053f commit dac084e
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/spicedb/migrate_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestMigrate(t *testing.T) {

t.Run(engineKey, func(t *testing.T) {
engineKey := engineKey
t.Parallel()

r := testdatastore.RunDatastoreEngineWithBridge(t, engineKey, bridgeNetworkName)
db := r.NewDatabase(t)

Expand Down
19 changes: 18 additions & 1 deletion cmd/spicedb/servetesting_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/authzed/grpcutil"
"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -240,7 +242,22 @@ func newTester(t *testing.T, containerOpts *dockertest.RunOptions, token string,
return err
})
if err != nil {
fmt.Printf("got error on startup: %v\n", err)
stream := new(bytes.Buffer)

waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

lerr := pool.Client.Logs(docker.LogsOptions{
Context: waitCtx,
OutputStream: stream,
ErrorStream: stream,
Stdout: true,
Stderr: true,
Container: resource.Container.ID,
})
require.NoError(t, lerr)

fmt.Printf("got error on startup: %v\ncontainer logs: %s\n", err, stream.String())
cleanup()
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
func TestMySQL8Datastore(t *testing.T) {
b := testdatastore.RunMySQLForTestingWithOptions(t, testdatastore.MySQLTesterOptions{MigrateForNewDatastore: true}, "")
dst := datastoreTester{b: b, t: t}
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory, test.WatchCheckpointsCategory), true)
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchCheckpointsCategory), true)
additionalMySQLTests(t, b)
}

Expand Down
29 changes: 25 additions & 4 deletions internal/datastore/mysql/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type QueryBuilder struct {
ReadNamespaceQuery sq.SelectBuilder
DeleteNamespaceQuery sq.UpdateBuilder
DeleteNamespaceRelationshipsQuery sq.UpdateBuilder
QueryChangedNamespacesQuery sq.SelectBuilder

ReadCounterQuery sq.SelectBuilder
InsertCounterQuery sq.InsertBuilder
Expand All @@ -30,10 +31,11 @@ type QueryBuilder struct {
QueryChangedQuery sq.SelectBuilder
CountRelsQuery sq.SelectBuilder

WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
WriteCaveatQuery sq.InsertBuilder
ReadCaveatQuery sq.SelectBuilder
ListCaveatsQuery sq.SelectBuilder
DeleteCaveatQuery sq.UpdateBuilder
QueryChangedCaveatsQuery sq.SelectBuilder
}

// NewQueryBuilder returns a new QueryBuilder instance. The migration
Expand All @@ -49,6 +51,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.WriteNamespaceQuery = writeNamespace(driver.Namespace())
builder.ReadNamespaceQuery = readNamespace(driver.Namespace())
builder.DeleteNamespaceQuery = deleteNamespace(driver.Namespace())
builder.QueryChangedNamespacesQuery = changedNamespaces(driver.Namespace())

// counters builders
builder.ReadCounterQuery = readCounter(driver.RelationshipCounters())
Expand All @@ -71,6 +74,7 @@ func NewQueryBuilder(driver *migrations.MySQLDriver) *QueryBuilder {
builder.ListCaveatsQuery = listCaveats(driver.Caveat())
builder.WriteCaveatQuery = writeCaveat(driver.Caveat())
builder.DeleteCaveatQuery = deleteCaveat(driver.Caveat())
builder.QueryChangedCaveatsQuery = changedCaveats(driver.Caveat())

return &builder
}
Expand Down Expand Up @@ -223,3 +227,20 @@ func queryChanged(tableTuple string) sq.SelectBuilder {
colDeletedTxn,
).From(tableTuple)
}

func changedCaveats(tableTuple string) sq.SelectBuilder {
return sb.Select(
colName,
colCaveatDefinition,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}

func changedNamespaces(tableTuple string) sq.SelectBuilder {
return sb.Select(
colConfig,
colCreatedTxn,
colDeletedTxn,
).From(tableTuple)
}
160 changes: 151 additions & 9 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package mysql
import (
"context"
"errors"
"fmt"
"time"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/pkg/datastore"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"

sq "github.com/Masterminds/squirrel"
Expand All @@ -29,12 +31,6 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
updates := make(chan *datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
close(updates)
errs <- errors.New("schema watch unsupported in MySQL")
return updates, errs
}

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
close(updates)
errs <- errors.New("emit immediately strategy is unsupported in MySQL")
Expand Down Expand Up @@ -177,7 +173,30 @@ func (mds *Datastore) loadChanges(
}

// Load the changes relationships for the revision range.
sql, args, err = mds.QueryChangedQuery.Where(sq.Or{
if err := mds.loadRelationshipChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}

// Load namespace changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

// Load caveat changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}

func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
Expand All @@ -191,7 +210,7 @@ func (mds *Datastore) loadChanges(
return
}

rows, err = mds.db.QueryContext(ctx, sql, args...)
rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
Expand Down Expand Up @@ -265,7 +284,130 @@ func (mds *Datastore) loadChanges(
if err = rows.Err(); err != nil {
return
}
return
}

func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}
defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte

err = rows.Scan(
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return
}
loaded := &core.NamespaceDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

return
}

func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}

defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte
var name string

err = rows.Scan(
&name,
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return fmt.Errorf("unable to parse changed caveat: %w", err)
}
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

changes, err = stagedChanges.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
return
}
Loading

0 comments on commit dac084e

Please sign in to comment.