Skip to content

Commit

Permalink
Implement datastore watch (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashayakovtseva committed Mar 7, 2024
1 parent 3b5ea9d commit 04604b9
Show file tree
Hide file tree
Showing 16 changed files with 707 additions and 94 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1
go.opencensus.io v0.24.0
Expand Down Expand Up @@ -322,6 +323,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xen0n/gosmopolitan v1.2.2 // indirect
github.com/yagipy/maintidx v1.0.0 // indirect
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 // indirect
github.com/yeya24/promlinter v0.2.0 // indirect
github.com/ykadowak/zerologlint v0.1.3 // indirect
gitlab.com/bosi/decorder v0.4.1 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa h1:a6Hc6Hlq
github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa/go.mod h1:AxgWC4DDX54O2WDoQO1Ceabtn6IbktjU/7bigor+66g=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down Expand Up @@ -897,13 +898,19 @@ github.com/xen0n/gosmopolitan v1.2.2 h1:/p2KTnMzwRexIW8GlKawsTWOxn7UHA+jCMF/V8HH
github.com/xen0n/gosmopolitan v1.2.2/go.mod h1:7XX7Mj61uLYrj0qmeN0zi7XDon9JRAEhYQqAPLVNTeg=
github.com/yagipy/maintidx v1.0.0 h1:h5NvIsCz+nRDapQ0exNv4aJ0yXSI0420omVANTv3GJM=
github.com/yagipy/maintidx v1.0.0/go.mod h1:0qNf/I/CCZXSMhsRsrEPDZ+DkekpKLXAJfsTACwgXLk=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20221215182650-986f9d10542f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf h1:ckwNHVo4bv2tqNkgx3W3HANh3ta1j6TR5qw08J1A7Tw=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 h1:IleO/tgxypZ/BQWZNDWomf2qsefGnkCT4qW8J+SW2Zo=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4/go.mod h1:bqOjIBSt5LtA8fcTprRPGLvlQGkNlqBSRqnL+yZUJh4=
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6 h1:4lMGmHsV28xqUGkW9D88htrKq+gJfUUPlsqHZje70Zc=
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6/go.mod h1:FVPMQ67NMCjJH92bfHV23bzqcm8thBZQpbnBIyL7zIo=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1 h1:2gPHVcsGCZfApRAYcBygtqRqQErofTI9urP7lWC1KyU=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1/go.mod h1:FtMM5jNK/L6ksnxWqISn50qDf/NiCpwAnqUzMyMFIVY=
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0 h1:Srr5Hc7duRtM0PsFhyOPuP9jvZb3v2+UCVcrvDeErQo=
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0/go.mod h1:UeZclyc1+YWkYkCjf1SPjD1ct228obcG7yhLk+UMzOw=
github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
github.com/ydb-platform/ydb-go-sdk/v3 v3.46.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGgzPJusoI3zWMNb8I+GnM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1 h1:AtNjus6Wm5pqK4pfYa9QQ83A2omHpzZuBwILB+ylfJ4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1/go.mod h1:h7mpZZIAPq65QDZOO/qGVKyadsXzvv2ncs5fKuIho24=
Expand Down
1 change: 0 additions & 1 deletion internal/datastore/ydb/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,4 @@ func AddTablePrefix(query string, tablePathPrefix string) string {
buffer.WriteString(query)

return buffer.String()

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE TABLE metadata (
PRIMARY KEY (unique_id)
);`

// todo AUTO_PARTITIONING_BY_LOAD?
// ideally PK should be (namespace, deleted_at_unix_nano), but since deleted_at_unix_nano is
// updated during delete operation it cannot be used. simply (namespace) is also not applicable
// b/c there might be deleted namespaces with the same name as currently living.
Expand All @@ -49,9 +48,13 @@ CREATE TABLE namespace_config (
deleted_at_unix_nano Int64,
PRIMARY KEY (namespace, created_at_unix_nano),
INDEX uq_namespace_living GLOBAL SYNC ON (deleted_at_unix_nano, namespace) COVER (serialized_config)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

// todo AUTO_PARTITIONING_BY_LOAD?
// ideally PK should be (name, deleted_at_unix_nano), but since deleted_at_unix_nano is
// updated during delete operation it cannot be used. simply (name) is also not applicable
// b/c there might be deleted caveats with the same name as currently living.
Expand All @@ -64,11 +67,14 @@ CREATE TABLE caveat (
deleted_at_unix_nano Int64,
PRIMARY KEY (name, created_at_unix_nano),
INDEX uq_caveat_living GLOBAL SYNC ON (deleted_at_unix_nano, name) COVER (definition)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

// todo discuss JsonDocument instead of Json.
// todo check Ensure on insert, check indexed.
// todo AUTO_PARTITIONING_BY_LOAD?
// todo use correct indexes.
createRelationTuple = `
CREATE TABLE relation_tuple (
namespace Utf8 NOT NULL,
Expand All @@ -87,6 +93,41 @@ CREATE TABLE relation_tuple (
INDEX ix_relation_tuple_by_subject_relation GLOBAL SYNC ON (userset_namespace, userset_relation, namespace, relation),
INDEX ix_relation_tuple_alive_by_resource_rel_subject_covering GLOBAL SYNC ON (namespace, relation, userset_namespace) COVER (caveat_name, caveat_context),
INDEX ix_gc_index GLOBAL SYNC ON (deleted_at_unix_nano)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

createNamespaceConfigChangefeed = `
ALTER TABLE namespace_config
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

createCaveatChangefeed = `
ALTER TABLE caveat
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

createRelationTupleChangefeed = `
ALTER TABLE relation_tuple
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

insertUniqueID = `INSERT INTO metadata (unique_id) VALUES (CAST(RandomUuid(1) as String));`
Expand All @@ -101,6 +142,9 @@ func init() {
common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix),
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
Expand Down
39 changes: 32 additions & 7 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,22 @@ type ydbConfig struct {

bulkLoadBatchSize int

// todo find a way to use it
maxRetries uint8

gcEnabled bool
enablePrometheusStats bool
}

var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 0,
watchBufferWriteTimeout: 0,
watchBufferLength: 128,
watchBufferWriteTimeout: time.Second,
followerReadDelay: 0 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
maxRetries: 0,
gcEnabled: false,
gcEnabled: true,
enablePrometheusStats: false,
}

Expand Down Expand Up @@ -80,6 +76,13 @@ func GCInterval(interval time.Duration) Option {
return func(o *ydbConfig) { o.gcInterval = interval }
}

// GCEnabled indicates whether garbage collection is enabled.
//
// GC is enabled by default.
func GCEnabled(isGCEnabled bool) Option {
return func(o *ydbConfig) { o.gcEnabled = isGCEnabled }
}

// GCMaxOperationTime is the maximum operation time of a garbage collection pass before it times out.
//
// This value defaults to 1 minute.
Expand Down Expand Up @@ -117,3 +120,25 @@ func FollowerReadDelay(delay time.Duration) Option {
func BulkLoadBatchSize(limit int) Option {
return func(o *ydbConfig) { o.bulkLoadBatchSize = limit }
}

// WatchBufferLength is the number of entries that can be stored in the watch
// buffer while awaiting read by the client.
//
// This value defaults to 128.
func WatchBufferLength(watchBufferLength uint16) Option {
return func(o *ydbConfig) { o.watchBufferLength = watchBufferLength }
}

// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
// after which the caller to the watch will be disconnected.
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(o *ydbConfig) { o.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
// clients being used by the datastore are enabled.
//
// Prometheus metrics are disabled by default.
func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
return func(o *ydbConfig) { o.enablePrometheusStats = enablePrometheusStats }
}
26 changes: 25 additions & 1 deletion internal/datastore/ydb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
)

const (
changefeedSpicedbWatch = "spicedb_watch"

// common
colCreatedAtUnixNano = "created_at_unix_nano"
colDeletedAtUnixNano = "deleted_at_unix_nano"
Expand Down Expand Up @@ -144,6 +146,22 @@ func (se sessionQueryExecutor) Execute(
return res, err
}

// txQueryExecutor implements queryExecutor for YDB transactional actor.
// This is a convenient wrapper to add custom options for all queries.
type txQueryExecutor struct {
tx table.TransactionActor
}

func (t txQueryExecutor) Execute(
ctx context.Context,
query string,
params *table.QueryParameters,
opts ...options.ExecuteDataQueryOption,
) (result.Result, error) {
opts = append(opts, options.WithKeepInCache(true))
return t.tx.Execute(ctx, query, params, opts...)
}

func queryRow(
ctx context.Context,
executor queryExecutor,
Expand All @@ -165,7 +183,7 @@ func queryRow(
return err
}
if !res.NextRow() {
return fmt.Errorf("no unique id rows")
return fmt.Errorf("no rows in result set")
}
if err := res.Scan(values...); err != nil {
return err
Expand Down Expand Up @@ -277,6 +295,9 @@ func executeDeleteQuery(
deleteRev revisions.TimestampRevision,
pred sq.Sqlizer,
) error {
ctx, span := tracer.Start(ctx, "executeDeleteQuery")
defer span.End()

return executeQuery(
ctx,
tablePathPrefix,
Expand All @@ -287,6 +308,9 @@ func executeDeleteQuery(

// executeQuery is a helper for queries that don't care about result set.
func executeQuery(ctx context.Context, tablePathPrefix string, executor queryExecutor, q sq.Yqliser) error {
ctx, span := tracer.Start(ctx, "executeQuery")
defer span.End()

sql, args, err := q.ToYQL()
if err != nil {
return fmt.Errorf("failed to build query: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/ydb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

var _ datastore.Reader = (*ydbReader)(nil)

type ydbReader struct {
tablePathPrefix string
executor queryExecutor
Expand Down Expand Up @@ -215,7 +217,6 @@ func (r *ydbReader) LookupNamespacesWithNames(
builder = builder.View(ixUqNamespaceLiving)
}
return builder

},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/ydb/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down Expand Up @@ -477,7 +479,6 @@ func TestYDBReaderRelationships(t *testing.T) {
t.Cleanup(func() { yDS.Close() })

err = yDS.driver.Table().Do(context.Background(), func(ctx context.Context, s table.Session) error {

stmt, err := s.Prepare(
ctx,
common.AddTablePrefix(`
Expand Down Expand Up @@ -570,7 +571,6 @@ func TestYDBReaderRelationships(t *testing.T) {
}

require.Equal(t, lo.FromPtr(expect.caveatContext), actualCaveatContext)

}

testQueryRelationships := func(
Expand Down
25 changes: 21 additions & 4 deletions internal/datastore/ydb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ import (
"github.com/jzelinskie/stringz"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

datastoreCommon "github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
ydbCommon "github.com/authzed/spicedb/internal/datastore/ydb/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
)

var _ datastore.ReadWriteTransaction = (*ydbReadWriter)(nil)

type ydbReadWriter struct {
*ydbReader
bulkLoadBatchSize int
Expand Down Expand Up @@ -165,7 +169,16 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
}

// DeleteRelationships deletes all Relationships that match the provided filter.
func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rw *ydbReadWriter) DeleteRelationships(
ctx context.Context,
filter *v1.RelationshipFilter,
opts ...options.DeleteOptionsOption,
) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
return false, fmt.Errorf("limit is currently not supported")
}

pred := sq.Eq{
colNamespace: filter.GetResourceType(),
}
Expand Down Expand Up @@ -195,10 +208,10 @@ func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.Rel
rw.newRevision,
pred,
); err != nil {
return fmt.Errorf("failed to delete relations: %w", err)
return false, fmt.Errorf("failed to delete relations: %w", err)
}

return nil
return false, nil
}

// WriteNamespaces takes proto namespace definitions and persists them.
Expand Down Expand Up @@ -302,7 +315,8 @@ func (rw *ydbReadWriter) selectTuples(
ctx context.Context,
in []*core.RelationTuple,
) ([]*core.RelationTuple, error) {
span := trace.SpanFromContext(ctx)
ctx, span := tracer.Start(ctx, "selectTuples", trace.WithAttributes(attribute.Int("count", len(in))))
defer span.End()

if len(in) == 0 {
return nil, nil
Expand Down Expand Up @@ -339,6 +353,9 @@ func writeDefinitions[T coreDefinition](
defs []T,
useVT bool,
) error {
ctx, span := tracer.Start(ctx, "writeDefinitions", trace.WithAttributes(attribute.Int("count", len(defs))))
defer span.End()

if len(defs) == 0 {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/ydb/readwrite_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/ydb/stats_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down
Loading

0 comments on commit 04604b9

Please sign in to comment.