Skip to content

Commit

Permalink
Add ability to disable uniqueness check with option
Browse files Browse the repository at this point in the history
Signed-off-by: sashayakovtseva <[email protected]>
  • Loading branch information
sashayakovtseva committed Mar 19, 2024
1 parent 6727654 commit e1c4fcd
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 21 deletions.
23 changes: 16 additions & 7 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type ydbConfig struct {

bulkLoadBatchSize int

gcEnabled bool
enableGC bool
enablePrometheusStats bool
enableUniquenessCheck bool
}

var defaultConfig = ydbConfig{
Expand All @@ -34,7 +35,8 @@ var defaultConfig = ydbConfig{
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
gcEnabled: true,
enableGC: true,
enableUniquenessCheck: true,
}

// Option provides the facility to configure how clients within the YDB
Expand Down Expand Up @@ -80,11 +82,11 @@ func GCInterval(interval time.Duration) Option {
return func(o *ydbConfig) { o.gcInterval = interval }
}

// GCEnabled indicates whether garbage collection is enabled.
// WithEnableGC indicates whether garbage collection is enabled.
//
// GC is enabled by default.
func GCEnabled(isGCEnabled bool) Option {
return func(o *ydbConfig) { o.gcEnabled = isGCEnabled }
func WithEnableGC(isGCEnabled bool) Option {
return func(o *ydbConfig) { o.enableGC = isGCEnabled }
}

// GCMaxOperationTime is the maximum operation time of a garbage collection pass before it times out.
Expand Down Expand Up @@ -143,6 +145,13 @@ func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
// clients being used by the datastore are enabled.
//
// Prometheus metrics are disabled by default.
func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
return func(o *ydbConfig) { o.enablePrometheusStats = enablePrometheusStats }
func WithEnablePrometheusStats(v bool) Option {
return func(o *ydbConfig) { o.enablePrometheusStats = v }
}

// WithEnableUniquenessCheck marks whether relation tuples will be checked against
// unique index during CREATE operation. YDB doesn't support unique secondary indexes,
// and since this check is quite expensive one may turn it off.
func WithEnableUniquenessCheck(v bool) Option {
return func(o *ydbConfig) { o.enableUniquenessCheck = v }
}
21 changes: 12 additions & 9 deletions internal/datastore/ydb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ var _ datastore.ReadWriteTransaction = (*ydbReadWriter)(nil)

type ydbReadWriter struct {
*ydbReader
bulkLoadBatchSize int
newRevision revisions.TimestampRevision
bulkLoadBatchSize int
newRevision revisions.TimestampRevision
enableUniquenessCheck bool
}

// WriteCaveats stores the provided caveats.
Expand Down Expand Up @@ -91,7 +92,7 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
}

// Perform SELECT queries first as a part of uniqueness check.
if len(insertionTuples) > 0 {
if len(insertionTuples) > 0 && rw.enableUniquenessCheck {
dups, err := rw.selectTuples(ctx, insertionTuples)
if err != nil {
return fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
Expand Down Expand Up @@ -284,12 +285,14 @@ func (rw *ydbReadWriter) BulkLoad(ctx context.Context, iter datastore.BulkWriteR
}

if len(insertionTuples) > 0 {
dups, err := rw.selectTuples(ctx, insertionTuples)
if err != nil {
return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
}
if len(dups) > 0 {
return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0])
if rw.enableUniquenessCheck {
dups, err := rw.selectTuples(ctx, insertionTuples)
if err != nil {
return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
}
if len(dups) > 0 {
return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0])
}
}

if err := executeQuery(ctx, rw.tablePathPrefix, rw.executor, insertBuilder); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/ydb/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ func (y *ydbDatastore) ReadWriteTx(
livingObjectModifier,
false,
),
bulkLoadBatchSize: y.config.bulkLoadBatchSize,
newRevision: newRev,
bulkLoadBatchSize: y.config.bulkLoadBatchSize,
newRevision: newRev,
enableUniquenessCheck: y.config.enableUniquenessCheck,
}

return fn(ctx, rw)
Expand Down
11 changes: 8 additions & 3 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ type Config struct {
SpannerMaxSessions uint64 `debugmap:"visible"`

// YDB
YDBCertificatePath string `debugmap:"visible"`
YDBBulkLoadBatchSize int `debugmap:"visible"`
YDBCertificatePath string `debugmap:"visible"`
YDBBulkLoadBatchSize int `debugmap:"visible"`
YDBEnableUniquenessCheck bool `debugmap:"visible"`

// Internal
WatchBufferLength uint16 `debugmap:"visible"`
Expand Down Expand Up @@ -221,6 +222,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt
flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader")
flagSet.IntVar(&opts.YDBBulkLoadBatchSize, flagName("datastore-bulk-load-size"), defaults.YDBBulkLoadBatchSize, "number of rows BulkLoad will process in a single batch (ydb driver only)")
flagSet.StringVar(&opts.YDBCertificatePath, flagName("datastore-certificate-path"), defaults.YDBCertificatePath, "filepath to a valid certificate used to connect to a datastore (ydb driver only)")
flagSet.BoolVar(&opts.YDBEnableUniquenessCheck, flagName("datastore-ydb-enable-uniqueness-check"), defaults.YDBEnableUniquenessCheck, "whether to check tuples against unique index during CREATE operation (ydb driver only)")

// disabling stats is only for tests
flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table")
Expand Down Expand Up @@ -275,7 +277,9 @@ func DefaultDatastoreConfig() *Config {
SpannerEmulatorHost: "",
SpannerMinSessions: 100,
SpannerMaxSessions: 400,
YDBCertificatePath: "",
YDBBulkLoadBatchSize: 1000,
YDBEnableUniquenessCheck: true,
WatchBufferLength: 1024,
WatchBufferWriteTimeout: 1 * time.Second,
MigrationPhase: "",
Expand Down Expand Up @@ -430,7 +434,7 @@ func newPostgresDatastore(ctx context.Context, opts Config) (datastore.Datastore
func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, error) {
opts := []ydb.Option{
ydb.GCWindow(config.GCWindow),
ydb.GCEnabled(!config.ReadOnly),
ydb.WithEnableGC(!config.ReadOnly),
ydb.RevisionQuantization(config.RevisionQuantization),
ydb.MaxRevisionStalenessPercent(config.MaxRevisionStalenessPercent),
ydb.FollowerReadDelay(config.FollowerReadDelay),
Expand All @@ -442,6 +446,7 @@ func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, e
ydb.BulkLoadBatchSize(config.YDBBulkLoadBatchSize),
ydb.WithEnablePrometheusStats(config.EnableDatastoreMetrics),
ydb.WithCertificatePath(config.YDBCertificatePath),
ydb.WithEnableUniquenessCheck(config.YDBEnableUniquenessCheck),
}
return ydb.NewYDBDatastore(ctx, config.URI, opts...)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/cmd/datastore/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e1c4fcd

Please sign in to comment.