Skip to content

Commit

Permalink
Implement YDB revisions and namespace reader (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashayakovtseva authored Feb 16, 2024
1 parent ba949a8 commit 682711a
Show file tree
Hide file tree
Showing 9 changed files with 604 additions and 99 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.0.2
github.com/exaring/otelpgx v0.5.2
github.com/fatih/color v1.15.0
github.com/flymedllva/ydb-go-qb v0.0.0-20240108142018-7a30d57e17f1
github.com/go-errors/errors v1.5.1
github.com/go-logr/zerologr v1.2.3
github.com/go-sql-driver/mysql v1.7.1
Expand Down Expand Up @@ -71,6 +72,7 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a
github.com/ydb-platform/ydb-go-sdk-otel v0.4.6
github.com/ydb-platform/ydb-go-sdk-zerolog v0.14.0
github.com/ydb-platform/ydb-go-sdk/v3 v3.54.3
Expand Down Expand Up @@ -319,7 +321,6 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xen0n/gosmopolitan v1.2.2 // indirect
github.com/yagipy/maintidx v1.0.0 // indirect
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect
github.com/yeya24/promlinter v0.2.0 // indirect
github.com/ykadowak/zerologlint v0.1.3 // indirect
gitlab.com/bosi/decorder v0.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/firefart/nonamedreturns v1.0.4 h1:abzI1p7mAEPYuR4A+VLKn4eNDOycjYo2phmY9sfv40Y=
github.com/firefart/nonamedreturns v1.0.4/go.mod h1:TDhe/tjI1BXo48CmYbUduTV7BdIga8MAO/xbKdcVsGI=
github.com/flymedllva/ydb-go-qb v0.0.0-20240108142018-7a30d57e17f1 h1:BIQ+at2xPTQlGxnj/gUcPM05fZ3UR5UBwDVHFKryarE=
github.com/flymedllva/ydb-go-qb v0.0.0-20240108142018-7a30d57e17f1/go.mod h1:2r7JdzfiShqSH6/RsjIO9NmY8KT+SVfcHUP/0qucLYk=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
Expand Down
56 changes: 52 additions & 4 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type ydbConfig struct {
watchBufferLength uint16
watchBufferWriteTimeout time.Duration

followerReadDelay time.Duration
revisionQuantization time.Duration
maxRevisionStalenessPercent float64

Expand All @@ -23,24 +24,71 @@ type ydbConfig struct {
enablePrometheusStats bool
}

var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 0,
watchBufferWriteTimeout: 0,
followerReadDelay: 5 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 0,
gcMaxOperationTime: 0,
maxRetries: 0,
gcEnabled: false,
enablePrometheusStats: false,
}

// Option provides the facility to configure how clients within the YDB
// datastore interact with the running YDB database.
type Option func(*ydbConfig)

func generateConfig(options []Option) ydbConfig {
var computed ydbConfig
func generateConfig(options []Option) *ydbConfig {
computed := defaultConfig
for _, option := range options {
option(&computed)
}

return computed
return &computed
}

// WithTablePathPrefix sets table prefix that will be implicitly added to all YDB queries.
// See https://ydb.tech/docs/en/yql/reference/syntax/pragma#table-path-prefix for details.
//
// Default is empty.
// DSN parameter takes precedence over this option.
// Non-empty DSN parameter takes precedence over this option.
func WithTablePathPrefix(prefix string) Option {
return func(o *ydbConfig) { o.tablePathPrefix = prefix }
}

// GCWindow is the maximum age of a passed revision that will be considered
// valid.
//
// This value defaults to 24 hours.
func GCWindow(window time.Duration) Option {
return func(o *ydbConfig) { o.gcWindow = window }
}

// RevisionQuantization is the time bucket size to which advertised revisions
// will be rounded.
//
// This value defaults to 5 seconds.
func RevisionQuantization(bucketSize time.Duration) Option {
return func(o *ydbConfig) { o.revisionQuantization = bucketSize }
}

// MaxRevisionStalenessPercent is the amount of time, expressed as a percentage of
// the revision quantization window, that a previously computed rounded revision
// can still be advertised after the next rounded revision would otherwise be ready.
//
// This value defaults to 0.1 (10%).
func MaxRevisionStalenessPercent(stalenessPercent float64) Option {
return func(o *ydbConfig) { o.maxRevisionStalenessPercent = stalenessPercent }
}

// FollowerReadDelay is the time delay to apply to enable historial reads.
//
// This value defaults to 5 seconds.
func FollowerReadDelay(delay time.Duration) Option {
return func(o *ydbConfig) { o.followerReadDelay = delay }
}
108 changes: 108 additions & 0 deletions internal/datastore/ydb/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package ydb

import (
"context"
"fmt"

yq "github.com/flymedllva/ydb-go-qb/yqb"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"

"github.com/authzed/spicedb/internal/datastore/revisions"
)

const (
colCreatedAtUnixNano = "created_at_unix_nano"
colDeletedAtUnixNano = "deleted_at_unix_nano"

// namespace_config
tableNamespaceConfig = "namespace_config"
colSerializedConfig = "serialized_config"
colNamespace = "namespace"
)

var (
livingObjectModifier = queryModifier(func(builder yq.SelectBuilder) yq.SelectBuilder {
return builder.Where(yq.Eq{colDeletedAtUnixNano: nil})
})
)

type queryModifier func(yq.SelectBuilder) yq.SelectBuilder

func revisionedQueryModifier(revision revisions.TimestampRevision) queryModifier {
return func(builder yq.SelectBuilder) yq.SelectBuilder {
return builder.
Where(yq.LtOrEq{colCreatedAtUnixNano: revision.TimestampNanoSec()}).
Where(yq.Or{
yq.Eq{colDeletedAtUnixNano: nil},
yq.Gt{colDeletedAtUnixNano: revision.TimestampNanoSec()},
})
}
}

type queryExecutor interface {
Execute(
ctx context.Context,
query string,
params *table.QueryParameters,
opts ...options.ExecuteDataQueryOption,
) (result.Result, error)
}

// sessionQueryExecutor implements queryExecutor for YDB sessions
// with online read-only auto commit transaction mode.
type sessionQueryExecutor struct {
driver *ydb.Driver
}

func (se sessionQueryExecutor) Execute(
ctx context.Context,
query string,
params *table.QueryParameters,
opts ...options.ExecuteDataQueryOption,
) (result.Result, error) {
// todo check whether it is ok to close result outside of Do busy loop.
var res result.Result
err := se.driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
var err error
_, res, err = s.Execute(ctx, table.OnlineReadOnlyTxControl(), query, params, opts...)
return err
}, table.WithIdempotent())
return res, err
}

func queryRow(
ctx context.Context,
executor queryExecutor,
query string,
queryParams *table.QueryParameters,
values ...indexed.RequiredOrOptional,
) error {
res, err := executor.Execute(
ctx,
query,
queryParams,
)
if err != nil {
return err
}
defer res.Close()

if err := res.NextResultSetErr(ctx); err != nil {
return err
}
if !res.NextRow() {
return fmt.Errorf("no unique id rows")
}
if err := res.Scan(values...); err != nil {
return err
}
if err := res.Err(); err != nil {
return err
}

return nil
}
126 changes: 122 additions & 4 deletions internal/datastore/ydb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,137 @@ import (
"context"
"fmt"

sq "github.com/Masterminds/squirrel"
yq "github.com/flymedllva/ydb-go-qb/yqb"
"github.com/ydb-platform/ydb-go-sdk/v3/table"

"github.com/authzed/spicedb/internal/datastore/revisions"
"github.com/authzed/spicedb/internal/datastore/ydb/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

type ydbReader struct {
tablePathPrefix string
executor queryExecutor
modifier queryModifier
}

func (r *ydbReader) ReadCaveatByName(ctx context.Context, name string) (caveat *core.CaveatDefinition, lastWritten datastore.Revision, err error) {
// TODO implement me
panic("implement me")
}

func (r *ydbReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) {
// TODO implement me
panic("implement me")
}

func (r *ydbReader) LookupCaveatsWithNames(ctx context.Context, names []string) ([]datastore.RevisionedCaveat, error) {
// TODO implement me
panic("implement me")
}

func (r *ydbReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, options ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) {
// TODO implement me
panic("implement me")
}

func (r *ydbReader) ReverseQueryRelationships(ctx context.Context, subjectsFilter datastore.SubjectsFilter, options ...options.ReverseQueryOptionsOption) (datastore.RelationshipIterator, error) {
// TODO implement me
panic("implement me")
}

func (r *ydbReader) ReadNamespaceByName(
ctx context.Context,
nsName string,
) (*core.NamespaceDefinition, datastore.Revision, error) {
loaded, version, err := r.loadNamespace(ctx, nsName)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf("filed to read namespace: %w", err)
}
return loaded, version, nil
}

func (r *ydbReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) {
nsDefsWithRevisions, err := loadAllNamespaces(ctx, r.tablePathPrefix, r.executor, r.modifier)
if err != nil {
return nil, fmt.Errorf("failed to list namespaces: %w", err)
}

return nsDefsWithRevisions, err
}

func (r *ydbReader) LookupNamespacesWithNames(
ctx context.Context,
nsNames []string,
) ([]datastore.RevisionedNamespace, error) {
if len(nsNames) == 0 {
return nil, nil
}

var clause yq.Or
for _, nsName := range nsNames {
clause = append(clause, yq.Eq{colNamespace: nsName})
}

nsDefsWithRevisions, err := loadAllNamespaces(
ctx,
r.tablePathPrefix,
r.executor,
func(builder yq.SelectBuilder) yq.SelectBuilder {
return r.modifier(builder).Where(clause)
},
)
if err != nil {
return nil, fmt.Errorf("failed to list namespaces with names: %w", err)
}

return nsDefsWithRevisions, err
}

var readNamespaceBuilder = yq.Select(colSerializedConfig, colCreatedAtUnixNano).From(tableNamespaceConfig)

func (r *ydbReader) loadNamespace(
ctx context.Context,
namespace string,
) (*core.NamespaceDefinition, revisions.TimestampRevision, error) {
ctx, span := tracer.Start(ctx, "loadNamespace")
defer span.End()

defs, err := loadAllNamespaces(
ctx,
r.tablePathPrefix,
r.executor,
func(builder yq.SelectBuilder) yq.SelectBuilder {
return r.modifier(builder).Where(sq.Eq{colNamespace: namespace})
},
)
if err != nil {
return nil, revisions.TimestampRevision(0), err
}

if len(defs) < 1 {
return nil, revisions.TimestampRevision(0), datastore.NewNamespaceNotFoundErr(namespace)
}

return defs[0].Definition, defs[0].LastWrittenRevision.(revisions.TimestampRevision), nil
}

func loadAllNamespaces(
ctx context.Context,
tx table.TransactionActor,
nsQuery string,
tablePathPrefix string,
executor queryExecutor,
modifier queryModifier,
) ([]datastore.RevisionedNamespace, error) {
res, err := tx.Execute(ctx, nsQuery, nil)
sql, args, err := modifier(readNamespaceBuilder).ToYdbSql()
if err != nil {
return nil, err
}

sql = common.AddTablePrefix(sql, tablePathPrefix)
res, err := executor.Execute(ctx, sql, table.NewQueryParameters(args...))
if err != nil {
return nil, err
}
Expand All @@ -33,7 +151,7 @@ func loadAllNamespaces(

var loaded core.NamespaceDefinition
if err := loaded.UnmarshalVT(config); err != nil {
return nil, fmt.Errorf("unable to read namespace config: %w", err)
return nil, fmt.Errorf("failed to read namespace config: %w", err)
}

revision := revisions.NewForTimestamp(createdAtUnixNano)
Expand Down
Loading

0 comments on commit 682711a

Please sign in to comment.