From a1e48fc91f2cdb103fa6d6ebd84491d76a30e569 Mon Sep 17 00:00:00 2001 From: Sasha Yakovtseva Date: Thu, 7 Mar 2024 16:04:11 +0200 Subject: [PATCH] Add certificate path option for YDB (#13) --- internal/datastore/ydb/migrations/driver.go | 58 ++++++++++--------- internal/datastore/ydb/migrations/manager.go | 12 ++-- internal/datastore/ydb/migrations/options.go | 36 ++++++++++++ .../zz_migration.0001_initial_schema.go | 22 +++---- internal/datastore/ydb/options.go | 10 +++- internal/datastore/ydb/ydb.go | 3 + pkg/cmd/datastore/datastore.go | 7 ++- pkg/cmd/migrate.go | 18 +++--- 8 files changed, 110 insertions(+), 56 deletions(-) create mode 100644 internal/datastore/ydb/migrations/options.go diff --git a/internal/datastore/ydb/migrations/driver.go b/internal/datastore/ydb/migrations/driver.go index 9c793fb00a..1f49a16a4d 100644 --- a/internal/datastore/ydb/migrations/driver.go +++ b/internal/datastore/ydb/migrations/driver.go @@ -20,7 +20,7 @@ import ( "github.com/authzed/spicedb/pkg/truetime" ) -var _ migrate.Driver[TableClientWithOptions, TxActorWithOptions] = (*YDBDriver)(nil) +var _ migrate.Driver[TableClientWithConfig, TxActorWithConfig] = (*YDBDriver)(nil) const ( queryLoadVersion = ` @@ -41,35 +41,41 @@ INSERT INTO schema_version(version_num, created_at_unix_nano) VALUES ($newVersio ` ) -type options struct { - tablePathPrefix string -} - // YDBDriver implements a schema migration facility for use in SpiceDB's YDB datastore. type YDBDriver struct { - db *ydb.Driver - options + db *ydb.Driver + config *ydbConfig } // NewYDBDriver creates a new driver with active connections to the database specified. -func NewYDBDriver(ctx context.Context, dsn string) (*YDBDriver, error) { +func NewYDBDriver(ctx context.Context, dsn string, opts ...Option) (*YDBDriver, error) { parsedDSN := common.ParseDSN(dsn) - db, err := ydb.Open( - ctx, - parsedDSN.OriginalDSN, + config := generateConfig(opts) + if parsedDSN.TablePathPrefix != "" { + config.tablePathPrefix = parsedDSN.TablePathPrefix + } + + ydbOpts := []ydb.Option{ ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents), ydbOtel.WithTraces(), - ) + } + if config.certificatePath != "" { + ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath)) + } + + db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...) if err != nil { return nil, fmt.Errorf("failed to instantiate YDBDriver: %w", err) } + if _, err := db.Scheme().ListDirectory(ctx, db.Name()); err != nil { + return nil, fmt.Errorf("failed to ping YDB: %w", err) + } + return &YDBDriver{ - db: db, - options: options{ - tablePathPrefix: parsedDSN.TablePathPrefix, - }, + db: db, + config: config, }, nil } @@ -85,7 +91,7 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { _, res, err := s.Execute( ctx, table.DefaultTxControl(), - common.AddTablePrefix(queryLoadVersion, d.tablePathPrefix), + common.AddTablePrefix(queryLoadVersion, d.config.tablePathPrefix), nil, ) if err != nil { @@ -113,10 +119,10 @@ func (d *YDBDriver) Version(ctx context.Context) (string, error) { return loaded, nil } -func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, version, _ string) error { +func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithConfig, version, _ string) error { res, err := tx.tx.Execute( ctx, - common.AddTablePrefix(queryWriteVersion, tx.opts.tablePathPrefix), + common.AddTablePrefix(queryWriteVersion, tx.config.tablePathPrefix), table.NewQueryParameters( table.ValueParam("$newVersion", types.TextValue(version)), table.ValueParam("$createAtUnixNano", types.Int64Value(truetime.UnixNano())), @@ -131,18 +137,18 @@ func (d *YDBDriver) WriteVersion(ctx context.Context, tx TxActorWithOptions, ver } // Conn returns the underlying table client instance for this driver. -func (d *YDBDriver) Conn() TableClientWithOptions { - return TableClientWithOptions{ +func (d *YDBDriver) Conn() TableClientWithConfig { + return TableClientWithConfig{ client: d.db.Table(), - opts: d.options, + config: d.config, } } -func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithOptions]) error { +func (d *YDBDriver) RunTx(ctx context.Context, f migrate.TxMigrationFunc[TxActorWithConfig]) error { return d.db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error { - return f(ctx, TxActorWithOptions{ - tx: tx, - opts: d.options, + return f(ctx, TxActorWithConfig{ + tx: tx, + config: d.config, }) }) } diff --git a/internal/datastore/ydb/migrations/manager.go b/internal/datastore/ydb/migrations/manager.go index 8c0eabb13c..a5238e3852 100644 --- a/internal/datastore/ydb/migrations/manager.go +++ b/internal/datastore/ydb/migrations/manager.go @@ -7,14 +7,14 @@ import ( ) // YDBMigrations implements a migration manager for the YDBDriver. -var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithOptions, TxActorWithOptions]() +var YDBMigrations = migrate.NewManager[*YDBDriver, TableClientWithConfig, TxActorWithConfig]() -type TableClientWithOptions struct { +type TableClientWithConfig struct { client table.Client - opts options + config *ydbConfig } -type TxActorWithOptions struct { - tx table.TransactionActor - opts options +type TxActorWithConfig struct { + tx table.TransactionActor + config *ydbConfig } diff --git a/internal/datastore/ydb/migrations/options.go b/internal/datastore/ydb/migrations/options.go new file mode 100644 index 0000000000..d2b0ade73d --- /dev/null +++ b/internal/datastore/ydb/migrations/options.go @@ -0,0 +1,36 @@ +package migrations + +type ydbConfig struct { + tablePathPrefix string + certificatePath string +} + +var defaultConfig = ydbConfig{} + +// Option provides the facility to configure how clients within the YDB +// datastore interact with the running YDB database. +type Option func(*ydbConfig) + +func generateConfig(options []Option) *ydbConfig { + computed := defaultConfig + for _, option := range options { + option(&computed) + } + + return &computed +} + +// WithTablePathPrefix sets table prefix that will be implicitly added to all YDB queries. +// See https://ydb.tech/docs/en/yql/reference/syntax/pragma#table-path-prefix for details. +// +// Default is empty. +// Non-empty DSN parameter takes precedence over this option. +func WithTablePathPrefix(prefix string) Option { + return func(o *ydbConfig) { o.tablePathPrefix = prefix } +} + +// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol. +// Default is empty. +func WithCertificatePath(path string) Option { + return func(o *ydbConfig) { o.certificatePath = path } +} diff --git a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go index 54490c76d1..c19ae4dbb2 100644 --- a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go +++ b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go @@ -134,17 +134,17 @@ WITH ( ) func init() { - err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithOptions) error { + err := YDBMigrations.Register("initial", "", func(ctx context.Context, client TableClientWithConfig) error { return client.client.Do(ctx, func(ctx context.Context, s table.Session) error { statements := []string{ - common.AddTablePrefix(createSchemaVersion, client.opts.tablePathPrefix), - common.AddTablePrefix(createUniqueIDTable, client.opts.tablePathPrefix), - common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix), - common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix), - common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix), - common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix), - common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix), - common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix), + common.AddTablePrefix(createSchemaVersion, client.config.tablePathPrefix), + common.AddTablePrefix(createUniqueIDTable, client.config.tablePathPrefix), + common.AddTablePrefix(createNamespaceConfig, client.config.tablePathPrefix), + common.AddTablePrefix(createCaveat, client.config.tablePathPrefix), + common.AddTablePrefix(createRelationTuple, client.config.tablePathPrefix), + common.AddTablePrefix(createNamespaceConfigChangefeed, client.config.tablePathPrefix), + common.AddTablePrefix(createCaveatChangefeed, client.config.tablePathPrefix), + common.AddTablePrefix(createRelationTupleChangefeed, client.config.tablePathPrefix), } for _, stmt := range statements { if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil { @@ -154,10 +154,10 @@ func init() { return nil }) - }, func(ctx context.Context, tx TxActorWithOptions) error { + }, func(ctx context.Context, tx TxActorWithConfig) error { _, err := tx.tx.Execute( ctx, - common.AddTablePrefix(insertUniqueID, tx.opts.tablePathPrefix), + common.AddTablePrefix(insertUniqueID, tx.config.tablePathPrefix), &table.QueryParameters{}, ) return err diff --git a/internal/datastore/ydb/options.go b/internal/datastore/ydb/options.go index 4bcbbcda41..3e6e6e1d54 100644 --- a/internal/datastore/ydb/options.go +++ b/internal/datastore/ydb/options.go @@ -6,6 +6,7 @@ import ( type ydbConfig struct { tablePathPrefix string + certificatePath string watchBufferLength uint16 watchBufferWriteTimeout time.Duration @@ -25,10 +26,8 @@ type ydbConfig struct { } var defaultConfig = ydbConfig{ - tablePathPrefix: "", watchBufferLength: 128, watchBufferWriteTimeout: time.Second, - followerReadDelay: 0 * time.Second, revisionQuantization: 5 * time.Second, maxRevisionStalenessPercent: 0.1, gcWindow: 24 * time.Hour, @@ -36,7 +35,6 @@ var defaultConfig = ydbConfig{ gcMaxOperationTime: time.Minute, bulkLoadBatchSize: 1000, gcEnabled: true, - enablePrometheusStats: false, } // Option provides the facility to configure how clients within the YDB @@ -61,6 +59,12 @@ func WithTablePathPrefix(prefix string) Option { return func(o *ydbConfig) { o.tablePathPrefix = prefix } } +// WithCertificatePath sets a path to the certificate to use when connecting to YDB with grpcs protocol. +// Default is empty. +func WithCertificatePath(path string) Option { + return func(o *ydbConfig) { o.certificatePath = path } +} + // GCWindow is the maximum age of a passed revision that will be considered // valid. // diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 1f8651248a..84de141f38 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -81,6 +81,9 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas if config.enablePrometheusStats { ydbOpts = append(ydbOpts, ydbPrometheus.WithTraces(prometheus.DefaultRegisterer)) } + if config.certificatePath != "" { + ydbOpts = append(ydbOpts, ydb.WithCertificatesFromFile(config.certificatePath)) + } db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...) if err != nil { diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index d6bd3fdab0..192e765137 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -141,7 +141,8 @@ type Config struct { SpannerMaxSessions uint64 `debugmap:"visible"` // YDB - YDBBulkLoadBatchSize int `debugmap:"visible"` + YDBCertificatePath string `debugmap:"visible"` + YDBBulkLoadBatchSize int `debugmap:"visible"` // Internal WatchBufferLength uint16 `debugmap:"visible"` @@ -214,11 +215,12 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.StringVar(&opts.SpannerEmulatorHost, flagName("datastore-spanner-emulator-host"), "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)") flagSet.Uint64Var(&opts.SpannerMinSessions, flagName("datastore-spanner-min-sessions"), 100, "minimum number of sessions across all Spanner gRPC connections the client can have at a given time") flagSet.Uint64Var(&opts.SpannerMaxSessions, flagName("datastore-spanner-max-sessions"), 400, "maximum number of sessions across all Spanner gRPC connections the client can have at a given time") - flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)") + flagSet.StringVar(&opts.TablePrefix, flagName("datastore-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)") flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in (postgres and spanner driver only)") flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), defaults.WatchBufferLength, "how large the watch buffer should be before blocking") flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader") flagSet.IntVar(&opts.YDBBulkLoadBatchSize, flagName("datastore-bulk-load-size"), defaults.YDBBulkLoadBatchSize, "number of rows BulkLoad will process in a single batch (ydb driver only)") + flagSet.StringVar(&opts.YDBCertificatePath, flagName("datastore-certificate-path"), defaults.YDBCertificatePath, "filepath to a valid certificate used to connect to a datastore (ydb driver only)") // disabling stats is only for tests flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table") @@ -439,6 +441,7 @@ func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, e ydb.WatchBufferWriteTimeout(config.WatchBufferWriteTimeout), ydb.BulkLoadBatchSize(config.YDBBulkLoadBatchSize), ydb.WithEnablePrometheusStats(config.EnableDatastoreMetrics), + ydb.WithCertificatePath(config.YDBCertificatePath), } return ydb.NewYDBDatastore(ctx, config.URI, opts...) } diff --git a/pkg/cmd/migrate.go b/pkg/cmd/migrate.go index ecf37ad89b..b5d09c131b 100644 --- a/pkg/cmd/migrate.go +++ b/pkg/cmd/migrate.go @@ -27,7 +27,8 @@ func RegisterMigrateFlags(cmd *cobra.Command) { cmd.Flags().String("datastore-conn-uri", "", `connection string used by remote datastores (e.g. "postgres://postgres:password@localhost:5432/spicedb")`) cmd.Flags().String("datastore-spanner-credentials", "", "path to service account key credentials file with access to the cloud spanner instance (omit to use application default credentials)") cmd.Flags().String("datastore-spanner-emulator-host", "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)") - cmd.Flags().String("datastore-mysql-table-prefix", "", "prefix to add to the name of all mysql database tables") + cmd.Flags().String("datastore-table-prefix", "", "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)") + cmd.Flags().String("datastore-certificate-path", "", "filepath to a valid certificate used to connect to a datastore (ydb driver only)") cmd.Flags().Uint64("migration-backfill-batch-size", 1000, "number of items to migrate per iteration of a datastore backfill") cmd.Flags().Duration("migration-timeout", 1*time.Hour, "defines a timeout for the execution of the migration, set to 1 hour by default") } @@ -48,6 +49,8 @@ func migrateRun(cmd *cobra.Command, args []string) error { dbURL := cobrautil.MustGetStringExpanded(cmd, "datastore-conn-uri") timeout := cobrautil.MustGetDuration(cmd, "migration-timeout") migrationBatachSize := cobrautil.MustGetUint64(cmd, "migration-backfill-batch-size") + tablePrefix := cobrautil.MustGetString(cmd, "datastore-table-prefix") + certificatePath := cobrautil.MustGetString(cmd, "datastore-certificate-path") if datastoreEngine == "cockroachdb" { log.Ctx(cmd.Context()).Info().Msg("migrating cockroachdb datastore") @@ -84,12 +87,6 @@ func migrateRun(cmd *cobra.Command, args []string) error { } else if datastoreEngine == "mysql" { log.Ctx(cmd.Context()).Info().Msg("migrating mysql datastore") - var err error - tablePrefix, err := cmd.Flags().GetString("datastore-mysql-table-prefix") - if err != nil { - log.Ctx(cmd.Context()).Fatal().Msg(fmt.Sprintf("unable to get table prefix: %s", err)) - } - migrationDriver, err := mysqlmigrations.NewMySQLDriverFromDSN(dbURL, tablePrefix) if err != nil { return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) @@ -98,7 +95,12 @@ func migrateRun(cmd *cobra.Command, args []string) error { } else if datastoreEngine == ydb.Engine { log.Ctx(cmd.Context()).Info().Msg("migrating ydb datastore") - migrationDriver, err := ydbMigrations.NewYDBDriver(cmd.Context(), dbURL) + migrationDriver, err := ydbMigrations.NewYDBDriver( + cmd.Context(), + dbURL, + ydbMigrations.WithTablePathPrefix(tablePrefix), + ydbMigrations.WithCertificatePath(certificatePath), + ) if err != nil { return fmt.Errorf("unable to create migration driver for %s: %w", datastoreEngine, err) }