Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement datastore watch #12

Merged
merged 14 commits into from
Mar 7, 2024
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1
go.opencensus.io v0.24.0
Expand Down Expand Up @@ -322,6 +323,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xen0n/gosmopolitan v1.2.2 // indirect
github.com/yagipy/maintidx v1.0.0 // indirect
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 // indirect
github.com/yeya24/promlinter v0.2.0 // indirect
github.com/ykadowak/zerologlint v0.1.3 // indirect
gitlab.com/bosi/decorder v0.4.1 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa h1:a6Hc6Hlq
github.com/johannesboyne/gofakes3 v0.0.0-20230914150226-f005f5cc03aa/go.mod h1:AxgWC4DDX54O2WDoQO1Ceabtn6IbktjU/7bigor+66g=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down Expand Up @@ -897,13 +898,19 @@ github.com/xen0n/gosmopolitan v1.2.2 h1:/p2KTnMzwRexIW8GlKawsTWOxn7UHA+jCMF/V8HH
github.com/xen0n/gosmopolitan v1.2.2/go.mod h1:7XX7Mj61uLYrj0qmeN0zi7XDon9JRAEhYQqAPLVNTeg=
github.com/yagipy/maintidx v1.0.0 h1:h5NvIsCz+nRDapQ0exNv4aJ0yXSI0420omVANTv3GJM=
github.com/yagipy/maintidx v1.0.0/go.mod h1:0qNf/I/CCZXSMhsRsrEPDZ+DkekpKLXAJfsTACwgXLk=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20221215182650-986f9d10542f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf h1:ckwNHVo4bv2tqNkgx3W3HANh3ta1j6TR5qw08J1A7Tw=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4 h1:IleO/tgxypZ/BQWZNDWomf2qsefGnkCT4qW8J+SW2Zo=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.4/go.mod h1:bqOjIBSt5LtA8fcTprRPGLvlQGkNlqBSRqnL+yZUJh4=
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6 h1:4lMGmHsV28xqUGkW9D88htrKq+gJfUUPlsqHZje70Zc=
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6/go.mod h1:FVPMQ67NMCjJH92bfHV23bzqcm8thBZQpbnBIyL7zIo=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1 h1:2gPHVcsGCZfApRAYcBygtqRqQErofTI9urP7lWC1KyU=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.12.1/go.mod h1:FtMM5jNK/L6ksnxWqISn50qDf/NiCpwAnqUzMyMFIVY=
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0 h1:Srr5Hc7duRtM0PsFhyOPuP9jvZb3v2+UCVcrvDeErQo=
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0/go.mod h1:UeZclyc1+YWkYkCjf1SPjD1ct228obcG7yhLk+UMzOw=
github.com/ydb-platform/ydb-go-sdk/v3 v3.35.1/go.mod h1:eD5OyVA8MuMq3+BYBMKGUfa2faTZhbx+LE+y1RgitFE=
github.com/ydb-platform/ydb-go-sdk/v3 v3.46.0/go.mod h1:oSLwnuilwIpaF5bJJMAofnGgzPJusoI3zWMNb8I+GnM=
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1 h1:AtNjus6Wm5pqK4pfYa9QQ83A2omHpzZuBwILB+ylfJ4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.56.1/go.mod h1:h7mpZZIAPq65QDZOO/qGVKyadsXzvv2ncs5fKuIho24=
Expand Down
1 change: 0 additions & 1 deletion internal/datastore/ydb/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,4 @@ func AddTablePrefix(query string, tablePathPrefix string) string {
buffer.WriteString(query)

return buffer.String()

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ CREATE TABLE metadata (
PRIMARY KEY (unique_id)
);`

// todo AUTO_PARTITIONING_BY_LOAD?
// ideally PK should be (namespace, deleted_at_unix_nano), but since deleted_at_unix_nano is
// updated during delete operation it cannot be used. simply (namespace) is also not applicable
// b/c there might be deleted namespaces with the same name as currently living.
Expand All @@ -49,9 +48,13 @@ CREATE TABLE namespace_config (
deleted_at_unix_nano Int64,
PRIMARY KEY (namespace, created_at_unix_nano),
INDEX uq_namespace_living GLOBAL SYNC ON (deleted_at_unix_nano, namespace) COVER (serialized_config)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

// todo AUTO_PARTITIONING_BY_LOAD?
// ideally PK should be (name, deleted_at_unix_nano), but since deleted_at_unix_nano is
// updated during delete operation it cannot be used. simply (name) is also not applicable
// b/c there might be deleted caveats with the same name as currently living.
Expand All @@ -64,11 +67,14 @@ CREATE TABLE caveat (
deleted_at_unix_nano Int64,
PRIMARY KEY (name, created_at_unix_nano),
INDEX uq_caveat_living GLOBAL SYNC ON (deleted_at_unix_nano, name) COVER (definition)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

// todo discuss JsonDocument instead of Json.
// todo check Ensure on insert, check indexed.
// todo AUTO_PARTITIONING_BY_LOAD?
// todo use correct indexes.
createRelationTuple = `
CREATE TABLE relation_tuple (
namespace Utf8 NOT NULL,
Expand All @@ -87,6 +93,41 @@ CREATE TABLE relation_tuple (
INDEX ix_relation_tuple_by_subject_relation GLOBAL SYNC ON (userset_namespace, userset_relation, namespace, relation),
INDEX ix_relation_tuple_alive_by_resource_rel_subject_covering GLOBAL SYNC ON (namespace, relation, userset_namespace) COVER (caveat_name, caveat_context),
INDEX ix_gc_index GLOBAL SYNC ON (deleted_at_unix_nano)
)
WITH (
AUTO_PARTITIONING_BY_SIZE = DISABLED,
AUTO_PARTITIONING_BY_LOAD = ENABLED,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 3
);`

createNamespaceConfigChangefeed = `
ALTER TABLE namespace_config
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

createCaveatChangefeed = `
ALTER TABLE caveat
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

createRelationTupleChangefeed = `
ALTER TABLE relation_tuple
ADD CHANGEFEED spicedb_watch
WITH (
FORMAT = 'JSON',
MODE = 'NEW_IMAGE',
RETENTION_PERIOD = Interval('PT1H'),
VIRTUAL_TIMESTAMPS = TRUE
);`

insertUniqueID = `INSERT INTO metadata (unique_id) VALUES (CAST(RandomUuid(1) as String));`
Expand All @@ -101,6 +142,9 @@ func init() {
common.AddTablePrefix(createNamespaceConfig, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveat, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTuple, client.opts.tablePathPrefix),
common.AddTablePrefix(createNamespaceConfigChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createCaveatChangefeed, client.opts.tablePathPrefix),
common.AddTablePrefix(createRelationTupleChangefeed, client.opts.tablePathPrefix),
}
for _, stmt := range statements {
if err := s.ExecuteSchemeQuery(ctx, stmt); err != nil {
Expand Down
39 changes: 32 additions & 7 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,22 @@ type ydbConfig struct {

bulkLoadBatchSize int

// todo find a way to use it
maxRetries uint8

gcEnabled bool
enablePrometheusStats bool
}

var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 0,
watchBufferWriteTimeout: 0,
watchBufferLength: 128,
watchBufferWriteTimeout: time.Second,
followerReadDelay: 0 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
maxRetries: 0,
gcEnabled: false,
gcEnabled: true,
enablePrometheusStats: false,
}

Expand Down Expand Up @@ -80,6 +76,13 @@ func GCInterval(interval time.Duration) Option {
return func(o *ydbConfig) { o.gcInterval = interval }
}

// GCEnabled indicates whether garbage collection is enabled.
//
// GC is enabled by default.
func GCEnabled(isGCEnabled bool) Option {
return func(o *ydbConfig) { o.gcEnabled = isGCEnabled }
}

// GCMaxOperationTime is the maximum operation time of a garbage collection pass before it times out.
//
// This value defaults to 1 minute.
Expand Down Expand Up @@ -117,3 +120,25 @@ func FollowerReadDelay(delay time.Duration) Option {
func BulkLoadBatchSize(limit int) Option {
return func(o *ydbConfig) { o.bulkLoadBatchSize = limit }
}

// WatchBufferLength is the number of entries that can be stored in the watch
// buffer while awaiting read by the client.
//
// This value defaults to 128.
func WatchBufferLength(watchBufferLength uint16) Option {
return func(o *ydbConfig) { o.watchBufferLength = watchBufferLength }
}

// WatchBufferWriteTimeout is the maximum timeout for writing to the watch buffer,
// after which the caller to the watch will be disconnected.
func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
return func(o *ydbConfig) { o.watchBufferWriteTimeout = watchBufferWriteTimeout }
}

// WithEnablePrometheusStats marks whether Prometheus metrics provided by the Postgres
// clients being used by the datastore are enabled.
//
// Prometheus metrics are disabled by default.
func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
return func(o *ydbConfig) { o.enablePrometheusStats = enablePrometheusStats }
}
26 changes: 25 additions & 1 deletion internal/datastore/ydb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
)

const (
changefeedSpicedbWatch = "spicedb_watch"

// common
colCreatedAtUnixNano = "created_at_unix_nano"
colDeletedAtUnixNano = "deleted_at_unix_nano"
Expand Down Expand Up @@ -144,6 +146,22 @@ func (se sessionQueryExecutor) Execute(
return res, err
}

// txQueryExecutor implements queryExecutor for YDB transactional actor.
// This is a convenient wrapper to add custom options for all queries.
type txQueryExecutor struct {
tx table.TransactionActor
}

func (t txQueryExecutor) Execute(
ctx context.Context,
query string,
params *table.QueryParameters,
opts ...options.ExecuteDataQueryOption,
) (result.Result, error) {
opts = append(opts, options.WithKeepInCache(true))
return t.tx.Execute(ctx, query, params, opts...)
}

func queryRow(
ctx context.Context,
executor queryExecutor,
Expand All @@ -165,7 +183,7 @@ func queryRow(
return err
}
if !res.NextRow() {
return fmt.Errorf("no unique id rows")
return fmt.Errorf("no rows in result set")
}
if err := res.Scan(values...); err != nil {
return err
Expand Down Expand Up @@ -277,6 +295,9 @@ func executeDeleteQuery(
deleteRev revisions.TimestampRevision,
pred sq.Sqlizer,
) error {
ctx, span := tracer.Start(ctx, "executeDeleteQuery")
defer span.End()

return executeQuery(
ctx,
tablePathPrefix,
Expand All @@ -287,6 +308,9 @@ func executeDeleteQuery(

// executeQuery is a helper for queries that don't care about result set.
func executeQuery(ctx context.Context, tablePathPrefix string, executor queryExecutor, q sq.Yqliser) error {
ctx, span := tracer.Start(ctx, "executeQuery")
defer span.End()

sql, args, err := q.ToYQL()
if err != nil {
return fmt.Errorf("failed to build query: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion internal/datastore/ydb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

var _ datastore.Reader = (*ydbReader)(nil)

type ydbReader struct {
tablePathPrefix string
executor queryExecutor
Expand Down Expand Up @@ -215,7 +217,6 @@ func (r *ydbReader) LookupNamespacesWithNames(
builder = builder.View(ixUqNamespaceLiving)
}
return builder

},
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/ydb/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down Expand Up @@ -477,7 +479,6 @@ func TestYDBReaderRelationships(t *testing.T) {
t.Cleanup(func() { yDS.Close() })

err = yDS.driver.Table().Do(context.Background(), func(ctx context.Context, s table.Session) error {

stmt, err := s.Prepare(
ctx,
common.AddTablePrefix(`
Expand Down Expand Up @@ -570,7 +571,6 @@ func TestYDBReaderRelationships(t *testing.T) {
}

require.Equal(t, lo.FromPtr(expect.caveatContext), actualCaveatContext)

}

testQueryRelationships := func(
Expand Down
25 changes: 21 additions & 4 deletions internal/datastore/ydb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@ import (
"github.com/jzelinskie/stringz"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/proto"

datastoreCommon "github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/revisions"
ydbCommon "github.com/authzed/spicedb/internal/datastore/ydb/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
)

var _ datastore.ReadWriteTransaction = (*ydbReadWriter)(nil)

type ydbReadWriter struct {
*ydbReader
bulkLoadBatchSize int
Expand Down Expand Up @@ -165,7 +169,16 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
}

// DeleteRelationships deletes all Relationships that match the provided filter.
func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter) error {
func (rw *ydbReadWriter) DeleteRelationships(
ctx context.Context,
filter *v1.RelationshipFilter,
opts ...options.DeleteOptionsOption,
) (bool, error) {
delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...)
if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 {
return false, fmt.Errorf("limit is currently not supported")
}

pred := sq.Eq{
colNamespace: filter.GetResourceType(),
}
Expand Down Expand Up @@ -195,10 +208,10 @@ func (rw *ydbReadWriter) DeleteRelationships(ctx context.Context, filter *v1.Rel
rw.newRevision,
pred,
); err != nil {
return fmt.Errorf("failed to delete relations: %w", err)
return false, fmt.Errorf("failed to delete relations: %w", err)
}

return nil
return false, nil
}

// WriteNamespaces takes proto namespace definitions and persists them.
Expand Down Expand Up @@ -302,7 +315,8 @@ func (rw *ydbReadWriter) selectTuples(
ctx context.Context,
in []*core.RelationTuple,
) ([]*core.RelationTuple, error) {
span := trace.SpanFromContext(ctx)
ctx, span := tracer.Start(ctx, "selectTuples", trace.WithAttributes(attribute.Int("count", len(in))))
defer span.End()

if len(in) == 0 {
return nil, nil
Expand Down Expand Up @@ -339,6 +353,9 @@ func writeDefinitions[T coreDefinition](
defs []T,
useVT bool,
) error {
ctx, span := tracer.Start(ctx, "writeDefinitions", trace.WithAttributes(attribute.Int("count", len(defs))))
defer span.End()

if len(defs) == 0 {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/ydb/readwrite_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/ydb/stats_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build ci && docker

package ydb

import (
Expand Down
Loading
Loading