From 3f2e270d85d50396f2f42ffe56a643f42a2abd63 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 6 Feb 2024 21:49:42 +0800 Subject: [PATCH 1/3] Rewrite ydb queries with table prefix Signed-off-by: sashayakovtseva --- internal/datastore/ydb/common/common.go | 57 +++++++++++++++++++ internal/datastore/ydb/migrations/driver.go | 46 ++++++++++++--- internal/datastore/ydb/migrations/manager.go | 12 +++- .../zz_migration.0001_initial_schema.go | 24 +++++--- 4 files changed, 120 insertions(+), 19 deletions(-) create mode 100644 internal/datastore/ydb/common/common.go diff --git a/internal/datastore/ydb/common/common.go b/internal/datastore/ydb/common/common.go new file mode 100644 index 0000000000..da25e974d0 --- /dev/null +++ b/internal/datastore/ydb/common/common.go @@ -0,0 +1,57 @@ +package common + +import ( + "bytes" + "net/url" + "sync" +) + +type DSN struct { + OriginalDSN string + TablePathPrefix string +} + +// ParseDSN is used to extract custom parameters from YDB DSN that are used to alter datastore behaviour. +// The following parameters are recognized: +// +// - table_path_prefix: string. Will be added to all queries. +func ParseDSN(dsn string) DSN { + parsedDSN := DSN{OriginalDSN: dsn} + + uri, err := url.Parse(dsn) + if err != nil { + // don't care about the error since ydb.Open will parse dsn again. + return parsedDSN + } + + params := uri.Query() + parsedDSN.TablePathPrefix = params.Get("table_path_prefix") + + return parsedDSN +} + +var bytesPool = sync.Pool{ + New: func() interface{} { + return &bytes.Buffer{} + }, +} + +func RewriteQuery(query string, tablePathPrefix string) string { + if tablePathPrefix == "" { + return query + } + + buffer := bytesPool.Get().(*bytes.Buffer) + defer func() { + buffer.Reset() + bytesPool.Put(buffer) + }() + + buffer.WriteString("PRAGMA TablePathPrefix(\"") + buffer.WriteString(tablePathPrefix) + buffer.WriteString("\");\n\n") + buffer.WriteString(query) + + return buffer.String() + +} diff --git a/internal/datastore/ydb/migrations/driver.go b/internal/datastore/ydb/migrations/driver.go index c207ad649e..6e6ec1c6f0 100644 --- a/internal/datastore/ydb/migrations/driver.go +++ b/internal/datastore/ydb/migrations/driver.go @@ -14,12 +14,13 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "github.com/ydb-platform/ydb-go-sdk/v3/trace" + "github.com/authzed/spicedb/internal/datastore/ydb/common" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/migrate" "github.com/authzed/spicedb/pkg/truetime" ) -var _ migrate.Driver[table.Client, table.TransactionActor] = (*YDBDriver)(nil) +var _ migrate.Driver[TableClientWithOptions, TxActorWithOptions] = (*YDBDriver)(nil) const ( queryLoadVersion = ` @@ -40,13 +41,20 @@ INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersio ` ) +type options struct { + tablePathPrefix string +} + // YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore. type YDBDriver struct { db *ydb.Driver + options } // NewYDBDriver creates a new driver with active connections to the database specified. func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) { + parsedDSN := common.ParseDSN(dsn) + db, err := ydb.Open(ctx, dsn, ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents), ydbOtel.WithTraces(), @@ -55,7 +63,12 @@ func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) { return nil, fmt.Errorf("unable to instantiate YDBDriver: %w", err) } - return &YDBDriver{db}, nil + return &YDBDriver{ + db: db, + options: options{ + tablePathPrefix: parsedDSN.TablePathPrefix, + }, + }, nil } // Version returns the current version of the schema in the backing datastore. @@ -67,7 +80,12 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { ) err := d.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, res, err := s.Execute(ctx, table.DefaultTxControl(), queryLoadVersion, nil) + _, res, err := s.Execute( + ctx, + table.DefaultTxControl(), + common.RewriteQuery(queryLoadVersion, d.tablePathPrefix), + nil, + ) if err != nil { return err } @@ -93,8 +111,10 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { return loaded, nil } -func (d *YDBDriver) WriteVersion(ctx context.Context, tx table.TransactionActor, version, replaced string) error { - res, err := tx.Execute(ctx, queryWriteVersion, +func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error { + res, err := tx.tx.Execute( + ctx, + common.RewriteQuery(queryWriteVersion, tx.opts.tablePathPrefix), table.NewQueryParameters( table.ValueParam("$newVersion", types.TextValue(version)), table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())), @@ -109,12 +129,20 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx table.TransactionActor, } // Conn returns the underlying table client instance for this driver. -func (d *YDBDriver) Conn() table.Client { - return d.db.Table() +func (d *YDBDriver) Conn() TableClientWithOptions { + return TableClientWithOptions{ + client: d.db.Table(), + opts: d.options, + } } -func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[table.TransactionActor]) error { - return d.db.Table().DoTx(ctx, table.TxOperation(f)) +func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithOptions]) error { + return d.db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error { + return f(ctx, TxActorWithOptions{ + tx: tx, + opts: d.options, + }) + }) } // Close disposes the driver. diff --git a/internal/datastore/ydb/migrations/manager.go b/internal/datastore/ydb/migrations/manager.go index 3dfc89fe5e..8c0eabb13c 100644 --- a/internal/datastore/ydb/migrations/manager.go +++ b/internal/datastore/ydb/migrations/manager.go @@ -7,4 +7,14 @@ import ( ) // YDBMigrations implements a migration manager for the YDBDriver. -var YDBMigrations = migrate.NewManager[*YDBDriver, table.Client, table.TransactionActor]() +var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithOptions, TxActorWithOptions]() + +type TableClientWithOptions struct { + client table.Client + opts options +} + +type TxActorWithOptions struct { + tx table.TransactionActor + opts options +} diff --git a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go index 5bd658565c..c5aa463854 100644 --- a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go +++ b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go @@ -4,6 +4,8 @@ import ( "context" "github.com/ydb-platform/ydb-go-sdk/v3/table" + + "github.com/authzed/spicedb/internal/datastore/ydb/common" ) // YDB doesn't support unique secondary indexes, so one should manually check row uniqueness before insert. @@ -78,14 +80,14 @@ CREATE TABLE relation_tuple ( ) func init() { - err := YDBMigrations.Register("initial", "", func(ctx context.Context, client table.Client) error { - return client.Do(ctx, func(ctx context.Context, s table.Session) error { + err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error { + return client.client.Do(ctx, func(ctx context.Context, s table.Session) error { statements := []string{ - createSchemaVersion, - createUniqueIDTable, - createNamespaceConfig, - createCaveat, - createRelationTuple, + common.RewriteQuery(createSchemaVersion, client.opts.tablePathPrefix), + common.RewriteQuery(createUniqueIDTable, client.opts.tablePathPrefix), + common.RewriteQuery(createNamespaceConfig, client.opts.tablePathPrefix), + common.RewriteQuery(createCaveat, client.opts.tablePathPrefix), + common.RewriteQuery(createRelationTuple, client.opts.tablePathPrefix), } for _, stmt := range statements { if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil { @@ -95,8 +97,12 @@ func init() { return nil }) - }, func(ctx context.Context, tx table.TransactionActor) error { - _, err := tx.Execute(ctx, insertUniqueID, &table.QueryParameters{}) + }, func(ctx context.Context, tx TxActorWithOptions) error { + _, err := tx.tx.Execute( + ctx, + common.RewriteQuery(insertUniqueID, tx.opts.tablePathPrefix), + &table.QueryParameters{}, + ) return err }) if err != nil { From 3fe0b7eca3535d1b442e6544a74a7efc8bdf4e26 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 6 Feb 2024 21:57:54 +0800 Subject: [PATCH 2/3] Run a single container for datastore tests Signed-off-by: sashayakovtseva --- internal/testserver/datastore/ydb.go | 45 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/testserver/datastore/ydb.go b/internal/testserver/datastore/ydb.go index b09b774691..9fc98e38f8 100644 --- a/internal/testserver/datastore/ydb.go +++ b/internal/testserver/datastore/ydb.go @@ -16,6 +16,7 @@ import ( ydbDatastore "github.com/authzed/spicedb/internal/datastore/ydb" ydbMigrations "github.com/authzed/spicedb/internal/datastore/ydb/migrations" + "github.com/authzed/spicedb/pkg/secrets" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/migrate" @@ -33,7 +34,6 @@ type ydbTester struct { hostname string port string - dsn string } // RunYDBForTesting returns a RunningEngineForTest for YDB. @@ -41,23 +41,13 @@ func RunYDBForTesting(t testing.TB, bridgeNetworkName string) RunningEngineForTe pool, err := dockertest.NewPool("") require.NoError(t, err) - return ydbTester{ - pool: pool, - bridgeNetworkName: bridgeNetworkName, - } -} - -func (r ydbTester) NewDatabase(t testing.TB) string { - // there's no easy way to create new database in a local YDB, so - // create a new container with default /local database instead. - containerName := fmt.Sprintf("ydb-%s", uuid.New().String()) hostname := "localhost" - if r.bridgeNetworkName != "" { + if bridgeNetworkName != "" { hostname = containerName } - resource, err := r.pool.RunWithOptions(&dockertest.RunOptions{ + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ Name: containerName, Hostname: hostname, Repository: "ghcr.io/ydb-platform/local-ydb", @@ -66,12 +56,14 @@ func (r ydbTester) NewDatabase(t testing.TB) string { "YDB_USE_IN_MEMORY_PDISKS=true", "YDB_FEATURE_FLAGS=enable_not_null_data_columns", }, - NetworkID: r.bridgeNetworkName, + NetworkID: bridgeNetworkName, }) require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, r.pool.Purge(resource)) }) + t.Cleanup(func() { require.NoError(t, pool.Purge(resource)) }) - require.NoError(t, r.pool.Retry(func() error { // await container is ready + // await container is ready. + // since YDB has internal cluster discovery we can't check availability from outside network. + require.NoError(t, pool.Retry(func() error { var buf bytes.Buffer code, err := resource.Exec([]string{ @@ -96,11 +88,28 @@ func (r ydbTester) NewDatabase(t testing.TB) string { })) port := resource.GetPort(fmt.Sprintf("%d/tcp", ydbGRPCPort)) - if r.bridgeNetworkName != "" { + if bridgeNetworkName != "" { port = strconv.FormatInt(ydbGRPCPort, 10) } - dsn := fmt.Sprintf("grpc://%s:%s/%s", hostname, port, ydbDefaultDatabase) + return ydbTester{ + pool: pool, + bridgeNetworkName: bridgeNetworkName, + hostname: hostname, + port: port, + } +} + +func (r ydbTester) NewDatabase(t testing.TB) string { + // there's no easy way to create new database in a local YDB, + // so create a new directory instead. + + uniquePortion, err := secrets.TokenHex(4) + require.NoError(t, err) + + directory := fmt.Sprintf("/%s/%s", ydbDefaultDatabase, uniquePortion) + dsn := fmt.Sprintf("grpc://%s:%s/%s?table_path_prefix=%s", r.hostname, r.port, ydbDefaultDatabase, directory) + return dsn } From 63287c8c5f50af504ccad9bd30e10adac7a3ffb5 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Wed, 7 Feb 2024 19:42:25 +0800 Subject: [PATCH 3/3] Rename RewriteQuery->AddTablePrefix Signed-off-by: sashayakovtseva --- internal/datastore/ydb/common/common.go | 2 +- internal/datastore/ydb/migrations/driver.go | 4 ++-- .../migrations/zz_migration.0001_initial_schema.go | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/datastore/ydb/common/common.go b/internal/datastore/ydb/common/common.go index da25e974d0..1005f7ff84 100644 --- a/internal/datastore/ydb/common/common.go +++ b/internal/datastore/ydb/common/common.go @@ -36,7 +36,7 @@ var bytesPool = sync.Pool{ }, } -func RewriteQuery(query string, tablePathPrefix string) string { +func AddTablePrefix(query string, tablePathPrefix string) string { if tablePathPrefix == "" { return query } diff --git a/internal/datastore/ydb/migrations/driver.go b/internal/datastore/ydb/migrations/driver.go index 6e6ec1c6f0..9310700ca0 100644 --- a/internal/datastore/ydb/migrations/driver.go +++ b/internal/datastore/ydb/migrations/driver.go @@ -83,7 +83,7 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { _, res, err := s.Execute( ctx, table.DefaultTxControl(), - common.RewriteQuery(queryLoadVersion, d.tablePathPrefix), + common.AddTablePrefix(queryLoadVersion, d.tablePathPrefix), nil, ) if err != nil { @@ -114,7 +114,7 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error { res, err := tx.tx.Execute( ctx, - common.RewriteQuery(queryWriteVersion, tx.opts.tablePathPrefix), + common.AddTablePrefix(queryWriteVersion, tx.opts.tablePathPrefix), table.NewQueryParameters( table.ValueParam("$newVersion", types.TextValue(version)), table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())), diff --git a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go index c5aa463854..df0e404a3d 100644 --- a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go +++ b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go @@ -83,11 +83,11 @@ func init() { err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error { return client.client.Do(ctx, func(ctx context.Context, s table.Session) error { statements := []string{ - common.RewriteQuery(createSchemaVersion, client.opts.tablePathPrefix), - common.RewriteQuery(createUniqueIDTable, client.opts.tablePathPrefix), - common.RewriteQuery(createNamespaceConfig, client.opts.tablePathPrefix), - common.RewriteQuery(createCaveat, client.opts.tablePathPrefix), - common.RewriteQuery(createRelationTuple, client.opts.tablePathPrefix), + common.AddTablePrefix(createSchemaVersion, client.opts.tablePathPrefix), + common.AddTablePrefix(createUniqueIDTable, client.opts.tablePathPrefix), + common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix), + common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix), + common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix), } for _, stmt := range statements { if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil { @@ -100,7 +100,7 @@ func init() { }, func(ctx context.Context, tx TxActorWithOptions) error { _, err := tx.tx.Execute( ctx, - common.RewriteQuery(insertUniqueID, tx.opts.tablePathPrefix), + common.AddTablePrefix(insertUniqueID, tx.opts.tablePathPrefix), &table.QueryParameters{}, ) return err