Skip to content

Commit

Permalink
Add certificate path option for YDB (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashayakovtseva authored Mar 7, 2024
1 parent 04604b9 commit a1e48fc
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 56 deletions.
58 changes: 32 additions & 26 deletions internal/datastore/ydb/migrations/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/authzed/spicedb/pkg/truetime"
)

var _ migrate.Driver[TableClientWithOptions, TxActorWithOptions] = (*YDBDriver)(nil)
var _ migrate.Driver[TableClientWithConfig, TxActorWithConfig] = (*YDBDriver)(nil)

const (
queryLoadVersion = `
Expand All @@ -41,35 +41,41 @@ INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersio
`
)

type options struct {
tablePathPrefix string
}

// YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore.
type YDBDriver struct {
db *ydb.Driver
options
db *ydb.Driver
config *ydbConfig
}

// NewYDBDriver creates a new driver with active connections to the database specified.
func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) {
func NewYDBDriver(ctx context.Context, dsn string, opts ...Option) (*YDBDriver, error) {
parsedDSN := common.ParseDSN(dsn)

db, err := ydb.Open(
ctx,
parsedDSN.OriginalDSN,
config := generateConfig(opts)
if parsedDSN.TablePathPrefix != "" {
config.tablePathPrefix = parsedDSN.TablePathPrefix
}

ydbOpts := []ydb.Option{
ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents),
ydbOtel.WithTraces(),
)
}
if config.certificatePath != "" {
ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath))
}

db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...)
if err != nil {
return nil, fmt.Errorf("failed to instantiate YDBDriver: %w", err)
}

if _, err := db.Scheme().ListDirectory(ctx, db.Name()); err != nil {
return nil, fmt.Errorf("failed to ping YDB: %w", err)
}

return &YDBDriver{
db: db,
options: options{
tablePathPrefix: parsedDSN.TablePathPrefix,
},
db: db,
config: config,
}, nil
}

Expand All @@ -85,7 +91,7 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
_, res, err := s.Execute(
ctx,
table.DefaultTxControl(),
common.AddTablePrefix(queryLoadVersion, d.tablePathPrefix),
common.AddTablePrefix(queryLoadVersion, d.config.tablePathPrefix),
nil,
)
if err != nil {
Expand Down Expand Up @@ -113,10 +119,10 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) {
return loaded, nil
}

func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error {
func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithConfig, version, _ string) error {
res, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(queryWriteVersion, tx.opts.tablePathPrefix),
common.AddTablePrefix(queryWriteVersion, tx.config.tablePathPrefix),
table.NewQueryParameters(
table.ValueParam("$newVersion", types.TextValue(version)),
table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())),
Expand All @@ -131,18 +137,18 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, ver
}

// Conn returns the underlying table client instance for this driver.
func (d *YDBDriver) Conn() TableClientWithOptions {
return TableClientWithOptions{
func (d *YDBDriver) Conn() TableClientWithConfig {
return TableClientWithConfig{
client: d.db.Table(),
opts: d.options,
config: d.config,
}
}

func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithOptions]) error {
func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithConfig]) error {
return d.db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error {
return f(ctx, TxActorWithOptions{
tx: tx,
opts: d.options,
return f(ctx, TxActorWithConfig{
tx: tx,
config: d.config,
})
})
}
Expand Down
12 changes: 6 additions & 6 deletions internal/datastore/ydb/migrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
)

// YDBMigrations implements a migration manager for the YDBDriver.
var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithOptions, TxActorWithOptions]()
var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithConfig, TxActorWithConfig]()

type TableClientWithOptions struct {
type TableClientWithConfig struct {
client table.Client
opts options
config *ydbConfig
}

type TxActorWithOptions struct {
tx table.TransactionActor
opts options
type TxActorWithConfig struct {
tx table.TransactionActor
config *ydbConfig
}
36 changes: 36 additions & 0 deletions internal/datastore/ydb/migrations/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

type ydbConfig struct {
tablePathPrefix string
certificatePath string
}

var defaultConfig = ydbConfig{}

// Option provides the facility to configure how clients within the YDB
// datastore interact with the running YDB database.
type Option func(*ydbConfig)

func generateConfig(options []Option) *ydbConfig {
computed := defaultConfig
for _, option := range options {
option(&computed)
}

return &computed
}

// WithTablePathPrefix sets table prefix that will be implicitly added to all YDB queries.
// See https://ydb.tech/docs/en/yql/reference/syntax/pragma#table-path-prefix for details.
//
// Default is empty.
// Non-empty DSN parameter takes precedence over this option.
func WithTablePathPrefix(prefix string) Option {
return func(o *ydbConfig) { o.tablePathPrefix = prefix }
}

// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol.
// Default is empty.
func WithCertificatePath(path string) Option {
return func(o *ydbConfig) { o.certificatePath = path }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ WITH (
)

func init() {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error {
err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithConfig) error {
return client.client.Do(ctx, func(ctx context.Context, s table.Session) error {
statements := []string{
common.AddTablePrefix(createSchemaVersion, client.opts.tablePathPrefix),
common.AddTablePrefix(createUniqueIDTable, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createSchemaVersion, client.config.tablePathPrefix),
common.AddTablePrefix(createUniqueIDTable, client.config.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfig, client.config.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.config.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.config.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.config.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.config.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.config.tablePathPrefix),
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
Expand All @@ -154,10 +154,10 @@ func init() {

return nil
})
}, func(ctx context.Context, tx TxActorWithOptions) error {
}, func(ctx context.Context, tx TxActorWithConfig) error {
_, err := tx.tx.Execute(
ctx,
common.AddTablePrefix(insertUniqueID, tx.opts.tablePathPrefix),
common.AddTablePrefix(insertUniqueID, tx.config.tablePathPrefix),
&table.QueryParameters{},
)
return err
Expand Down
10 changes: 7 additions & 3 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type ydbConfig struct {
tablePathPrefix string
certificatePath string

watchBufferLength uint16
watchBufferWriteTimeout time.Duration
Expand All @@ -25,18 +26,15 @@ type ydbConfig struct {
}

var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 128,
watchBufferWriteTimeout: time.Second,
followerReadDelay: 0 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
gcEnabled: true,
enablePrometheusStats: false,
}

// Option provides the facility to configure how clients within the YDB
Expand All @@ -61,6 +59,12 @@ func WithTablePathPrefix(prefix string) Option {
return func(o *ydbConfig) { o.tablePathPrefix = prefix }
}

// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol.
// Default is empty.
func WithCertificatePath(path string) Option {
return func(o *ydbConfig) { o.certificatePath = path }
}

// GCWindow is the maximum age of a passed revision that will be considered
// valid.
//
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/ydb/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas
if config.enablePrometheusStats {
ydbOpts = append(ydbOpts, ydbPrometheus.WithTraces(prometheus.DefaultRegisterer))
}
if config.certificatePath != "" {
ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath))
}

db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ type Config struct {
SpannerMaxSessions uint64 `debugmap:"visible"`

// YDB
YDBBulkLoadBatchSize int `debugmap:"visible"`
YDBCertificatePath string `debugmap:"visible"`
YDBBulkLoadBatchSize int `debugmap:"visible"`

// Internal
WatchBufferLength uint16 `debugmap:"visible"`
Expand Down Expand Up @@ -214,11 +215,12 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.StringVar(&opts.SpannerEmulatorHost, flagName("datastore-spanner-emulator-host"), "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)")
flagSet.Uint64Var(&opts.SpannerMinSessions, flagName("datastore-spanner-min-sessions"), 100, "minimum number of sessions across all Spanner gRPC connections the client can have at a given time")
flagSet.Uint64Var(&opts.SpannerMaxSessions, flagName("datastore-spanner-max-sessions"), 400, "maximum number of sessions across all Spanner gRPC connections the client can have at a given time")
flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
flagSet.StringVar(&opts.TablePrefix, flagName("datastore-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in (postgres and spanner driver only)")
flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), defaults.WatchBufferLength, "how large the watch buffer should be before blocking")
flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader")
flagSet.IntVar(&opts.YDBBulkLoadBatchSize, flagName("datastore-bulk-load-size"), defaults.YDBBulkLoadBatchSize, "number of rows BulkLoad will process in a single batch (ydb driver only)")
flagSet.StringVar(&opts.YDBCertificatePath, flagName("datastore-certificate-path"), defaults.YDBCertificatePath, "filepath to a valid certificate used to connect to a datastore (ydb driver only)")

// disabling stats is only for tests
flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table")
Expand Down Expand Up @@ -439,6 +441,7 @@ func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, e
ydb.WatchBufferWriteTimeout(config.WatchBufferWriteTimeout),
ydb.BulkLoadBatchSize(config.YDBBulkLoadBatchSize),
ydb.WithEnablePrometheusStats(config.EnableDatastoreMetrics),
ydb.WithCertificatePath(config.YDBCertificatePath),
}
return ydb.NewYDBDatastore(ctx, config.URI, opts...)
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func RegisterMigrateFlags(cmd *cobra.Command) {
cmd.Flags().String("datastore-conn-uri", "", `connection string used by remote datastores (e.g. "postgres://postgres:password@localhost:5432/spicedb")`)
cmd.Flags().String("datastore-spanner-credentials", "", "path to service account key credentials file with access to the cloud spanner instance (omit to use application default credentials)")
cmd.Flags().String("datastore-spanner-emulator-host", "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)")
cmd.Flags().String("datastore-mysql-table-prefix", "", "prefix to add to the name of all mysql database tables")
cmd.Flags().String("datastore-table-prefix", "", "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)")
cmd.Flags().String("datastore-certificate-path", "", "filepath to a valid certificate used to connect to a datastore (ydb driver only)")
cmd.Flags().Uint64("migration-backfill-batch-size", 1000, "number of items to migrate per iteration of a datastore backfill")
cmd.Flags().Duration("migration-timeout", 1*time.Hour, "defines a timeout for the execution of the migration, set to 1 hour by default")
}
Expand All @@ -48,6 +49,8 @@ func migrateRun(cmd *cobra.Command, args []string) error {
dbURL := cobrautil.MustGetStringExpanded(cmd, "datastore-conn-uri")
timeout := cobrautil.MustGetDuration(cmd, "migration-timeout")
migrationBatachSize := cobrautil.MustGetUint64(cmd, "migration-backfill-batch-size")
tablePrefix := cobrautil.MustGetString(cmd, "datastore-table-prefix")
certificatePath := cobrautil.MustGetString(cmd, "datastore-certificate-path")

if datastoreEngine == "cockroachdb" {
log.Ctx(cmd.Context()).Info().Msg("migrating cockroachdb datastore")
Expand Down Expand Up @@ -84,12 +87,6 @@ func migrateRun(cmd *cobra.Command, args []string) error {
} else if datastoreEngine == "mysql" {
log.Ctx(cmd.Context()).Info().Msg("migrating mysql datastore")

var err error
tablePrefix, err := cmd.Flags().GetString("datastore-mysql-table-prefix")
if err != nil {
log.Ctx(cmd.Context()).Fatal().Msg(fmt.Sprintf("unable to get table prefix: %s", err))
}

migrationDriver, err := mysqlmigrations.NewMySQLDriverFromDSN(dbURL, tablePrefix)
if err != nil {
return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err)
Expand All @@ -98,7 +95,12 @@ func migrateRun(cmd *cobra.Command, args []string) error {
} else if datastoreEngine == ydb.Engine {
log.Ctx(cmd.Context()).Info().Msg("migrating ydb datastore")

migrationDriver, err := ydbMigrations.NewYDBDriver(cmd.Context(), dbURL)
migrationDriver, err := ydbMigrations.NewYDBDriver(
cmd.Context(),
dbURL,
ydbMigrations.WithTablePathPrefix(tablePrefix),
ydbMigrations.WithCertificatePath(certificatePath),
)
if err != nil {
return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err)
}
Expand Down

0 comments on commit a1e48fc

Please sign in to comment.