Skip to content

Commit

Permalink
Add first migration
Browse files Browse the repository at this point in the history
Signed-off-by: sashayakovtseva <[email protected]>
  • Loading branch information
sashayakovtseva committed Jan 22, 2024
1 parent 6372eb0 commit 60af68c
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ jobs:
strategy:
fail-fast: false
matrix:
datastore: ["crdb", "mysql", "postgres", "spanner", "pgbouncer"]
datastore: ["crdb", "mysql", "postgres", "spanner", "pgbouncer", "ydb"]
steps:
- uses: "actions/checkout@v3"
- uses: "authzed/actions/setup-go@main"
Expand Down
69 changes: 52 additions & 17 deletions internal/datastore/ydb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package migrations

import (
"context"
"errors"
"fmt"
"io"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
ydbOtel "github.com/ydb-platform/ydb-go-sdk-otel"
ydbZerolog "github.com/ydb-platform/ydb-go-sdk-zerolog"
"github.com/ydb-platform/ydb-go-sdk/v3"
Expand All @@ -13,15 +16,28 @@ import (

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/migrate"
"github.com/authzed/spicedb/pkg/truetime"
)

var _ migrate.Driver[table.Client, table.TransactionActor] = &YDBDriver{}
var _ migrate.Driver[table.Client, table.TransactionActor] = (*YDBDriver)(nil)

const (
errUnableToInstantiate = "unable to instantiate YDBDriver: %w"

queryLoadVersion = "SELECT version_num from schema_version"
queryWriteVersion = "UPDATE schema_version SET version_num=$newVersion WHERE version_num=$oldVersion"
queryLoadVersion = `
SELECT
version_num,
created_at_unix_nano
FROM
schema_version
ORDER BY
created_at_unix_nano DESC
LIMIT 1
`

queryWriteVersion = `
DECLARE $newVersion as Utf8;
DECLARE $createAtUnixNano as Int64;
INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersion, $createAtUnixNano);
`
)

// YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore.
Expand All @@ -30,23 +46,27 @@ type YDBDriver struct {
}

// NewYDBDriver creates a new driver with active connections to the database specified.
func NewYDBDriver(ctx context.Context, url string) (*YDBDriver, error) {
db, err := ydb.Open(ctx, url,
ydbZerolog.WithTraces(&log.Logger, trace.DetailsAll),
func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) {
db, err := ydb.Open(ctx, dsn,
ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents),
ydbOtel.WithTraces(),
)
if err != nil {
return nil, fmt.Errorf(errUnableToInstantiate, err)
return nil, fmt.Errorf("unable to instantiate YDBDriver: %w", err)
}

return &YDBDriver{db}, nil
}

// Version returns the version of the schema to which the connected database has been migrated.
// Version returns the current version of the schema in the backing datastore.
// If the datastore is brand new, version returns the empty string without an error.
func (d *YDBDriver) Version(ctx context.Context) (string, error) {
var loaded string
var (
loaded string
createdAtUnixNano *int64 // don't really need this
)

if err := d.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
err := d.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
_, res, err := s.Execute(ctx, table.DefaultTxControl(), queryLoadVersion, nil)
if err != nil {
return err
Expand All @@ -56,14 +76,17 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
if err := res.NextResultSetErr(ctx); err != nil {
return err
}

for res.NextRow() {
if err := res.Scan(&loaded); err != nil {
if err := res.Scan(&loaded, &createdAtUnixNano); err != nil {
return err
}
}
return res.Err()
}); err != nil {
})
if err != nil {
if isMissingTableError(err) || errors.Is(err, io.EOF) {
return "", nil
}
return "", fmt.Errorf("unable to load alembic revision: %w", err)
}

Expand All @@ -74,11 +97,11 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx table.TransactionActor,
res, err := tx.Execute(ctx, queryWriteVersion,
table.NewQueryParameters(
table.ValueParam("$newVersion", types.TextValue(version)),
table.ValueParam("$oldVersion", types.TextValue(replaced)),
table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())),
),
)
if err != nil {
return fmt.Errorf("unable to update version row: %w", err)
return fmt.Errorf("unable to insert new version row: %w", err)
}
defer res.Close()

Expand All @@ -98,3 +121,15 @@ func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[table.T
func (d *YDBDriver) Close(ctx context.Context) error {
return d.db.Close(ctx)
}

func isMissingTableError(err error) bool {
const ydbMissingTableIssueCode = 2003

var b bool
ydb.IterateByIssues(err, func(_ string, code Ydb.StatusIds_StatusCode, _ uint32) {
if code == ydbMissingTableIssueCode {
b = true
}
})
return b
}
5 changes: 0 additions & 5 deletions internal/datastore/ydb/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,5 @@ import (
"github.com/authzed/spicedb/pkg/migrate"
)

var (
noNonAtomicMigration migrate.MigrationFunc[table.Client]
noAtomicMigration migrate.TxMigrationFunc[table.TransactionActor]
)

// YDBMigrations implements a migration manager for the YDBDriver.
var YDBMigrations = migrate.NewManager[*YDBDriver, table.Client, table.TransactionActor]()
118 changes: 84 additions & 34 deletions internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,103 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/table"
)

// YDB doesn't support unique secondary indexes, so one should manually check row uniqueness before insert.
// Suggested way to do so is to use
//
// DISCARD SELECT Ensure(0, false, "duplicate") FROM table VIEW index WHERE id=$Id
//
// For convenience, secondary indexes for uniqueness check start with `uq_` prefix.
// YDB also doesn't support partial secondary indexes.
// Table's PK columns are always implicitly saved in secondary index as well.
const (
// todo add NOT NULL created_at_unix_nano.
createSchemaVersion = `
CREATE TABLE schema_version (
version_num Utf8 NOT NULL,
created_at_unix_nano Int64,
PRIMARY KEY (version_num)
);`

createUniqueIDTable = `
CREATE TABLE metadata (
unique_id String NOT NULL,
PRIMARY KEY (unique_id)
);`

// todo add NOT NULL serialized_config.
// todo check namespace name with Ensure in insert.
// todo AUTO_PARTITIONING_BY_LOAD?
createNamespaceConfig = `
CREATE TABLE namespace_config (
namespace VARCHAR PRIMARY KEY,
serialized_config BYTEA NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE DEFAULT now() NOT NULL
namespace Utf8 NOT NULL,
serialized_config String,
created_at_unix_nano Int64 NOT NULL,
deleted_at_unix_nano Int64,
PRIMARY KEY (namespace, created_at_unix_nano, deleted_at_unix_nano),
INDEX uq_namespace_living GLOBAL SYNC ON (namespace, deleted_at_unix_nano)
);`

createRelationTuple = `CREATE TABLE relation_tuple (
namespace VARCHAR NOT NULL,
object_id VARCHAR NOT NULL,
relation VARCHAR NOT NULL,
userset_namespace VARCHAR NOT NULL,
userset_object_id VARCHAR NOT NULL,
userset_relation VARCHAR NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE DEFAULT now() NOT NULL,
CONSTRAINT pk_relation_tuple PRIMARY KEY (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation)
// todo add NOT NULL definition.
// todo AUTO_PARTITIONING_BY_LOAD?
createCaveat = `
CREATE TABLE caveat (
name Utf8 NOT NULL,
definition String,
created_at_unix_nano Int64 NOT NULL,
deleted_at_unix_nano Int64,
PRIMARY KEY (name, created_at_unix_nano, deleted_at_unix_nano),
INDEX uq_caveat_living GLOBAL SYNC ON (name, deleted_at_unix_nano)
);`

createSchemaVersion = `CREATE TABLE schema_version (
version_num VARCHAR NOT NULL
// todo discuss JsonDocument instead of Json.
// todo check Ensure on insert.
// todo AUTO_PARTITIONING_BY_LOAD?
createRelationTuple = `
CREATE TABLE relation_tuple (
namespace Utf8 NOT NULL,
object_id Utf8 NOT NULL,
relation Utf8 NOT NULL,
userset_namespace Utf8 NOT NULL,
userset_object_id Utf8 NOT NULL,
userset_relation Utf8 NOT NULL,
caveat_name Utf8,
caveat_context JsonDocument,
created_at_unix_nano Int64 NOT NULL,
deleted_at_unix_nano Int64,
PRIMARY KEY (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation, created_at_unix_nano, deleted_at_unix_nano),
INDEX uq_relation_tuple_living GLOBAL SYNC ON (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation, deleted_at_unix_nano),
INDEX ix_relation_tuple_by_subject GLOBAL SYNC ON (userset_object_id, userset_namespace, userset_relation, namespace, relation),
INDEX ix_relation_tuple_by_subject_relation GLOBAL SYNC ON (userset_namespace, userset_relation, namespace, relation),
INDEX ix_relation_tuple_alive_by_resource_rel_subject_covering GLOBAL SYNC ON (namespace, relation, userset_namespace) COVER (caveat_name, caveat_context),
INDEX ix_gc_index GLOBAL SYNC ON (deleted_at_unix_nano)
);`

insertEmptyVersion = `INSERT INTO schema_version (version_num) VALUES ('');`

createReverseQueryIndex = `CREATE INDEX ix_relation_tuple_by_subject ON relation_tuple (userset_object_id, userset_namespace, userset_relation, namespace, relation)`
createReverseCheckIndex = `CREATE INDEX ix_relation_tuple_by_subject_relation ON relation_tuple (userset_namespace, userset_relation, namespace, relation)`
insertUniqueID = `INSERT INTO metadata (unique_id) VALUES (CAST(RandomUuid(1) as String));`
)

func init() {
if err := YDBMigrations.Register("initial", "", noNonAtomicMigration, func(ctx context.Context, tx table.TransactionActor) error {
statements := []string{
createNamespaceConfig,
// createRelationTuple,
// createSchemaVersion,
// insertEmptyVersion,
// createReverseQueryIndex,
// createReverseCheckIndex,
}
for _, stmt := range statements {
_, err := tx.Execute(ctx, stmt, nil)
if err != nil {
return err
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client table.Client) error {
return client.Do(ctx, func(ctx context.Context, s table.Session) error {
statements := []string{
createSchemaVersion,
createUniqueIDTable,
createNamespaceConfig,
createCaveat,
createRelationTuple,
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
return err
}
}
}
return nil
}); err != nil {

return nil
})
}, func(ctx context.Context, tx table.TransactionActor) error {
_, err := tx.Execute(ctx, insertUniqueID, &table.QueryParameters{})
return err
})
if err != nil {
panic("failed to register migration: " + err.Error())
}
}
2 changes: 1 addition & 1 deletion internal/testserver/datastore/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

const (
ydbTestVersionTag = "23.3"
ydbTestVersionTag = "23.3.17"
ydbDefaultDatabase = "local"
ydbGRPCPort = 2136
)
Expand Down
10 changes: 10 additions & 0 deletions magefiles/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (Testds) Mysql() error {
return datastoreTest("mysql")
}

// YDB Run datastore tests for YDB.
func (Testds) YDB() error {
return datastoreTest("ydb")
}

func datastoreTest(datastore string, tags ...string) error {
mergedTags := append([]string{"ci", "docker"}, tags...)
tagString := strings.Join(mergedTags, ",")
Expand Down Expand Up @@ -122,6 +127,11 @@ func (Testcons) Mysql() error {
return consistencyTest("mysql")
}

// YDB Runs consistency tests for YDB.
func (Testcons) YDB() error {
return consistencyTest("ydb")
}

func consistencyTest(datastore string) error {
mg.Deps(checkDocker)
return goTest("./internal/services/integrationtesting/...",
Expand Down
7 changes: 7 additions & 0 deletions pkg/truetime/truetime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package truetime

import "time"

func UnixNano() int64 {
return time.Now().UnixNano()
}

0 comments on commit 60af68c

Please sign in to comment.