Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add certificate path option for YDB #13

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions internal/datastore/ydb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/authzed/spicedb/pkg/truetime"
)

var _ migrate.Driver[TableClientWithOptions, TxActorWithOptions] = (*YDBDriver)(nil)
var _ migrate.Driver[TableClientWithConfig, TxActorWithConfig] = (*YDBDriver)(nil)

const (
queryLoadVersion = `
Expand All @@ -41,35 +41,41 @@ INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersio
`
)

type options struct {
tablePathPrefix string
}

// YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore.
type YDBDriver struct {
db *ydb.Driver
options
db *ydb.Driver
config *ydbConfig
}

// NewYDBDriver creates a new driver with active connections to the database specified.
func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) {
func NewYDBDriver(ctx context.Context, dsn string, opts ...Option) (*YDBDriver, error) {
parsedDSN := common.ParseDSN(dsn)

db, err := ydb.Open(
ctx,
parsedDSN.OriginalDSN,
config := generateConfig(opts)
if parsedDSN.TablePathPrefix != "" {
config.tablePathPrefix = parsedDSN.TablePathPrefix
}

ydbOpts := []ydb.Option{
ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents),
ydbOtel.WithTraces(),
)
}
if config.certificatePath != "" {
ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath))
}

db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...)
if err != nil {
return nil, fmt.Errorf("failed to instantiate YDBDriver: %w", err)
}

if _, err := db.Scheme().ListDirectory(ctx, db.Name()); err != nil {
return nil, fmt.Errorf("failed to ping YDB: %w", err)
}

return &YDBDriver{
db: db,
options: options{
tablePathPrefix: parsedDSN.TablePathPrefix,
},
db: db,
config: config,
}, nil
}

Expand All @@ -85,7 +91,7 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
_, res, err := s.Execute(
ctx,
table.DefaultTxControl(),
common.AddTablePrefix(queryLoadVersion, d.tablePathPrefix),
common.AddTablePrefix(queryLoadVersion, d.config.tablePathPrefix),
nil,
)
if err != nil {
Expand Down Expand Up @@ -113,10 +119,10 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
return loaded, nil
}

func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error {
func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithConfig, version, _ string) error {
res, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(queryWriteVersion, tx.opts.tablePathPrefix),
common.AddTablePrefix(queryWriteVersion, tx.config.tablePathPrefix),
table.NewQueryParameters(
table.ValueParam("$newVersion", types.TextValue(version)),
table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())),
Expand All @@ -131,18 +137,18 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, ver
}

// Conn returns the underlying table client instance for this driver.
func (d *YDBDriver) Conn() TableClientWithOptions {
return TableClientWithOptions{
func (d *YDBDriver) Conn() TableClientWithConfig {
return TableClientWithConfig{
client: d.db.Table(),
opts: d.options,
config: d.config,
}
}

func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithOptions]) error {
func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithConfig]) error {
return d.db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error {
return f(ctx, TxActorWithOptions{
tx: tx,
opts: d.options,
return f(ctx, TxActorWithConfig{
tx: tx,
config: d.config,
})
})
}
Expand Down
12 changes: 6 additions & 6 deletions internal/datastore/ydb/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
)

// YDBMigrations implements a migration manager for the YDBDriver.
var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithOptions, TxActorWithOptions]()
var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithConfig, TxActorWithConfig]()

type TableClientWithOptions struct {
type TableClientWithConfig struct {
client table.Client
opts options
config *ydbConfig
}

type TxActorWithOptions struct {
tx table.TransactionActor
opts options
type TxActorWithConfig struct {
tx table.TransactionActor
config *ydbConfig
}
36 changes: 36 additions & 0 deletions internal/datastore/ydb/migrations/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

type ydbConfig struct {
tablePathPrefix string
certificatePath string
}

var defaultConfig = ydbConfig{}

// Option provides the facility to configure how clients within the YDB
// datastore interact with the running YDB database.
type Option func(*ydbConfig)

func generateConfig(options []Option) *ydbConfig {
computed := defaultConfig
for _, option := range options {
option(&computed)
}

return &computed
}

// WithTablePathPrefix sets table prefix that will be implicitly added to all YDB queries.
// See https://ydb.tech/docs/en/yql/reference/syntax/pragma#table-path-prefix for details.
//
// Default is empty.
// Non-empty DSN parameter takes precedence over this option.
func WithTablePathPrefix(prefix string) Option {
return func(o *ydbConfig) { o.tablePathPrefix = prefix }
}

// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol.
// Default is empty.
func WithCertificatePath(path string) Option {
return func(o *ydbConfig) { o.certificatePath = path }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ WITH (
)

func init() {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithConfig) error {
return client.client.Do(ctx, func(ctx context.Context, s table.Session) error {
statements := []string{
common.AddTablePrefix(createSchemaVersion, client.opts.tablePathPrefix),
common.AddTablePrefix(createUniqueIDTable, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createSchemaVersion, client.config.tablePathPrefix),
common.AddTablePrefix(createUniqueIDTable, client.config.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfig, client.config.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.config.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.config.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.config.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.config.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.config.tablePathPrefix),
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
Expand All @@ -154,10 +154,10 @@ func init() {

return nil
})
}, func(ctx context.Context, tx TxActorWithOptions) error {
}, func(ctx context.Context, tx TxActorWithConfig) error {
_, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(insertUniqueID, tx.opts.tablePathPrefix),
common.AddTablePrefix(insertUniqueID, tx.config.tablePathPrefix),
&table.QueryParameters{},
)
return err
Expand Down
10 changes: 7 additions & 3 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type ydbConfig struct {
tablePathPrefix string
certificatePath string

watchBufferLength uint16
watchBufferWriteTimeout time.Duration
Expand All @@ -25,18 +26,15 @@ type ydbConfig struct {
}

var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 128,
watchBufferWriteTimeout: time.Second,
followerReadDelay: 0 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
gcEnabled: true,
enablePrometheusStats: false,
}

// Option provides the facility to configure how clients within the YDB
Expand All @@ -61,6 +59,12 @@ func WithTablePathPrefix(prefix string) Option {
return func(o *ydbConfig) { o.tablePathPrefix = prefix }
}

// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol.
// Default is empty.
func WithCertificatePath(path string) Option {
return func(o *ydbConfig) { o.certificatePath = path }
}

// GCWindow is the maximum age of a passed revision that will be considered
// valid.
//
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/ydb/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas
if config.enablePrometheusStats {
ydbOpts = append(ydbOpts, ydbPrometheus.WithTraces(prometheus.DefaultRegisterer))
}
if config.certificatePath != "" {
ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath))
}

db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ type Config struct {
SpannerMaxSessions uint64 `debugmap:"visible"`

// YDB
YDBBulkLoadBatchSize int `debugmap:"visible"`
YDBCertificatePath string `debugmap:"visible"`
YDBBulkLoadBatchSize int `debugmap:"visible"`

// Internal
WatchBufferLength uint16 `debugmap:"visible"`
Expand Down Expand Up @@ -214,11 +215,12 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.StringVar(&opts.SpannerEmulatorHost, flagName("datastore-spanner-emulator-host"), "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)")
flagSet.Uint64Var(&opts.SpannerMinSessions, flagName("datastore-spanner-min-sessions"), 100, "minimum number of sessions across all Spanner gRPC connections the client can have at a given time")
flagSet.Uint64Var(&opts.SpannerMaxSessions, flagName("datastore-spanner-max-sessions"), 400, "maximum number of sessions across all Spanner gRPC connections the client can have at a given time")
flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
flagSet.StringVar(&opts.TablePrefix, flagName("datastore-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in (postgres and spanner driver only)")
flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), defaults.WatchBufferLength, "how large the watch buffer should be before blocking")
flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader")
flagSet.IntVar(&opts.YDBBulkLoadBatchSize, flagName("datastore-bulk-load-size"), defaults.YDBBulkLoadBatchSize, "number of rows BulkLoad will process in a single batch (ydb driver only)")
flagSet.StringVar(&opts.YDBCertificatePath, flagName("datastore-certificate-path"), defaults.YDBCertificatePath, "filepath to a valid certificate used to connect to a datastore (ydb driver only)")

// disabling stats is only for tests
flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table")
Expand Down Expand Up @@ -439,6 +441,7 @@ func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, e
ydb.WatchBufferWriteTimeout(config.WatchBufferWriteTimeout),
ydb.BulkLoadBatchSize(config.YDBBulkLoadBatchSize),
ydb.WithEnablePrometheusStats(config.EnableDatastoreMetrics),
ydb.WithCertificatePath(config.YDBCertificatePath),
}
return ydb.NewYDBDatastore(ctx, config.URI, opts...)
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func RegisterMigrateFlags(cmd *cobra.Command) {
cmd.Flags().String("datastore-conn-uri", "", `connection string used by remote datastores (e.g. "postgres://postgres:password@localhost:5432/spicedb")`)
cmd.Flags().String("datastore-spanner-credentials", "", "path to service account key credentials file with access to the cloud spanner instance (omit to use application default credentials)")
cmd.Flags().String("datastore-spanner-emulator-host", "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)")
cmd.Flags().String("datastore-mysql-table-prefix", "", "prefix to add to the name of all mysql database tables")
cmd.Flags().String("datastore-table-prefix", "", "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
cmd.Flags().String("datastore-certificate-path", "", "filepath to a valid certificate used to connect to a datastore (ydb driver only)")
cmd.Flags().Uint64("migration-backfill-batch-size", 1000, "number of items to migrate per iteration of a datastore backfill")
cmd.Flags().Duration("migration-timeout", 1*time.Hour, "defines a timeout for the execution of the migration, set to 1 hour by default")
}
Expand All @@ -48,6 +49,8 @@ func migrateRun(cmd *cobra.Command, args []string) error {
dbURL := cobrautil.MustGetStringExpanded(cmd, "datastore-conn-uri")
timeout := cobrautil.MustGetDuration(cmd, "migration-timeout")
migrationBatachSize := cobrautil.MustGetUint64(cmd, "migration-backfill-batch-size")
tablePrefix := cobrautil.MustGetString(cmd, "datastore-table-prefix")
certificatePath := cobrautil.MustGetString(cmd, "datastore-certificate-path")

if datastoreEngine == "cockroachdb" {
log.Ctx(cmd.Context()).Info().Msg("migrating cockroachdb datastore")
Expand Down Expand Up @@ -84,12 +87,6 @@ func migrateRun(cmd *cobra.Command, args []string) error {
} else if datastoreEngine == "mysql" {
log.Ctx(cmd.Context()).Info().Msg("migrating mysql datastore")

var err error
tablePrefix, err := cmd.Flags().GetString("datastore-mysql-table-prefix")
if err != nil {
log.Ctx(cmd.Context()).Fatal().Msg(fmt.Sprintf("unable to get table prefix: %s", err))
}

migrationDriver, err := mysqlmigrations.NewMySQLDriverFromDSN(dbURL, tablePrefix)
if err != nil {
return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err)
Expand All @@ -98,7 +95,12 @@ func migrateRun(cmd *cobra.Command, args []string) error {
} else if datastoreEngine == ydb.Engine {
log.Ctx(cmd.Context()).Info().Msg("migrating ydb datastore")

migrationDriver, err := ydbMigrations.NewYDBDriver(cmd.Context(), dbURL)
migrationDriver, err := ydbMigrations.NewYDBDriver(
cmd.Context(),
dbURL,
ydbMigrations.WithTablePathPrefix(tablePrefix),
ydbMigrations.WithCertificatePath(certificatePath),
)
if err != nil {
return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err)
}
Expand Down
Loading