Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support table path prefix for YDB #2

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions internal/datastore/ydb/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package common

import (
"bytes"
"net/url"
"sync"
)

type DSN struct {
OriginalDSN string
TablePathPrefix string
}

// ParseDSN is used to extract custom parameters from YDB DSN that are used to alter datastore behaviour.
// The following parameters are recognized:
//
// - table_path_prefix: string. Will be added to all queries.
func ParseDSN(dsn string) DSN {
parsedDSN := DSN{OriginalDSN: dsn}

uri, err := url.Parse(dsn)
if err != nil {
// don't care about the error since ydb.Open will parse dsn again.
return parsedDSN
}

params := uri.Query()
parsedDSN.TablePathPrefix = params.Get("table_path_prefix")
sashayakovtseva marked this conversation as resolved.
Show resolved Hide resolved

return parsedDSN
}

var bytesPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}

func AddTablePrefix(query string, tablePathPrefix string) string {
if tablePathPrefix == "" {
return query
}

buffer := bytesPool.Get().(*bytes.Buffer)
defer func() {
buffer.Reset()
bytesPool.Put(buffer)
}()

buffer.WriteString("PRAGMA TablePathPrefix(\"")
buffer.WriteString(tablePathPrefix)
buffer.WriteString("\");\n\n")
buffer.WriteString(query)

return buffer.String()

}
46 changes: 37 additions & 9 deletions internal/datastore/ydb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"

"github.com/authzed/spicedb/internal/datastore/ydb/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/migrate"
"github.com/authzed/spicedb/pkg/truetime"
)

var _ migrate.Driver[table.Client, table.TransactionActor] = (*YDBDriver)(nil)
var _ migrate.Driver[TableClientWithOptions, TxActorWithOptions] = (*YDBDriver)(nil)

const (
queryLoadVersion = `
Expand All @@ -40,13 +41,20 @@ INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersio
`
)

type options struct {
tablePathPrefix string
}

// YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore.
type YDBDriver struct {
db *ydb.Driver
options
}

// NewYDBDriver creates a new driver with active connections to the database specified.
func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) {
parsedDSN := common.ParseDSN(dsn)

db, err := ydb.Open(ctx, dsn,
ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents),
ydbOtel.WithTraces(),
Expand All @@ -55,7 +63,12 @@ func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) {
return nil, fmt.Errorf("unable to instantiate YDBDriver: %w", err)
}

return &YDBDriver{db}, nil
return &YDBDriver{
db: db,
options: options{
tablePathPrefix: parsedDSN.TablePathPrefix,
},
}, nil
}

// Version returns the current version of the schema in the backing datastore.
Expand All @@ -67,7 +80,12 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
)

err := d.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, res, err := s.Execute(ctx, table.DefaultTxControl(), queryLoadVersion, nil)
_, res, err := s.Execute(
ctx,
table.DefaultTxControl(),
common.AddTablePrefix(queryLoadVersion, d.tablePathPrefix),
nil,
)
if err != nil {
return err
}
Expand All @@ -93,8 +111,10 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
return loaded, nil
}

func (d *YDBDriver) WriteVersion(ctx context.Context, tx table.TransactionActor, version, replaced string) error {
res, err := tx.Execute(ctx, queryWriteVersion,
func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error {
res, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(queryWriteVersion, tx.opts.tablePathPrefix),
table.NewQueryParameters(
table.ValueParam("$newVersion", types.TextValue(version)),
table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())),
Expand All @@ -109,12 +129,20 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx table.TransactionActor,
}

// Conn returns the underlying table client instance for this driver.
func (d *YDBDriver) Conn() table.Client {
return d.db.Table()
func (d *YDBDriver) Conn() TableClientWithOptions {
return TableClientWithOptions{
client: d.db.Table(),
opts: d.options,
}
}

func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[table.TransactionActor]) error {
return d.db.Table().DoTx(ctx, table.TxOperation(f))
func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithOptions]) error {
return d.db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error {
return f(ctx, TxActorWithOptions{
tx: tx,
opts: d.options,
})
})
}

// Close disposes the driver.
Expand Down
12 changes: 11 additions & 1 deletion internal/datastore/ydb/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,14 @@ import (
)

// YDBMigrations implements a migration manager for the YDBDriver.
var YDBMigrations = migrate.NewManager[*YDBDriver, table.Client, table.TransactionActor]()
var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithOptions, TxActorWithOptions]()

type TableClientWithOptions struct {
client table.Client
opts options
}

type TxActorWithOptions struct {
tx table.TransactionActor
opts options
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/ydb-platform/ydb-go-sdk/v3/table"

"github.com/authzed/spicedb/internal/datastore/ydb/common"
)

// YDB doesn't support unique secondary indexes, so one should manually check row uniqueness before insert.
Expand Down Expand Up @@ -78,14 +80,14 @@ CREATE TABLE relation_tuple (
)

func init() {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client table.Client) error {
return client.Do(ctx, func(ctx context.Context, s table.Session) error {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error {
return client.client.Do(ctx, func(ctx context.Context, s table.Session) error {
statements := []string{
createSchemaVersion,
createUniqueIDTable,
createNamespaceConfig,
createCaveat,
createRelationTuple,
common.AddTablePrefix(createSchemaVersion, client.opts.tablePathPrefix),
common.AddTablePrefix(createUniqueIDTable, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix),
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
Expand All @@ -95,8 +97,12 @@ func init() {

return nil
})
}, func(ctx context.Context, tx table.TransactionActor) error {
_, err := tx.Execute(ctx, insertUniqueID, &table.QueryParameters{})
}, func(ctx context.Context, tx TxActorWithOptions) error {
_, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(insertUniqueID, tx.opts.tablePathPrefix),
&table.QueryParameters{},
)
return err
})
if err != nil {
Expand Down
45 changes: 27 additions & 18 deletions internal/testserver/datastore/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

ydbDatastore "github.com/authzed/spicedb/internal/datastore/ydb"
ydbMigrations "github.com/authzed/spicedb/internal/datastore/ydb/migrations"
"github.com/authzed/spicedb/pkg/secrets"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/migrate"
Expand All @@ -33,31 +34,20 @@ type ydbTester struct {

hostname string
port string
dsn string
}

// RunYDBForTesting returns a RunningEngineForTest for YDB.
func RunYDBForTesting(t testing.TB, bridgeNetworkName string) RunningEngineForTest {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

return ydbTester{
pool: pool,
bridgeNetworkName: bridgeNetworkName,
}
}

func (r ydbTester) NewDatabase(t testing.TB) string {
// there's no easy way to create new database in a local YDB, so
// create a new container with default /local database instead.

containerName := fmt.Sprintf("ydb-%s", uuid.New().String())
hostname := "localhost"
if r.bridgeNetworkName != "" {
if bridgeNetworkName != "" {
hostname = containerName
}

resource, err := r.pool.RunWithOptions(&dockertest.RunOptions{
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Name: containerName,
Hostname: hostname,
Repository: "ghcr.io/ydb-platform/local-ydb",
Expand All @@ -66,12 +56,14 @@ func (r ydbTester) NewDatabase(t testing.TB) string {
"YDB_USE_IN_MEMORY_PDISKS=true",
"YDB_FEATURE_FLAGS=enable_not_null_data_columns",
},
NetworkID: r.bridgeNetworkName,
NetworkID: bridgeNetworkName,
})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, r.pool.Purge(resource)) })
t.Cleanup(func() { require.NoError(t, pool.Purge(resource)) })

require.NoError(t, r.pool.Retry(func() error { // await container is ready
// await container is ready.
// since YDB has internal cluster discovery we can't check availability from outside network.
require.NoError(t, pool.Retry(func() error {
var buf bytes.Buffer

code, err := resource.Exec([]string{
Expand All @@ -96,11 +88,28 @@ func (r ydbTester) NewDatabase(t testing.TB) string {
}))

port := resource.GetPort(fmt.Sprintf("%d/tcp", ydbGRPCPort))
if r.bridgeNetworkName != "" {
if bridgeNetworkName != "" {
port = strconv.FormatInt(ydbGRPCPort, 10)
}

dsn := fmt.Sprintf("grpc://%s:%s/%s", hostname, port, ydbDefaultDatabase)
return ydbTester{
pool: pool,
bridgeNetworkName: bridgeNetworkName,
hostname: hostname,
port: port,
}
}

func (r ydbTester) NewDatabase(t testing.TB) string {
// there's no easy way to create new database in a local YDB,
// so create a new directory instead.

uniquePortion, err := secrets.TokenHex(4)
require.NoError(t, err)

directory := fmt.Sprintf("/%s/%s", ydbDefaultDatabase, uniquePortion)
dsn := fmt.Sprintf("grpc://%s:%s/%s?table_path_prefix=%s", r.hostname, r.port, ydbDefaultDatabase, directory)

return dsn
}

Expand Down
Loading