From fcb5200bf2cd3e38401167082a0a40744419cef8 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Fri, 23 Feb 2024 20:02:59 +0800 Subject: [PATCH 01/14] Start Watch Signed-off-by: sashayakovtseva --- .../zz_migration.0001_initial_schema.go | 37 +- internal/datastore/ydb/query.go | 2 + internal/datastore/ydb/watch.go | 363 ++++++++++++++++++ internal/datastore/ydb/ydb.go | 10 +- 4 files changed, 402 insertions(+), 10 deletions(-) create mode 100644 internal/datastore/ydb/watch.go diff --git a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go index d1eddcfa60..29d9b5ea24 100644 --- a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go +++ b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go @@ -66,8 +66,7 @@ CREATE TABLE caveat ( INDEX uq_caveat_living GLOBAL SYNC ON (deleted_at_unix_nano, name) COVER (definition) );` - // todo discuss JsonDocument instead of Json. - // todo check Ensure on insert, check indexed. + // todo use correct indexes. // todo AUTO_PARTITIONING_BY_LOAD? createRelationTuple = ` CREATE TABLE relation_tuple ( @@ -89,6 +88,37 @@ CREATE TABLE relation_tuple ( INDEX ix_gc_index GLOBAL SYNC ON (deleted_at_unix_nano) );` + // todo TOPIC_MIN_ACTIVE_PARTITIONS? + createNamespaceConfigChangefeed = ` +ALTER TABLE namespace_config +ADD CHANGEFEED spicedb_watch +WITH ( + FORMAT = 'JSON', + MODE = 'NEW_IMAGE', + RETENTION_PERIOD = Interval('PT1H'), + VIRTUAL_TIMESTAMPS = TRUE +);` + + createCaveatChangefeed = ` +ALTER TABLE caveat +ADD CHANGEFEED spicedb_watch +WITH ( + FORMAT = 'JSON', + MODE = 'NEW_IMAGE', + RETENTION_PERIOD = Interval('PT1H'), + VIRTUAL_TIMESTAMPS = TRUE +);` + + createRelationTupleChangefeed = ` +ALTER TABLE relation_tuple +ADD CHANGEFEED spicedb_watch +WITH ( + FORMAT = 'JSON', + MODE = 'NEW_IMAGE', + RETENTION_PERIOD = Interval('PT1H'), + VIRTUAL_TIMESTAMPS = TRUE +);` + insertUniqueID = `INSERT INTO metadata (unique_id) VALUES (CAST(RandomUuid(1) as String));` ) @@ -101,6 +131,9 @@ func init() { common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix), common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix), common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix), + common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix), + common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix), + common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix), } for _, stmt := range statements { if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil { diff --git a/internal/datastore/ydb/query.go b/internal/datastore/ydb/query.go index 46c11611b7..5191526152 100644 --- a/internal/datastore/ydb/query.go +++ b/internal/datastore/ydb/query.go @@ -22,6 +22,8 @@ import ( ) const ( + changefeedSpicedbWatch = "spicedb_watch" + // common colCreatedAtUnixNano = "created_at_unix_nano" colDeletedAtUnixNano = "deleted_at_unix_nano" diff --git a/internal/datastore/ydb/watch.go b/internal/datastore/ydb/watch.go new file mode 100644 index 0000000000..e55d50a83e --- /dev/null +++ b/internal/datastore/ydb/watch.go @@ -0,0 +1,363 @@ +package ydb + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/samber/lo" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/revisions" + "github.com/authzed/spicedb/pkg/datastore" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +func (y *ydbDatastore) Watch( + ctx context.Context, + afterRevision datastore.Revision, + options datastore.WatchOptions, +) (<-chan *datastore.RevisionChanges, <-chan error) { + watchBufferLength := options.WatchBufferLength + if watchBufferLength <= 0 { + watchBufferLength = y.config.watchBufferLength + } + + updates := make(chan *datastore.RevisionChanges, watchBufferLength) + errs := make(chan error, 1) + + features, err := y.Features(ctx) + if err != nil { + errs <- err + return updates, errs + } + + if !features.Watch.Enabled { + errs <- datastore.NewWatchDisabledErr(features.Watch.Reason) + return updates, errs + } + + go y.watch(ctx, afterRevision, options, updates, errs) + + return updates, errs +} + +type cdcEvent struct { + Key []string + NewImage json.RawMessage + Ts []int +} + +type namespaceConfigImage struct { + SerializedConfig []byte + DeletedAtUnixNano *int64 +} + +type caveatImage struct { + Definition []byte + DeletedAtUnixNano *int64 +} + +type relationTupleImage struct { + CaveatName *string + CaveatContext *[]byte + DeletedAtUnixNano *int64 +} + +func (y *ydbDatastore) watch( + ctx context.Context, + afterRevision datastore.Revision, + opts datastore.WatchOptions, + updates chan *datastore.RevisionChanges, + errs chan error, +) { + defer close(updates) + defer close(errs) + + afterTimestampRevision, ok := afterRevision.(revisions.TimestampRevision) + if !ok { + errs <- fmt.Errorf("expected timestamp revision, got %T", afterRevision) + return + } + + tableToTopicName := map[string]string{ + tableRelationTuple: y.config.tablePathPrefix + "/" + tableRelationTuple + "/" + changefeedSpicedbWatch, + tableCaveat: y.config.tablePathPrefix + "/" + tableCaveat + "/" + changefeedSpicedbWatch, + tableNamespaceConfig: y.config.tablePathPrefix + "/" + tableNamespaceConfig + "/" + changefeedSpicedbWatch, + } + topicToTableName := lo.Invert(tableToTopicName) + + topics := make([]string, 0, 3) + if opts.Content&datastore.WatchRelationships == datastore.WatchRelationships { + topics = append(topics, tableToTopicName[tableRelationTuple]) + } + if opts.Content&datastore.WatchSchema == datastore.WatchSchema { + topics = append(topics, tableToTopicName[tableNamespaceConfig]) + topics = append(topics, tableToTopicName[tableCaveat]) + } + + if len(topics) == 0 { + errs <- fmt.Errorf("at least relationships or schema must be specified") + return + } + + sendError := func(err error) { + if errors.Is(ctx.Err(), context.Canceled) { + errs <- datastore.NewWatchCanceledErr() + return + } + + errs <- err + } + + watchBufferWriteTimeout := opts.WatchBufferWriteTimeout + if watchBufferWriteTimeout <= 0 { + watchBufferWriteTimeout = y.config.watchBufferWriteTimeout + } + + sendChange := func(change *datastore.RevisionChanges) bool { + select { + case updates <- change: + return true + default: + // if we cannot immediately write, set up the timer and try again. + } + + timer := time.NewTimer(watchBufferWriteTimeout) + defer timer.Stop() + + select { + case updates <- change: + return true + case <-timer.C: + errs <- datastore.NewWatchDisconnectedErr() + return false + } + } + + var selectors topicoptions.ReadSelectors + for _, topic := range topics { + selectors = append(selectors, + topicoptions.ReadSelector{ + Path: topic, + ReadFrom: afterTimestampRevision.Time(), + }, + ) + } + + reader, err := y.driver.Topic().StartReader( + uuid.New().String(), + selectors, + topicoptions.WithReaderCommitMode(topicoptions.CommitModeSync), + ) + if err != nil { + sendError(fmt.Errorf("failed to create reader: %w", err)) + return + } + defer func() { _ = reader.Close(ctx) }() + + if err := reader.WaitInit(ctx); err != nil { + sendError(fmt.Errorf("failed to await reader initialization: %w", err)) + return + } + + var ( + rErr error + msg *topicreader.Message + event cdcEvent + tracked = common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content) + ) + for msg, rErr = reader.ReadMessage(ctx); rErr != nil; msg, rErr = reader.ReadMessage(ctx) { + if err := topicsugar.JSONUnmarshal(msg, &event); err != nil { + sendError(fmt.Errorf("failed to unmarshal cdc event: %w", err)) + return + } + + // todo + // Resolved indicates that the specified revision is "complete"; no additional updates can come in before or at it. + // Therefore, at this point, we issue tracked updates from before that time, and the checkpoint update. + if false { + rev := revisions.NewForTimestamp(1) + changes := tracked.FilterAndRemoveRevisionChanges(revisions.TimestampIDKeyLessThanFunc, rev) + for i := range changes { + if !sendChange(&changes[i]) { + return + } + } + + if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { + if !sendChange(&datastore.RevisionChanges{ + Revision: rev, + IsCheckpoint: true, + }) { + return + } + } + continue + } + + switch topicToTableName[msg.Topic()] { + case tableRelationTuple: + if len(event.Key) != 7 { + err := spiceerrors.MustBugf( + "unexpected PK size. want 7, got %d (%q)", + len(event.Key), strings.Join(event.Key, ","), + ) + sendError(err) + return + } + + createdAtUnixNano, _ := strconv.Atoi(event.Key[6]) + createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + + tuple := &core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{ + Namespace: event.Key[0], + ObjectId: event.Key[1], + Relation: event.Key[2], + }, + Subject: &core.ObjectAndRelation{ + Namespace: event.Key[3], + ObjectId: event.Key[4], + Relation: event.Key[5], + }, + } + + if event.NewImage != nil { + var changes relationTupleImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal relation tuple new image: %w", err)) + return + } + + var structuredCtx map[string]any + if changes.CaveatContext != nil { + if err := json.Unmarshal(*changes.CaveatContext, &structuredCtx); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat context new image: %w", err)) + return + } + } + + ctxCaveat, err := common.ContextualizedCaveatFrom(lo.FromPtr(changes.CaveatName), structuredCtx) + if err != nil { + sendError(err) + return + } + tuple.Caveat = ctxCaveat + + if changes.DeletedAtUnixNano == nil { + err := tracked.AddRelationshipChange(ctx, createRev, tuple, core.RelationTupleUpdate_TOUCH) + if err != nil { + sendError(err) + return + } + } else { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + err := tracked.AddRelationshipChange(ctx, deleteRev, tuple, core.RelationTupleUpdate_DELETE) + if err != nil { + sendError(err) + return + } + } + } + + case tableNamespaceConfig: + if len(event.Key) != 2 { + err := spiceerrors.MustBugf( + "unexpected PK size. want 2, got %d (%q)", + len(event.Key), strings.Join(event.Key, ","), + ) + sendError(err) + return + } + + namespaceName := event.Key[0] + createdAtUnixNano, _ := strconv.Atoi(event.Key[1]) + createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + + if event.NewImage != nil { + var changes namespaceConfigImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal namespace new image: %w", err)) + return + } + + var loaded core.NamespaceDefinition + if err := loaded.UnmarshalVT(changes.SerializedConfig); err != nil { + sendError(fmt.Errorf("failed to unmarshal namespace config: %w", err)) + return + } + + if changes.DeletedAtUnixNano == nil { + tracked.AddChangedDefinition(ctx, createRev, &loaded) + } else { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + tracked.AddDeletedNamespace(ctx, deleteRev, namespaceName) + } + } + + case tableCaveat: + if len(event.Key) != 2 { + err := spiceerrors.MustBugf( + "unexpected PK size. want 2, got %d (%q)", + len(event.Key), strings.Join(event.Key, ","), + ) + sendError(err) + return + } + + caveatName := event.Key[0] + createdAtUnixNano, _ := strconv.Atoi(event.Key[1]) + createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + + if event.NewImage != nil { + var changes caveatImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat new image: %w", err)) + return + } + + var loaded core.CaveatDefinition + if err := loaded.UnmarshalVT(changes.Definition); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat config: %w", err)) + return + } + + if changes.DeletedAtUnixNano == nil { + tracked.AddChangedDefinition(ctx, createRev, &loaded) + } else { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + tracked.AddDeletedNamespace(ctx, deleteRev, caveatName) + } + } + } + + if err := reader.Commit(ctx, msg); err != nil { + sendError(fmt.Errorf("failed to commit offset: %w", err)) + return + } + } + + if rErr != nil { + if errors.Is(ctx.Err(), context.Canceled) { + closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer closeCancel() + if err := reader.Close(closeCtx); err != nil { + errs <- err + return + } + errs <- datastore.NewWatchCanceledErr() + } else { + errs <- rErr + } + } +} diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index bf011198bc..a55221d483 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -31,10 +31,9 @@ func init() { var ( _ datastore.Datastore = &ydbDatastore{} - yq = sq.StatementBuilder.PlaceholderFormat(sq.DollarP) - ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) + yq = sq.StatementBuilder.PlaceholderFormat(sq.DollarP) tracer = otel.Tracer("spicedb/internal/datastore/ydb") ) @@ -58,7 +57,7 @@ type ydbDatastore struct { originalDSN string - // isClosed used in HeadRevision only to pass datastore tests + // isClosed used in HeadRevision only to pass datastore tests. isClosed atomic.Bool } @@ -204,8 +203,3 @@ func (y *ydbDatastore) HeadRevision(_ context.Context) (datastore.Revision, erro now := truetime.UnixNano() return revisions.NewForTimestamp(now), nil } - -func (y *ydbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { - // TODO implement me - panic("implement me") -} From fcf0b4adc45519aa28abf155f7626219b27e915d Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 26 Feb 2024 21:40:50 +0800 Subject: [PATCH 02/14] Implement example retryable error Signed-off-by: sashayakovtseva --- internal/datastore/ydb/ydb.go | 4 ++++ internal/datastore/ydb/ydb_test.go | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index a55221d483..9881fa1dea 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -79,6 +79,10 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas return nil, fmt.Errorf("failed to open YDB connectionn: %w", err) } + if _, err := db.Scheme().ListDirectory(ctx, config.tablePathPrefix); err != nil { + return nil, fmt.Errorf("failed to ping YDB: %w", err) + } + maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())* config.maxRevisionStalenessPercent) * time.Nanosecond diff --git a/internal/datastore/ydb/ydb_test.go b/internal/datastore/ydb/ydb_test.go index 2b323bde3a..5ae41aa8e4 100644 --- a/internal/datastore/ydb/ydb_test.go +++ b/internal/datastore/ydb/ydb_test.go @@ -4,11 +4,13 @@ package ydb import ( "context" + "fmt" "os" "testing" "time" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" log "github.com/authzed/spicedb/internal/logging" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" @@ -18,6 +20,13 @@ import ( var ydbTestEngine testdatastore.RunningEngineForTest +// Implement the TestableDatastore interface. +func (y *ydbDatastore) ExampleRetryableError() error { + // todo return conditionally retryable error, otherwise + // we won't pass "retryable retries disabled" test. + return retry.RetryableError(fmt.Errorf("some user error")) +} + func TestMain(m *testing.M) { var ( err error From b1e11c3eeaec669d5c3e6fa0d6fa4bc170a0bb60 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 26 Feb 2024 22:17:07 +0800 Subject: [PATCH 03/14] Update ydb options Signed-off-by: sashayakovtseva --- go.mod | 2 ++ go.sum | 7 +++++++ internal/datastore/ydb/options.go | 27 +++++++++++++++++---------- internal/datastore/ydb/ydb.go | 3 +++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 00725cbcd0..be6ff5f2c3 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf github.com/ydb-platform/ydb-go-sdk-otel v0.4.6 + github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1 github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0 github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1 go.opencensus.io v0.24.0 @@ -322,6 +323,7 @@ require ( github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xen0n/gosmopolitan v1.2.2 // indirect github.com/yagipy/maintidx v1.0.0 // indirect + github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 // indirect github.com/yeya24/promlinter v0.2.0 // indirect github.com/ykadowak/zerologlint v0.1.3 // indirect gitlab.com/bosi/decorder v0.4.1 // indirect diff --git a/go.sum b/go.sum index db1363384c..4aeacb9fea 100644 --- a/go.sum +++ b/go.sum @@ -535,6 +535,7 @@ github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa h1:a6Hc6Hlq github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa/go.mod h1:AxgWC4DDX54O2WDoQO1Ceabtn6IbktjU/7bigor+66g= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg= github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -897,13 +898,19 @@ github.com/xen0n/gosmopolitan v1.2.2 h1:/p2KTnMzwRexIW8GlKawsTWOxn7UHA+jCMF/V8HH github.com/xen0n/gosmopolitan v1.2.2/go.mod h1:7XX7Mj61uLYrj0qmeN0zi7XDon9JRAEhYQqAPLVNTeg= github.com/yagipy/maintidx v1.0.0 h1:h5NvIsCz+nRDapQ0exNv4aJ0yXSI0420omVANTv3GJM= github.com/yagipy/maintidx v1.0.0/go.mod h1:0qNf/I/CCZXSMhsRsrEPDZ+DkekpKLXAJfsTACwgXLk= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-genproto v0.0.0-20221215182650-986f9d10542f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf h1:ckwNHVo4bv2tqNkgx3W3HANh3ta1j6TR5qw08J1A7Tw= github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 h1:IleO/tgxypZ/BQWZNDWomf2qsefGnkCT4qW8J+SW2Zo= +github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4/go.mod h1:bqOjIBSt5LtA8fcTprRPGLvlQGkNlqBSRqnL+yZUJh4= github.com/ydb-platform/ydb-go-sdk-otel v0.4.6 h1:4lMGmHsV28xqUGkW9D88htrKq+gJfUUPlsqHZje70Zc= github.com/ydb-platform/ydb-go-sdk-otel v0.4.6/go.mod h1:FVPMQ67NMCjJH92bfHV23bzqcm8thBZQpbnBIyL7zIo= +github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1 h1:2gPHVcsGCZfApRAYcBygtqRqQErofTI9urP7lWC1KyU= +github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1/go.mod h1:FtMM5jNK/L6ksnxWqISn50qDf/NiCpwAnqUzMyMFIVY= github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0 h1:Srr5Hc7duRtM0PsFhyOPuP9jvZb3v2+UCVcrvDeErQo= github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0/go.mod h1:UeZclyc1+YWkYkCjf1SPjD1ct228obcG7yhLk+UMzOw= +github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE= github.com/ydb-platform/ydb-go-sdk/v3 v3.46.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGgzPJusoI3zWMNb8I+GnM= github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1 h1:AtNjus6Wm5pqK4pfYa9QQ83A2omHpzZuBwILB+ylfJ4= github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1/go.mod h1:h7mpZZIAPq65QDZOO/qGVKyadsXzvv2ncs5fKuIho24= diff --git a/internal/datastore/ydb/options.go b/internal/datastore/ydb/options.go index 2affdfedab..9db341dd6b 100644 --- a/internal/datastore/ydb/options.go +++ b/internal/datastore/ydb/options.go @@ -20,17 +20,12 @@ type ydbConfig struct { bulkLoadBatchSize int - // todo find a way to use it - maxRetries uint8 - - gcEnabled bool - enablePrometheusStats bool + gcEnabled bool } var defaultConfig = ydbConfig{ - tablePathPrefix: "", - watchBufferLength: 0, - watchBufferWriteTimeout: 0, + watchBufferLength: 128, + watchBufferWriteTimeout: time.Second, followerReadDelay: 0 * time.Second, revisionQuantization: 5 * time.Second, maxRevisionStalenessPercent: 0.1, @@ -38,9 +33,7 @@ var defaultConfig = ydbConfig{ gcInterval: 3 * time.Minute, gcMaxOperationTime: time.Minute, bulkLoadBatchSize: 1000, - maxRetries: 0, gcEnabled: false, - enablePrometheusStats: false, } // Option provides the facility to configure how clients within the YDB @@ -117,3 +110,17 @@ func FollowerReadDelay(delay time.Duration) Option { func BulkLoadBatchSize(limit int) Option { return func(o *ydbConfig) { o.bulkLoadBatchSize = limit } } + +// WatchBufferLength is the number of entries that can be stored in the watch +// buffer while awaiting read by the client. +// +// This value defaults to 128. +func WatchBufferLength(watchBufferLength uint16) Option { + return func(o *ydbConfig) { o.watchBufferLength = watchBufferLength } +} + +// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer, +// after which the caller to the watch will be disconnected. +func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { + return func(o *ydbConfig) { o.watchBufferWriteTimeout = watchBufferWriteTimeout } +} diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 9881fa1dea..dd6c3c486d 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -7,7 +7,9 @@ import ( "time" sq "github.com/Masterminds/squirrel" + "github.com/prometheus/client_golang/prometheus" ydbOtel "github.com/ydb-platform/ydb-go-sdk-otel" + ydbPrometheus "github.com/ydb-platform/ydb-go-sdk-prometheus" ydbZerolog "github.com/ydb-platform/ydb-go-sdk-zerolog" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -74,6 +76,7 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas parsedDSN.OriginalDSN, ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents), ydbOtel.WithTraces(), + ydbPrometheus.WithTraces(prometheus.DefaultRegisterer), ) if err != nil { return nil, fmt.Errorf("failed to open YDB connectionn: %w", err) From 4a873dc03cfbd32a836641c0d0aab1141362fea7 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 27 Feb 2024 16:06:26 +0800 Subject: [PATCH 04/14] Disable watch feature Signed-off-by: sashayakovtseva --- internal/datastore/ydb/ydb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index dd6c3c486d..7c8ee0e2e5 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -150,7 +150,7 @@ func (y *ydbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, er } func (y *ydbDatastore) Features(_ context.Context) (*datastore.Features, error) { - return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil + return &datastore.Features{Watch: datastore.Feature{Enabled: false}}, nil } func (y *ydbDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader { From 90f5c027cfddfe483fbcb51b1d29d975ea172557 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 27 Feb 2024 17:03:20 +0800 Subject: [PATCH 05/14] Add YDB options to cmd Signed-off-by: sashayakovtseva --- internal/datastore/ydb/options.go | 22 ++++++++- internal/datastore/ydb/ydb.go | 14 +++--- pkg/cmd/datastore/datastore.go | 81 ++++++++++++++++++++----------- 3 files changed, 81 insertions(+), 36 deletions(-) diff --git a/internal/datastore/ydb/options.go b/internal/datastore/ydb/options.go index 9db341dd6b..4bcbbcda41 100644 --- a/internal/datastore/ydb/options.go +++ b/internal/datastore/ydb/options.go @@ -20,10 +20,12 @@ type ydbConfig struct { bulkLoadBatchSize int - gcEnabled bool + gcEnabled bool + enablePrometheusStats bool } var defaultConfig = ydbConfig{ + tablePathPrefix: "", watchBufferLength: 128, watchBufferWriteTimeout: time.Second, followerReadDelay: 0 * time.Second, @@ -33,7 +35,8 @@ var defaultConfig = ydbConfig{ gcInterval: 3 * time.Minute, gcMaxOperationTime: time.Minute, bulkLoadBatchSize: 1000, - gcEnabled: false, + gcEnabled: true, + enablePrometheusStats: false, } // Option provides the facility to configure how clients within the YDB @@ -73,6 +76,13 @@ func GCInterval(interval time.Duration) Option { return func(o *ydbConfig) { o.gcInterval = interval } } +// GCEnabled indicates whether garbage collection is enabled. +// +// GC is enabled by default. +func GCEnabled(isGCEnabled bool) Option { + return func(o *ydbConfig) { o.gcEnabled = isGCEnabled } +} + // GCMaxOperationTime is the maximum operation time of a garbage collection pass before it times out. // // This value defaults to 1 minute. @@ -124,3 +134,11 @@ func WatchBufferLength(watchBufferLength uint16) Option { func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option { return func(o *ydbConfig) { o.watchBufferWriteTimeout = watchBufferWriteTimeout } } + +// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres +// clients being used by the datastore are enabled. +// +// Prometheus metrics are disabled by default. +func WithEnablePrometheusStats(enablePrometheusStats bool) Option { + return func(o *ydbConfig) { o.enablePrometheusStats = enablePrometheusStats } +} diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 7c8ee0e2e5..f0f24b5fa5 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -71,15 +71,17 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas config.tablePathPrefix = parsedDSN.TablePathPrefix } - db, err := ydb.Open( - ctx, - parsedDSN.OriginalDSN, + ydbOpts := []ydb.Option{ ydbZerolog.WithTraces(&log.Logger, trace.DatabaseSQLEvents), ydbOtel.WithTraces(), - ydbPrometheus.WithTraces(prometheus.DefaultRegisterer), - ) + } + if config.enablePrometheusStats { + ydbOpts = append(ydbOpts, ydbPrometheus.WithTraces(prometheus.DefaultRegisterer)) + } + + db, err := ydb.Open(ctx, parsedDSN.OriginalDSN, ydbOpts...) if err != nil { - return nil, fmt.Errorf("failed to open YDB connectionn: %w", err) + return nil, fmt.Errorf("failed to open YDB connection: %w", err) } if _, err := db.Scheme().ListDirectory(ctx, config.tablePathPrefix); err != nil { diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index 2fd3eaa7c7..d6bd3fdab0 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -16,6 +16,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/postgres" "github.com/authzed/spicedb/internal/datastore/proxy" "github.com/authzed/spicedb/internal/datastore/spanner" + "github.com/authzed/spicedb/internal/datastore/ydb" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/validationfile" @@ -37,6 +38,7 @@ var BuilderForEngine = map[string]engineBuilderFunc{ MemoryEngine: newMemoryDatstore, SpannerEngine: newSpannerDatastore, MySQLEngine: newMySQLDatastore, + ydb.Engine: newYDBDatastore, } //go:generate go run github.com/ecordell/optgen -output zz_generated.connpool.options.go . ConnPoolConfig @@ -119,26 +121,27 @@ type Config struct { RequestHedgingMaxRequests uint64 `debugmap:"visible"` RequestHedgingQuantile float64 `debugmap:"visible"` + // common (shared among two or more datastores) + FollowerReadDelay time.Duration `debugmap:"visible"` + GCInterval time.Duration `debugmap:"visible"` + GCMaxOperationTime time.Duration `debugmap:"visible"` + TablePrefix string `debugmap:"visible"` + // CRDB - FollowerReadDelay time.Duration `debugmap:"visible"` MaxRetries int `debugmap:"visible"` OverlapKey string `debugmap:"visible"` OverlapStrategy string `debugmap:"visible"` EnableConnectionBalancing bool `debugmap:"visible"` ConnectRate time.Duration `debugmap:"visible"` - // Postgres - GCInterval time.Duration `debugmap:"visible"` - GCMaxOperationTime time.Duration `debugmap:"visible"` - // Spanner SpannerCredentialsFile string `debugmap:"visible"` SpannerEmulatorHost string `debugmap:"visible"` SpannerMinSessions uint64 `debugmap:"visible"` SpannerMaxSessions uint64 `debugmap:"visible"` - // MySQL - TablePrefix string `debugmap:"visible"` + // YDB + YDBBulkLoadBatchSize int `debugmap:"visible"` // Internal WatchBufferLength uint16 `debugmap:"visible"` @@ -187,8 +190,8 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt var unusedSplitQueryCount uint16 flagSet.DurationVar(&opts.GCWindow, flagName("datastore-gc-window"), defaults.GCWindow, "amount of time before revisions are garbage collected") - flagSet.DurationVar(&opts.GCInterval, flagName("datastore-gc-interval"), defaults.GCInterval, "amount of time between passes of garbage collection (postgres driver only)") - flagSet.DurationVar(&opts.GCMaxOperationTime, flagName("datastore-gc-max-operation-time"), defaults.GCMaxOperationTime, "maximum amount of time a garbage collection pass can operate before timing out (postgres driver only)") + flagSet.DurationVar(&opts.GCInterval, flagName("datastore-gc-interval"), defaults.GCInterval, "amount of time between passes of garbage collection (postgres, mysql and ydb driver only)") + flagSet.DurationVar(&opts.GCMaxOperationTime, flagName("datastore-gc-max-operation-time"), defaults.GCMaxOperationTime, "maximum amount of time a garbage collection pass can operate before timing out (postgres, mysql and ydb driver only)") flagSet.DurationVar(&opts.RevisionQuantization, flagName("datastore-revision-quantization-interval"), defaults.RevisionQuantization, "boundary interval to which to round the quantized revision") flagSet.Float64Var(&opts.MaxRevisionStalenessPercent, flagName("datastore-revision-quantization-max-staleness-percent"), defaults.MaxRevisionStalenessPercent, "float percentage (where 1 = 100%) of the revision quantization interval where we may opt to select a stale revision for performance reasons. Defaults to 0.1 (representing 10%)") flagSet.BoolVar(&opts.ReadOnly, flagName("datastore-readonly"), defaults.ReadOnly, "set the service to read-only mode") @@ -201,8 +204,8 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.Float64Var(&opts.RequestHedgingQuantile, flagName("datastore-request-hedging-quantile"), defaults.RequestHedgingQuantile, "quantile of historical datastore request time over which a request will be considered slow") flagSet.BoolVar(&opts.EnableDatastoreMetrics, flagName("datastore-prometheus-metrics"), defaults.EnableDatastoreMetrics, "set to false to disabled prometheus metrics from the datastore") // See crdb doc for info about follower reads and how it is configured: https://www.cockroachlabs.com/docs/stable/follower-reads.html - flagSet.DurationVar(&opts.FollowerReadDelay, flagName("datastore-follower-read-delay-duration"), 4_800*time.Millisecond, "amount of time to subtract from non-sync revision timestamps to ensure they are sufficiently in the past to enable follower reads (cockroach driver only)") - flagSet.IntVar(&opts.MaxRetries, flagName("datastore-max-tx-retries"), 10, "number of times a retriable transaction should be retried") + flagSet.DurationVar(&opts.FollowerReadDelay, flagName("datastore-follower-read-delay-duration"), defaults.FollowerReadDelay, "amount of time to subtract from non-sync revision timestamps to ensure they are sufficiently in the past to enable follower reads (cockroach and ydb driver only)") + flagSet.IntVar(&opts.MaxRetries, flagName("datastore-max-tx-retries"), defaults.MaxRetries, "number of times a retriable transaction should be retried (except ydb driver)") flagSet.StringVar(&opts.OverlapStrategy, flagName("datastore-tx-overlap-strategy"), "static", `strategy to generate transaction overlap keys ("request", "prefix", "static", "insecure") (cockroach driver only - see https://spicedb.dev/d/crdb-overlap for details)"`) flagSet.StringVar(&opts.OverlapKey, flagName("datastore-tx-overlap-key"), "key", "static key to touch when writing to ensure transactions overlap (only used if --datastore-tx-overlap-strategy=static is set; cockroach driver only)") flagSet.BoolVar(&opts.EnableConnectionBalancing, flagName("datastore-connection-balancing"), defaults.EnableConnectionBalancing, "enable connection balancing between database nodes (cockroach driver only)") @@ -211,10 +214,11 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt flagSet.StringVar(&opts.SpannerEmulatorHost, flagName("datastore-spanner-emulator-host"), "", "URI of spanner emulator instance used for development and testing (e.g. localhost:9010)") flagSet.Uint64Var(&opts.SpannerMinSessions, flagName("datastore-spanner-min-sessions"), 100, "minimum number of sessions across all Spanner gRPC connections the client can have at a given time") flagSet.Uint64Var(&opts.SpannerMaxSessions, flagName("datastore-spanner-max-sessions"), 400, "maximum number of sessions across all Spanner gRPC connections the client can have at a given time") - flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), "", "prefix to add to the name of all SpiceDB database tables") - flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in") - flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), 1024, "how large the watch buffer should be before blocking") - flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), 1*time.Second, "how long the watch buffer should queue before forcefully disconnecting the reader") + flagSet.StringVar(&opts.TablePrefix, flagName("datastore-mysql-table-prefix"), defaults.TablePrefix, "prefix to add to the name of all SpiceDB database tables (mysql and ydb driver only)") + flagSet.StringVar(&opts.MigrationPhase, flagName("datastore-migration-phase"), "", "datastore-specific flag that should be used to signal to a datastore which phase of a multi-step migration it is in (postgres and spanner driver only)") + flagSet.Uint16Var(&opts.WatchBufferLength, flagName("datastore-watch-buffer-length"), defaults.WatchBufferLength, "how large the watch buffer should be before blocking") + flagSet.DurationVar(&opts.WatchBufferWriteTimeout, flagName("datastore-watch-buffer-write-timeout"), defaults.WatchBufferWriteTimeout, "how long the watch buffer should queue before forcefully disconnecting the reader") + flagSet.IntVar(&opts.YDBBulkLoadBatchSize, flagName("datastore-bulk-load-size"), defaults.YDBBulkLoadBatchSize, "number of rows BulkLoad will process in a single batch (ydb driver only)") // disabling stats is only for tests flagSet.BoolVar(&opts.DisableStats, flagName("datastore-disable-stats"), false, "disable recording relationship counts to the stats table") @@ -238,6 +242,7 @@ func RegisterDatastoreFlagsWithPrefix(flagSet *pflag.FlagSet, prefix string, opt func DefaultDatastoreConfig() *Config { return &Config{ Engine: MemoryEngine, + URI: "", GCWindow: 24 * time.Hour, LegacyFuzzing: -1, RevisionQuantization: 5 * time.Second, @@ -245,31 +250,33 @@ func DefaultDatastoreConfig() *Config { ReadConnPool: *DefaultReadConnPool(), WriteConnPool: *DefaultWriteConnPool(), ReadOnly: false, - MaxRetries: 10, - OverlapKey: "key", - OverlapStrategy: "static", - ConnectRate: 100 * time.Millisecond, - EnableConnectionBalancing: true, - GCInterval: 3 * time.Minute, - GCMaxOperationTime: 1 * time.Minute, - WatchBufferLength: 1024, - WatchBufferWriteTimeout: 1 * time.Second, EnableDatastoreMetrics: true, DisableStats: false, BootstrapFiles: []string{}, - BootstrapTimeout: 10 * time.Second, + BootstrapFileContents: nil, BootstrapOverwrite: false, + BootstrapTimeout: 10 * time.Second, RequestHedgingEnabled: false, RequestHedgingInitialSlowValue: 10000000, RequestHedgingMaxRequests: 1_000_000, RequestHedgingQuantile: 0.95, + FollowerReadDelay: 4_800 * time.Millisecond, + GCInterval: 3 * time.Minute, + GCMaxOperationTime: 1 * time.Minute, + TablePrefix: "", + MaxRetries: 10, + OverlapKey: "key", + OverlapStrategy: "static", + EnableConnectionBalancing: true, + ConnectRate: 100 * time.Millisecond, SpannerCredentialsFile: "", SpannerEmulatorHost: "", - TablePrefix: "", - MigrationPhase: "", - FollowerReadDelay: 4_800 * time.Millisecond, SpannerMinSessions: 100, SpannerMaxSessions: 400, + YDBBulkLoadBatchSize: 1000, + WatchBufferLength: 1024, + WatchBufferWriteTimeout: 1 * time.Second, + MigrationPhase: "", } } @@ -418,6 +425,24 @@ func newPostgresDatastore(ctx context.Context, opts Config) (datastore.Datastore return postgres.NewPostgresDatastore(ctx, opts.URI, pgOpts...) } +func newYDBDatastore(ctx context.Context, config Config) (datastore.Datastore, error) { + opts := []ydb.Option{ + ydb.GCWindow(config.GCWindow), + ydb.GCEnabled(!config.ReadOnly), + ydb.RevisionQuantization(config.RevisionQuantization), + ydb.MaxRevisionStalenessPercent(config.MaxRevisionStalenessPercent), + ydb.FollowerReadDelay(config.FollowerReadDelay), + ydb.GCInterval(config.GCInterval), + ydb.GCMaxOperationTime(config.GCMaxOperationTime), + ydb.WithTablePathPrefix(config.TablePrefix), + ydb.WatchBufferLength(config.WatchBufferLength), + ydb.WatchBufferWriteTimeout(config.WatchBufferWriteTimeout), + ydb.BulkLoadBatchSize(config.YDBBulkLoadBatchSize), + ydb.WithEnablePrometheusStats(config.EnableDatastoreMetrics), + } + return ydb.NewYDBDatastore(ctx, config.URI, opts...) +} + func newSpannerDatastore(ctx context.Context, opts Config) (datastore.Datastore, error) { return spanner.NewSpannerDatastore( ctx, From 8ac5b5256c0a63960f1e4b5068e78b4344a99f78 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 27 Feb 2024 17:19:16 +0800 Subject: [PATCH 06/14] Use sync.Once to close ydb Signed-off-by: sashayakovtseva --- internal/datastore/ydb/ydb.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index f0f24b5fa5..f73d0d99ba 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -3,6 +3,7 @@ package ydb import ( "context" "fmt" + "sync" "sync/atomic" "time" @@ -59,6 +60,7 @@ type ydbDatastore struct { originalDSN string + closeOnce sync.Once // isClosed used in HeadRevision only to pass datastore tests. isClosed atomic.Bool } @@ -110,15 +112,20 @@ func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatas } func (y *ydbDatastore) Close() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) - defer cancel() - - if err := y.driver.Close(ctx); err != nil { - log.Warn().Err(err).Msg("failed to shutdown YDB driver") + if y.isClosed.Load() { + return nil } - y.isClosed.Store(true) - return nil + var err error + y.closeOnce.Do(func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + err = y.driver.Close(ctx) + y.isClosed.Store(true) + }) + + return err } func (y *ydbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { From e9ea5511501f8847492dd23ec02cfc2d224533d6 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Wed, 28 Feb 2024 13:37:16 +0800 Subject: [PATCH 07/14] Fix linter Signed-off-by: sashayakovtseva --- internal/datastore/ydb/common/common.go | 1 - internal/datastore/ydb/reader.go | 1 - internal/datastore/ydb/reader_test.go | 4 +- internal/datastore/ydb/readwrite_test.go | 2 + internal/datastore/ydb/stats_test.go | 2 + internal/datastore/ydb/watch.go | 1 - pkg/cmd/datastore/zz_generated.options.go | 55 +++++++++++++---------- 7 files changed, 38 insertions(+), 28 deletions(-) diff --git a/internal/datastore/ydb/common/common.go b/internal/datastore/ydb/common/common.go index 1005f7ff84..63a0871a4c 100644 --- a/internal/datastore/ydb/common/common.go +++ b/internal/datastore/ydb/common/common.go @@ -53,5 +53,4 @@ func AddTablePrefix(query string, tablePathPrefix string) string { buffer.WriteString(query) return buffer.String() - } diff --git a/internal/datastore/ydb/reader.go b/internal/datastore/ydb/reader.go index 3ce9c39df1..71351edc7c 100644 --- a/internal/datastore/ydb/reader.go +++ b/internal/datastore/ydb/reader.go @@ -215,7 +215,6 @@ func (r *ydbReader) LookupNamespacesWithNames( builder = builder.View(ixUqNamespaceLiving) } return builder - }, ) if err != nil { diff --git a/internal/datastore/ydb/reader_test.go b/internal/datastore/ydb/reader_test.go index d5f380557c..4dc6fc48d1 100644 --- a/internal/datastore/ydb/reader_test.go +++ b/internal/datastore/ydb/reader_test.go @@ -1,3 +1,5 @@ +//go:build ci && docker + package ydb import ( @@ -477,7 +479,6 @@ func TestYDBReaderRelationships(t *testing.T) { t.Cleanup(func() { yDS.Close() }) err = yDS.driver.Table().Do(context.Background(), func(ctx context.Context, s table.Session) error { - stmt, err := s.Prepare( ctx, common.AddTablePrefix(` @@ -570,7 +571,6 @@ func TestYDBReaderRelationships(t *testing.T) { } require.Equal(t, lo.FromPtr(expect.caveatContext), actualCaveatContext) - } testQueryRelationships := func( diff --git a/internal/datastore/ydb/readwrite_test.go b/internal/datastore/ydb/readwrite_test.go index 7d3d0873a3..8b841506c1 100644 --- a/internal/datastore/ydb/readwrite_test.go +++ b/internal/datastore/ydb/readwrite_test.go @@ -1,3 +1,5 @@ +//go:build ci && docker + package ydb import ( diff --git a/internal/datastore/ydb/stats_test.go b/internal/datastore/ydb/stats_test.go index 53af78a75e..e3f84f4513 100644 --- a/internal/datastore/ydb/stats_test.go +++ b/internal/datastore/ydb/stats_test.go @@ -1,3 +1,5 @@ +//go:build ci && docker + package ydb import ( diff --git a/internal/datastore/ydb/watch.go b/internal/datastore/ydb/watch.go index e55d50a83e..e5c9787a92 100644 --- a/internal/datastore/ydb/watch.go +++ b/internal/datastore/ydb/watch.go @@ -54,7 +54,6 @@ func (y *ydbDatastore) Watch( type cdcEvent struct { Key []string NewImage json.RawMessage - Ts []int } type namespaceConfigImage struct { diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go index fbb25752f3..4cc5732de8 100644 --- a/pkg/cmd/datastore/zz_generated.options.go +++ b/pkg/cmd/datastore/zz_generated.options.go @@ -51,18 +51,19 @@ func (c *Config) ToOption() ConfigOption { to.RequestHedgingMaxRequests = c.RequestHedgingMaxRequests to.RequestHedgingQuantile = c.RequestHedgingQuantile to.FollowerReadDelay = c.FollowerReadDelay + to.GCInterval = c.GCInterval + to.GCMaxOperationTime = c.GCMaxOperationTime + to.TablePrefix = c.TablePrefix to.MaxRetries = c.MaxRetries to.OverlapKey = c.OverlapKey to.OverlapStrategy = c.OverlapStrategy to.EnableConnectionBalancing = c.EnableConnectionBalancing to.ConnectRate = c.ConnectRate - to.GCInterval = c.GCInterval - to.GCMaxOperationTime = c.GCMaxOperationTime to.SpannerCredentialsFile = c.SpannerCredentialsFile to.SpannerEmulatorHost = c.SpannerEmulatorHost to.SpannerMinSessions = c.SpannerMinSessions to.SpannerMaxSessions = c.SpannerMaxSessions - to.TablePrefix = c.TablePrefix + to.YDBBulkLoadBatchSize = c.YDBBulkLoadBatchSize to.WatchBufferLength = c.WatchBufferLength to.WatchBufferWriteTimeout = c.WatchBufferWriteTimeout to.MigrationPhase = c.MigrationPhase @@ -92,18 +93,19 @@ func (c Config) DebugMap() map[string]any { debugMap["RequestHedgingMaxRequests"] = helpers.DebugValue(c.RequestHedgingMaxRequests, false) debugMap["RequestHedgingQuantile"] = helpers.DebugValue(c.RequestHedgingQuantile, false) debugMap["FollowerReadDelay"] = helpers.DebugValue(c.FollowerReadDelay, false) + debugMap["GCInterval"] = helpers.DebugValue(c.GCInterval, false) + debugMap["GCMaxOperationTime"] = helpers.DebugValue(c.GCMaxOperationTime, false) + debugMap["TablePrefix"] = helpers.DebugValue(c.TablePrefix, false) debugMap["MaxRetries"] = helpers.DebugValue(c.MaxRetries, false) debugMap["OverlapKey"] = helpers.DebugValue(c.OverlapKey, false) debugMap["OverlapStrategy"] = helpers.DebugValue(c.OverlapStrategy, false) debugMap["EnableConnectionBalancing"] = helpers.DebugValue(c.EnableConnectionBalancing, false) debugMap["ConnectRate"] = helpers.DebugValue(c.ConnectRate, false) - debugMap["GCInterval"] = helpers.DebugValue(c.GCInterval, false) - debugMap["GCMaxOperationTime"] = helpers.DebugValue(c.GCMaxOperationTime, false) debugMap["SpannerCredentialsFile"] = helpers.DebugValue(c.SpannerCredentialsFile, false) debugMap["SpannerEmulatorHost"] = helpers.DebugValue(c.SpannerEmulatorHost, false) debugMap["SpannerMinSessions"] = helpers.DebugValue(c.SpannerMinSessions, false) debugMap["SpannerMaxSessions"] = helpers.DebugValue(c.SpannerMaxSessions, false) - debugMap["TablePrefix"] = helpers.DebugValue(c.TablePrefix, false) + debugMap["YDBBulkLoadBatchSize"] = helpers.DebugValue(c.YDBBulkLoadBatchSize, false) debugMap["WatchBufferLength"] = helpers.DebugValue(c.WatchBufferLength, false) debugMap["WatchBufferWriteTimeout"] = helpers.DebugValue(c.WatchBufferWriteTimeout, false) debugMap["MigrationPhase"] = helpers.DebugValue(c.MigrationPhase, false) @@ -280,6 +282,27 @@ func WithFollowerReadDelay(followerReadDelay time.Duration) ConfigOption { } } +// WithGCInterval returns an option that can set GCInterval on a Config +func WithGCInterval(gCInterval time.Duration) ConfigOption { + return func(c *Config) { + c.GCInterval = gCInterval + } +} + +// WithGCMaxOperationTime returns an option that can set GCMaxOperationTime on a Config +func WithGCMaxOperationTime(gCMaxOperationTime time.Duration) ConfigOption { + return func(c *Config) { + c.GCMaxOperationTime = gCMaxOperationTime + } +} + +// WithTablePrefix returns an option that can set TablePrefix on a Config +func WithTablePrefix(tablePrefix string) ConfigOption { + return func(c *Config) { + c.TablePrefix = tablePrefix + } +} + // WithMaxRetries returns an option that can set MaxRetries on a Config func WithMaxRetries(maxRetries int) ConfigOption { return func(c *Config) { @@ -315,20 +338,6 @@ func WithConnectRate(connectRate time.Duration) ConfigOption { } } -// WithGCInterval returns an option that can set GCInterval on a Config -func WithGCInterval(gCInterval time.Duration) ConfigOption { - return func(c *Config) { - c.GCInterval = gCInterval - } -} - -// WithGCMaxOperationTime returns an option that can set GCMaxOperationTime on a Config -func WithGCMaxOperationTime(gCMaxOperationTime time.Duration) ConfigOption { - return func(c *Config) { - c.GCMaxOperationTime = gCMaxOperationTime - } -} - // WithSpannerCredentialsFile returns an option that can set SpannerCredentialsFile on a Config func WithSpannerCredentialsFile(spannerCredentialsFile string) ConfigOption { return func(c *Config) { @@ -357,10 +366,10 @@ func WithSpannerMaxSessions(spannerMaxSessions uint64) ConfigOption { } } -// WithTablePrefix returns an option that can set TablePrefix on a Config -func WithTablePrefix(tablePrefix string) ConfigOption { +// WithYDBBulkLoadBatchSize returns an option that can set YDBBulkLoadBatchSize on a Config +func WithYDBBulkLoadBatchSize(yDBBulkLoadBatchSize int) ConfigOption { return func(c *Config) { - c.TablePrefix = tablePrefix + c.YDBBulkLoadBatchSize = yDBBulkLoadBatchSize } } From 344e26ff733893d4979c2c3a9ef93f3838d95543 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Thu, 29 Feb 2024 05:52:35 +0400 Subject: [PATCH 08/14] Fix error string Signed-off-by: sashayakovtseva --- internal/datastore/ydb/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/datastore/ydb/query.go b/internal/datastore/ydb/query.go index 5191526152..149a3b6c1a 100644 --- a/internal/datastore/ydb/query.go +++ b/internal/datastore/ydb/query.go @@ -167,7 +167,7 @@ func queryRow( return err } if !res.NextRow() { - return fmt.Errorf("no unique id rows") + return fmt.Errorf("no rows in result set") } if err := res.Scan(values...); err != nil { return err From f87d036d715684f2aa975be940ae140e6523e9cb Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Fri, 1 Mar 2024 14:51:49 +0300 Subject: [PATCH 09/14] Add cache option for tx executor Signed-off-by: sashayakovtseva --- internal/datastore/ydb/query.go | 16 ++++++++++++++++ internal/datastore/ydb/ydb.go | 7 ++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/datastore/ydb/query.go b/internal/datastore/ydb/query.go index 149a3b6c1a..603da5bd02 100644 --- a/internal/datastore/ydb/query.go +++ b/internal/datastore/ydb/query.go @@ -146,6 +146,22 @@ func (se sessionQueryExecutor) Execute( return res, err } +// txQueryExecutor implements queryExecutor for YDB transactional actor. +// This is a convenient wrapper to add custom options for all queries. +type txQueryExecutor struct { + tx table.TransactionActor +} + +func (t txQueryExecutor) Execute( + ctx context.Context, + query string, + params *table.QueryParameters, + opts ...options.ExecuteDataQueryOption, +) (result.Result, error) { + opts = append(opts, options.WithKeepInCache(true)) + return t.tx.Execute(ctx, query, params, opts...) +} + func queryRow( ctx context.Context, executor queryExecutor, diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index f73d0d99ba..0c63cdaec7 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -198,7 +198,12 @@ func (y *ydbDatastore) ReadWriteTx( newRev = revisions.NewForTimestamp(now) rw := &ydbReadWriter{ - ydbReader: newYDBReader(y.config.tablePathPrefix, tx, livingObjectModifier, false), + ydbReader: newYDBReader( + y.config.tablePathPrefix, + txQueryExecutor{tx: tx}, + livingObjectModifier, + false, + ), bulkLoadBatchSize: y.config.bulkLoadBatchSize, newRevision: newRev, } From c1530409c34c909ac15ca905b712dbbfc5574c16 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Fri, 1 Mar 2024 18:03:13 +0300 Subject: [PATCH 10/14] Add a few spans Signed-off-by: sashayakovtseva --- internal/datastore/ydb/query.go | 6 ++++++ internal/datastore/ydb/readwrite.go | 7 ++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/datastore/ydb/query.go b/internal/datastore/ydb/query.go index 603da5bd02..d2470aff3f 100644 --- a/internal/datastore/ydb/query.go +++ b/internal/datastore/ydb/query.go @@ -295,6 +295,9 @@ func executeDeleteQuery( deleteRev revisions.TimestampRevision, pred sq.Sqlizer, ) error { + ctx, span := tracer.Start(ctx, "executeDeleteQuery") + defer span.End() + return executeQuery( ctx, tablePathPrefix, @@ -305,6 +308,9 @@ func executeDeleteQuery( // executeQuery is a helper for queries that don't care about result set. func executeQuery(ctx context.Context, tablePathPrefix string, executor queryExecutor, q sq.Yqliser) error { + ctx, span := tracer.Start(ctx, "executeQuery") + defer span.End() + sql, args, err := q.ToYQL() if err != nil { return fmt.Errorf("failed to build query: %w", err) diff --git a/internal/datastore/ydb/readwrite.go b/internal/datastore/ydb/readwrite.go index 4150befe7f..3c7fef4dcb 100644 --- a/internal/datastore/ydb/readwrite.go +++ b/internal/datastore/ydb/readwrite.go @@ -9,6 +9,7 @@ import ( "github.com/jzelinskie/stringz" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/proto" @@ -302,7 +303,8 @@ func (rw *ydbReadWriter) selectTuples( ctx context.Context, in []*core.RelationTuple, ) ([]*core.RelationTuple, error) { - span := trace.SpanFromContext(ctx) + ctx, span := tracer.Start(ctx, "selectTuples", trace.WithAttributes(attribute.Int("count", len(in)))) + defer span.End() if len(in) == 0 { return nil, nil @@ -339,6 +341,9 @@ func writeDefinitions[T coreDefinition]( defs []T, useVT bool, ) error { + ctx, span := tracer.Start(ctx, "writeDefinitions", trace.WithAttributes(attribute.Int("count", len(defs)))) + defer span.End() + if len(defs) == 0 { return nil } From 27e2d1c634069136c86ac1ade943710495beabfa Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 4 Mar 2024 13:04:19 +0300 Subject: [PATCH 11/14] Fix schema watch Signed-off-by: sashayakovtseva --- internal/datastore/ydb/watch.go | 319 +++++++++++++++++++------------- internal/datastore/ydb/ydb.go | 5 +- 2 files changed, 197 insertions(+), 127 deletions(-) diff --git a/internal/datastore/ydb/watch.go b/internal/datastore/ydb/watch.go index e5c9787a92..788ce4c892 100644 --- a/internal/datastore/ydb/watch.go +++ b/internal/datastore/ydb/watch.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strconv" - "strings" "time" "github.com/google/uuid" @@ -14,9 +13,11 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes" "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/revisions" + log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -46,30 +47,54 @@ func (y *ydbDatastore) Watch( return updates, errs } + y.watchWg.Add(1) go y.watch(ctx, afterRevision, options, updates, errs) return updates, errs } type cdcEvent struct { - Key []string + Key []cdcKeyElement NewImage json.RawMessage } +type cdcKeyElement struct { + String string + Int64 int64 +} + +func (c *cdcKeyElement) UnmarshalJSON(in []byte) error { + if len(in) == 0 { + return fmt.Errorf("unexpected element len of 0") + } + + if in[0] == '"' { + c.String = string(in[1 : len(in)-1]) + } else { + var err error + c.Int64, err = strconv.ParseInt(string(in), 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int64 component: %w", err) + } + } + + return nil +} + type namespaceConfigImage struct { - SerializedConfig []byte - DeletedAtUnixNano *int64 + SerializedConfig []byte `json:"serialized_config"` + DeletedAtUnixNano *int64 `json:"deleted_at_unix_nano"` } type caveatImage struct { - Definition []byte - DeletedAtUnixNano *int64 + Definition []byte `json:"definition"` + DeletedAtUnixNano *int64 `json:"deleted_at_unix_nano"` } type relationTupleImage struct { - CaveatName *string - CaveatContext *[]byte - DeletedAtUnixNano *int64 + CaveatName *string `json:"caveat_name"` + CaveatContext *[]byte `json:"caveat_context"` + DeletedAtUnixNano *int64 `json:"deleted_at_unix_nano"` } func (y *ydbDatastore) watch( @@ -79,6 +104,7 @@ func (y *ydbDatastore) watch( updates chan *datastore.RevisionChanges, errs chan error, ) { + defer y.watchWg.Done() defer close(updates) defer close(errs) @@ -87,6 +113,7 @@ func (y *ydbDatastore) watch( errs <- fmt.Errorf("expected timestamp revision, got %T", afterRevision) return } + readFromTime := afterTimestampRevision.Time() tableToTopicName := map[string]string{ tableRelationTuple: y.config.tablePathPrefix + "/" + tableRelationTuple + "/" + changefeedSpicedbWatch, @@ -114,7 +141,6 @@ func (y *ydbDatastore) watch( errs <- datastore.NewWatchCanceledErr() return } - errs <- err } @@ -143,18 +169,45 @@ func (y *ydbDatastore) watch( } } + consumerUUID := uuid.New().String() + log.Trace(). + Strs("topic", topics). + Time("read_from", readFromTime). + Str("consumer_uuid", consumerUUID). + Msg("starting YDB reader") + var selectors topicoptions.ReadSelectors for _, topic := range topics { selectors = append(selectors, topicoptions.ReadSelector{ Path: topic, - ReadFrom: afterTimestampRevision.Time(), + ReadFrom: readFromTime, }, ) + + if err := y.driver.Topic().Alter(ctx, topic, + topicoptions.AlterWithAddConsumers(topictypes.Consumer{ + Name: consumerUUID, + SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw}, + ReadFrom: readFromTime, + }), + ); err != nil { + sendError(fmt.Errorf("failed to create consumer: %w", err)) + return + } + defer func(topic string) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + err := y.driver.Topic().Alter(ctx, topic, topicoptions.AlterWithDropConsumers(consumerUUID)) + if err != nil { + log.Warn().Str("topic", topic).Str("consumer", consumerUUID).Err(err).Msg("failed to remove consumer") + } + }(topic) } reader, err := y.driver.Topic().StartReader( - uuid.New().String(), + consumerUUID, selectors, topicoptions.WithReaderCommitMode(topicoptions.CommitModeSync), ) @@ -169,181 +222,195 @@ func (y *ydbDatastore) watch( return } + // WARNING! This is for test purpose only and only for namespace-aware schema watch! + // todo Fix this when 'resolved'-like messages are supported. + const expectedChangesCount = 4 + var ( rErr error msg *topicreader.Message event cdcEvent tracked = common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content) + + changesCount int + changesRevision revisions.TimestampRevision ) - for msg, rErr = reader.ReadMessage(ctx); rErr != nil; msg, rErr = reader.ReadMessage(ctx) { + + for msg, rErr = reader.ReadMessage(ctx); rErr == nil; msg, rErr = reader.ReadMessage(ctx) { if err := topicsugar.JSONUnmarshal(msg, &event); err != nil { sendError(fmt.Errorf("failed to unmarshal cdc event: %w", err)) return } - // todo - // Resolved indicates that the specified revision is "complete"; no additional updates can come in before or at it. - // Therefore, at this point, we issue tracked updates from before that time, and the checkpoint update. - if false { - rev := revisions.NewForTimestamp(1) - changes := tracked.FilterAndRemoveRevisionChanges(revisions.TimestampIDKeyLessThanFunc, rev) - for i := range changes { - if !sendChange(&changes[i]) { - return - } - } - - if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { - if !sendChange(&datastore.RevisionChanges{ - Revision: rev, - IsCheckpoint: true, - }) { - return - } - } - continue - } + log.Trace(). + Str("topic", msg.Topic()). + Int64("partition", msg.PartitionID()). + Str("message_group_id", msg.MessageGroupID). + Str("producer_id", msg.ProducerID). + Int64("seq_no", msg.SeqNo). + Int64("offset", msg.Offset). + Time("created_at", msg.CreatedAt). + Time("written_at", msg.WrittenAt). + Any("metadata", msg.Metadata). + Any("event", event). + Msg("got new YDB CDC event") switch topicToTableName[msg.Topic()] { case tableRelationTuple: if len(event.Key) != 7 { - err := spiceerrors.MustBugf( - "unexpected PK size. want 7, got %d (%q)", - len(event.Key), strings.Join(event.Key, ","), - ) - sendError(err) + sendError(spiceerrors.MustBugf("unexpected PK size. want 7, got %d (%v)", len(event.Key), event.Key)) return } - createdAtUnixNano, _ := strconv.Atoi(event.Key[6]) - createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + createdAtUnixNano := event.Key[6].Int64 + createRev := revisions.NewForTimestamp(createdAtUnixNano) tuple := &core.RelationTuple{ ResourceAndRelation: &core.ObjectAndRelation{ - Namespace: event.Key[0], - ObjectId: event.Key[1], - Relation: event.Key[2], + Namespace: event.Key[0].String, + ObjectId: event.Key[1].String, + Relation: event.Key[2].String, }, Subject: &core.ObjectAndRelation{ - Namespace: event.Key[3], - ObjectId: event.Key[4], - Relation: event.Key[5], + Namespace: event.Key[3].String, + ObjectId: event.Key[4].String, + Relation: event.Key[5].String, }, } - if event.NewImage != nil { - var changes relationTupleImage - if err := json.Unmarshal(event.NewImage, &changes); err != nil { - sendError(fmt.Errorf("failed to unmarshal relation tuple new image: %w", err)) + if event.NewImage == nil { + break + } + + var changes relationTupleImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal relation tuple new image: %w", err)) + return + } + + var structuredCtx map[string]any + if changes.CaveatContext != nil { + if err := json.Unmarshal(*changes.CaveatContext, &structuredCtx); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat context new image: %w", err)) return } + } - var structuredCtx map[string]any - if changes.CaveatContext != nil { - if err := json.Unmarshal(*changes.CaveatContext, &structuredCtx); err != nil { - sendError(fmt.Errorf("failed to unmarshal caveat context new image: %w", err)) - return - } - } + ctxCaveat, err := common.ContextualizedCaveatFrom(lo.FromPtr(changes.CaveatName), structuredCtx) + if err != nil { + sendError(err) + return + } + tuple.Caveat = ctxCaveat - ctxCaveat, err := common.ContextualizedCaveatFrom(lo.FromPtr(changes.CaveatName), structuredCtx) + if changes.DeletedAtUnixNano != nil { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + err := tracked.AddRelationshipChange(ctx, deleteRev, tuple, core.RelationTupleUpdate_DELETE) if err != nil { sendError(err) return } - tuple.Caveat = ctxCaveat - - if changes.DeletedAtUnixNano == nil { - err := tracked.AddRelationshipChange(ctx, createRev, tuple, core.RelationTupleUpdate_TOUCH) - if err != nil { - sendError(err) - return - } - } else { - deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) - err := tracked.AddRelationshipChange(ctx, deleteRev, tuple, core.RelationTupleUpdate_DELETE) - if err != nil { - sendError(err) - return - } - } + break + } + + if err := tracked.AddRelationshipChange(ctx, createRev, tuple, core.RelationTupleUpdate_TOUCH); err != nil { + sendError(err) + return } case tableNamespaceConfig: if len(event.Key) != 2 { - err := spiceerrors.MustBugf( - "unexpected PK size. want 2, got %d (%q)", - len(event.Key), strings.Join(event.Key, ","), - ) - sendError(err) + sendError(spiceerrors.MustBugf("unexpected PK size. want 2, got %d (%v)", len(event.Key), event.Key)) return } - namespaceName := event.Key[0] - createdAtUnixNano, _ := strconv.Atoi(event.Key[1]) - createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + namespaceName := event.Key[0].String + createRev := revisions.NewForTimestamp(event.Key[1].Int64) - if event.NewImage != nil { - var changes namespaceConfigImage - if err := json.Unmarshal(event.NewImage, &changes); err != nil { - sendError(fmt.Errorf("failed to unmarshal namespace new image: %w", err)) - return - } + if event.NewImage == nil { + break + } - var loaded core.NamespaceDefinition - if err := loaded.UnmarshalVT(changes.SerializedConfig); err != nil { - sendError(fmt.Errorf("failed to unmarshal namespace config: %w", err)) - return - } + var changes namespaceConfigImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal namespace new image: %w", err)) + return + } + changesCount++ + changesRevision = createRev - if changes.DeletedAtUnixNano == nil { - tracked.AddChangedDefinition(ctx, createRev, &loaded) - } else { - deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) - tracked.AddDeletedNamespace(ctx, deleteRev, namespaceName) - } + if changes.DeletedAtUnixNano != nil { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + tracked.AddDeletedNamespace(ctx, deleteRev, namespaceName) + break + } + + var loaded core.NamespaceDefinition + if err := loaded.UnmarshalVT(changes.SerializedConfig); err != nil { + sendError(fmt.Errorf("failed to unmarshal namespace config: %w", err)) + return } + tracked.AddChangedDefinition(ctx, createRev, &loaded) case tableCaveat: if len(event.Key) != 2 { - err := spiceerrors.MustBugf( - "unexpected PK size. want 2, got %d (%q)", - len(event.Key), strings.Join(event.Key, ","), - ) - sendError(err) + sendError(spiceerrors.MustBugf("unexpected PK size. want 2, got %d (%v)", len(event.Key), event.Key)) return } - caveatName := event.Key[0] - createdAtUnixNano, _ := strconv.Atoi(event.Key[1]) - createRev := revisions.NewForTimestamp(int64(createdAtUnixNano)) + caveatName := event.Key[0].String + createRev := revisions.NewForTimestamp(event.Key[1].Int64) - if event.NewImage != nil { - var changes caveatImage - if err := json.Unmarshal(event.NewImage, &changes); err != nil { - sendError(fmt.Errorf("failed to unmarshal caveat new image: %w", err)) - return - } + if event.NewImage == nil { + break + } - var loaded core.CaveatDefinition - if err := loaded.UnmarshalVT(changes.Definition); err != nil { - sendError(fmt.Errorf("failed to unmarshal caveat config: %w", err)) - return - } + var changes caveatImage + if err := json.Unmarshal(event.NewImage, &changes); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat new image: %w", err)) + return + } - if changes.DeletedAtUnixNano == nil { - tracked.AddChangedDefinition(ctx, createRev, &loaded) - } else { - deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) - tracked.AddDeletedNamespace(ctx, deleteRev, caveatName) - } + if changes.DeletedAtUnixNano != nil { + deleteRev := revisions.NewForTimestamp(*changes.DeletedAtUnixNano) + tracked.AddDeletedNamespace(ctx, deleteRev, caveatName) + break + } + + var loaded core.CaveatDefinition + if err := loaded.UnmarshalVT(changes.Definition); err != nil { + sendError(fmt.Errorf("failed to unmarshal caveat config: %w", err)) + return } + tracked.AddChangedDefinition(ctx, createRev, &loaded) } if err := reader.Commit(ctx, msg); err != nil { sendError(fmt.Errorf("failed to commit offset: %w", err)) return } + + if changesCount == expectedChangesCount { + changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc) + for i := range changes { + if !sendChange(&changes[i]) { + return + } + } + + if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { + if !sendChange(&datastore.RevisionChanges{ + Revision: changesRevision, + IsCheckpoint: true, + }) { + return + } + } + + changesCount = 0 + changesRevision = 0 + tracked = common.NewChanges(revisions.TimestampIDKeyFunc, opts.Content) + } } if rErr != nil { diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 0c63cdaec7..9ccb296d8c 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -63,6 +63,7 @@ type ydbDatastore struct { closeOnce sync.Once // isClosed used in HeadRevision only to pass datastore tests. isClosed atomic.Bool + watchWg sync.WaitGroup } func newYDBDatastore(ctx context.Context, dsn string, opts ...Option) (*ydbDatastore, error) { @@ -118,6 +119,8 @@ func (y *ydbDatastore) Close() error { var err error y.closeOnce.Do(func() { + y.watchWg.Wait() + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() @@ -159,7 +162,7 @@ func (y *ydbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, er } func (y *ydbDatastore) Features(_ context.Context) (*datastore.Features, error) { - return &datastore.Features{Watch: datastore.Feature{Enabled: false}}, nil + return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil } func (y *ydbDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader { From a576962abb0654e1eaa50116e1b9e017768df109 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 4 Mar 2024 16:59:04 +0300 Subject: [PATCH 12/14] Auto partition by load only Signed-off-by: sashayakovtseva --- .../zz_migration.0001_initial_schema.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go index 29d9b5ea24..54490c76d1 100644 --- a/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go +++ b/internal/datastore/ydb/migrations/zz_migration.0001_initial_schema.go @@ -36,7 +36,6 @@ CREATE TABLE metadata ( PRIMARY KEY (unique_id) );` - // todo AUTO_PARTITIONING_BY_LOAD? // ideally PK should be (namespace, deleted_at_unix_nano), but since deleted_at_unix_nano is // updated during delete operation it cannot be used. simply (namespace) is also not applicable // b/c there might be deleted namespaces with the same name as currently living. @@ -49,9 +48,13 @@ CREATE TABLE namespace_config ( deleted_at_unix_nano Int64, PRIMARY KEY (namespace, created_at_unix_nano), INDEX uq_namespace_living GLOBAL SYNC ON (deleted_at_unix_nano, namespace) COVER (serialized_config) +) +WITH ( + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = ENABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3 );` - // todo AUTO_PARTITIONING_BY_LOAD? // ideally PK should be (name, deleted_at_unix_nano), but since deleted_at_unix_nano is // updated during delete operation it cannot be used. simply (name) is also not applicable // b/c there might be deleted caveats with the same name as currently living. @@ -64,10 +67,14 @@ CREATE TABLE caveat ( deleted_at_unix_nano Int64, PRIMARY KEY (name, created_at_unix_nano), INDEX uq_caveat_living GLOBAL SYNC ON (deleted_at_unix_nano, name) COVER (definition) +) +WITH ( + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = ENABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3 );` // todo use correct indexes. - // todo AUTO_PARTITIONING_BY_LOAD? createRelationTuple = ` CREATE TABLE relation_tuple ( namespace Utf8 NOT NULL, @@ -86,9 +93,13 @@ CREATE TABLE relation_tuple ( INDEX ix_relation_tuple_by_subject_relation GLOBAL SYNC ON (userset_namespace, userset_relation, namespace, relation), INDEX ix_relation_tuple_alive_by_resource_rel_subject_covering GLOBAL SYNC ON (namespace, relation, userset_namespace) COVER (caveat_name, caveat_context), INDEX ix_gc_index GLOBAL SYNC ON (deleted_at_unix_nano) +) +WITH ( + AUTO_PARTITIONING_BY_SIZE = DISABLED, + AUTO_PARTITIONING_BY_LOAD = ENABLED, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3 );` - // todo TOPIC_MIN_ACTIVE_PARTITIONS? createNamespaceConfigChangefeed = ` ALTER TABLE namespace_config ADD CHANGEFEED spicedb_watch From 34ab6589c0e7c673c38918e571f3ef3e517bc969 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 5 Mar 2024 13:06:09 +0300 Subject: [PATCH 13/14] Fix build after RW tx interface change Signed-off-by: sashayakovtseva --- internal/datastore/ydb/reader.go | 2 ++ internal/datastore/ydb/readwrite.go | 18 +++++++++++++++--- internal/datastore/ydb/ydb.go | 2 +- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/internal/datastore/ydb/reader.go b/internal/datastore/ydb/reader.go index 71351edc7c..345879ed8e 100644 --- a/internal/datastore/ydb/reader.go +++ b/internal/datastore/ydb/reader.go @@ -15,6 +15,8 @@ import ( core "github.com/authzed/spicedb/pkg/proto/core/v1" ) +var _ datastore.Reader = (*ydbReader)(nil) + type ydbReader struct { tablePathPrefix string executor queryExecutor diff --git a/internal/datastore/ydb/readwrite.go b/internal/datastore/ydb/readwrite.go index 3c7fef4dcb..46e1b9bbd2 100644 --- a/internal/datastore/ydb/readwrite.go +++ b/internal/datastore/ydb/readwrite.go @@ -17,11 +17,14 @@ import ( "github.com/authzed/spicedb/internal/datastore/revisions" ydbCommon "github.com/authzed/spicedb/internal/datastore/ydb/common" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" ) +var _ datastore.ReadWriteTransaction = (*ydbReadWriter)(nil) + type ydbReadWriter struct { *ydbReader bulkLoadBatchSize int @@ -166,7 +169,16 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co } // DeleteRelationships deletes all Relationships that match the provided filter. -func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error { +func (rw *ydbReadWriter) DeleteRelationships( + ctx context.Context, + filter *v1.RelationshipFilter, + opts ...options.DeleteOptionsOption, +) (bool, error) { + delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) + if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { + return false, fmt.Errorf("limit is currently not supported") + } + pred := sq.Eq{ colNamespace: filter.GetResourceType(), } @@ -196,10 +208,10 @@ func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.Rel rw.newRevision, pred, ); err != nil { - return fmt.Errorf("failed to delete relations: %w", err) + return false, fmt.Errorf("failed to delete relations: %w", err) } - return nil + return false, nil } // WriteNamespaces takes proto namespace definitions and persists them. diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 9ccb296d8c..1f8651248a 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -32,7 +32,7 @@ func init() { } var ( - _ datastore.Datastore = &ydbDatastore{} + _ datastore.Datastore = (*ydbDatastore)(nil) ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) From c9ad4890dfcb0797084433272075e7c6b44b5fb0 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Tue, 5 Mar 2024 14:48:28 +0300 Subject: [PATCH 14/14] Increase drop consumer timeout Signed-off-by: sashayakovtseva --- internal/datastore/ydb/watch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/datastore/ydb/watch.go b/internal/datastore/ydb/watch.go index 788ce4c892..49abca0ab1 100644 --- a/internal/datastore/ydb/watch.go +++ b/internal/datastore/ydb/watch.go @@ -196,7 +196,7 @@ func (y *ydbDatastore) watch( return } defer func(topic string) { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() err := y.driver.Topic().Alter(ctx, topic, topicoptions.AlterWithDropConsumers(consumerUUID))