Skip to content

Commit 4c99d8f

Browse files
Insert relation tuples AS_TABLE, hack watch, add session options (#18)
1 parent adb2a28 commit 4c99d8f

File tree

8 files changed

+510
-279
lines changed

8 files changed

+510
-279
lines changed

internal/datastore/benchmark/driver_bench_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
//go:build ci && docker
2-
// +build ci,docker
32

43
package benchmark
54

@@ -18,6 +17,7 @@ import (
1817
"github.com/authzed/spicedb/internal/datastore/mysql"
1918
"github.com/authzed/spicedb/internal/datastore/postgres"
2019
"github.com/authzed/spicedb/internal/datastore/spanner"
20+
"github.com/authzed/spicedb/internal/datastore/ydb"
2121
"github.com/authzed/spicedb/internal/testfixtures"
2222
testdatastore "github.com/authzed/spicedb/internal/testserver/datastore"
2323
"github.com/authzed/spicedb/internal/testserver/datastore/config"
@@ -47,6 +47,8 @@ var drivers = []struct {
4747
{postgres.Engine, "", nil},
4848
{crdb.Engine, "-overlap-static", []dsconfig.ConfigOption{dsconfig.WithOverlapStrategy("static")}},
4949
{crdb.Engine, "-overlap-insecure", []dsconfig.ConfigOption{dsconfig.WithOverlapStrategy("insecure")}},
50+
{ydb.Engine, "-uniqueness-check", []dsconfig.ConfigOption{dsconfig.WithYDBEnableUniquenessCheck(true)}},
51+
{ydb.Engine, "-no-uniqueness-check", []dsconfig.ConfigOption{dsconfig.WithYDBEnableUniquenessCheck(false)}},
5052
{mysql.Engine, "", nil},
5153
}
5254

internal/datastore/ydb/options.go

+48-7
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,35 @@ type ydbConfig struct {
1919
gcInterval time.Duration
2020
gcMaxOperationTime time.Duration
2121

22+
sessionCountLimit int
23+
sessionKeepaliveTimeout time.Duration
24+
sessionIdleThreshold time.Duration
25+
2226
bulkLoadBatchSize int
2327

24-
gcEnabled bool
28+
enableGC bool
2529
enablePrometheusStats bool
30+
enableUniquenessCheck bool
2631
}
2732

2833
var defaultConfig = ydbConfig{
34+
tablePathPrefix: "",
35+
certificatePath: "",
2936
watchBufferLength: 128,
3037
watchBufferWriteTimeout: time.Second,
38+
followerReadDelay: 0,
3139
revisionQuantization: 5 * time.Second,
3240
maxRevisionStalenessPercent: 0.1,
3341
gcWindow: 24 * time.Hour,
3442
gcInterval: 3 * time.Minute,
3543
gcMaxOperationTime: time.Minute,
44+
sessionCountLimit: 50,
45+
sessionKeepaliveTimeout: 10 * time.Second,
46+
sessionIdleThreshold: 60 * time.Second,
3647
bulkLoadBatchSize: 1000,
37-
gcEnabled: true,
48+
enableGC: true,
49+
enablePrometheusStats: false,
50+
enableUniquenessCheck: true,
3851
}
3952

4053
// Option provides the facility to configure how clients within the YDB
@@ -65,6 +78,27 @@ func WithCertificatePath(path string) Option {
6578
return func(o *ydbConfig) { o.certificatePath = path }
6679
}
6780

81+
// WithSessionCountLimit sets the maximum size of internal session pool in table.Client.
82+
//
83+
// This value defaults to 50.
84+
func WithSessionCountLimit(in int) Option {
85+
return func(o *ydbConfig) { o.sessionCountLimit = in }
86+
}
87+
88+
// WithSessionKeepaliveTimeout sets timeout of keep alive requests for session in table.Client.
89+
//
90+
// This value defaults to 10 seconds.
91+
func WithSessionKeepaliveTimeout(in time.Duration) Option {
92+
return func(o *ydbConfig) { o.sessionKeepaliveTimeout = in }
93+
}
94+
95+
// WithSessionIdleThreshold defines idle session lifetime threshold.
96+
//
97+
// This value defaults to 60 seconds.
98+
func WithSessionIdleThreshold(in time.Duration) Option {
99+
return func(o *ydbConfig) { o.sessionIdleThreshold = in }
100+
}
101+
68102
// GCWindow is the maximum age of a passed revision that will be considered
69103
// valid.
70104
//
@@ -80,11 +114,11 @@ func GCInterval(interval time.Duration) Option {
80114
return func(o *ydbConfig) { o.gcInterval = interval }
81115
}
82116

83-
// GCEnabled indicates whether garbage collection is enabled.
117+
// WithEnableGC indicates whether garbage collection is enabled.
84118
//
85119
// GC is enabled by default.
86-
func GCEnabled(isGCEnabled bool) Option {
87-
return func(o *ydbConfig) { o.gcEnabled = isGCEnabled }
120+
func WithEnableGC(isGCEnabled bool) Option {
121+
return func(o *ydbConfig) { o.enableGC = isGCEnabled }
88122
}
89123

90124
// GCMaxOperationTime is the maximum operation time of a garbage collection pass before it times out.
@@ -143,6 +177,13 @@ func WatchBufferWriteTimeout(watchBufferWriteTimeout time.Duration) Option {
143177
// clients being used by the datastore are enabled.
144178
//
145179
// Prometheus metrics are disabled by default.
146-
func WithEnablePrometheusStats(enablePrometheusStats bool) Option {
147-
return func(o *ydbConfig) { o.enablePrometheusStats = enablePrometheusStats }
180+
func WithEnablePrometheusStats(v bool) Option {
181+
return func(o *ydbConfig) { o.enablePrometheusStats = v }
182+
}
183+
184+
// WithEnableUniquenessCheck marks whether relation tuples will be checked against
185+
// unique index during CREATE operation. YDB doesn't support unique secondary indexes,
186+
// and since this check is quite expensive one may turn it off.
187+
func WithEnableUniquenessCheck(v bool) Option {
188+
return func(o *ydbConfig) { o.enableUniquenessCheck = v }
148189
}

internal/datastore/ydb/query.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,42 @@ var (
100100
colCaveatContext,
101101
colCreatedAtUnixNano,
102102
)
103+
104+
insertAsTableRelationsYQL, _ = yq.Insert(tableRelationTuple).Select(
105+
yq.Select(
106+
colNamespace,
107+
colObjectID,
108+
colRelation,
109+
colUsersetNamespace,
110+
colUsersetObjectID,
111+
colUsersetRelation,
112+
colCaveatName,
113+
colCaveatContext,
114+
colCreatedAtUnixNano,
115+
).From("AS_TABLE($values)"),
116+
).MustSql()
117+
118+
readLivingRelationYQL, _ = yq.Select(
119+
colNamespace,
120+
colObjectID,
121+
colRelation,
122+
colUsersetNamespace,
123+
colUsersetObjectID,
124+
colUsersetRelation,
125+
colCaveatName,
126+
colCaveatContext,
127+
).From(tableRelationTuple).
128+
View(ixUqRelationLiving).
129+
Where(livingObjectPredicate).
130+
Where(fmt.Sprintf("(%s, %s, %s, %s, %s, %s) IN $values",
131+
colNamespace,
132+
colObjectID,
133+
colRelation,
134+
colUsersetNamespace,
135+
colUsersetObjectID,
136+
colUsersetRelation,
137+
)).
138+
MustSql()
103139
)
104140

105141
type queryModifier func(sq.SelectBuilder) sq.SelectBuilder
@@ -267,7 +303,7 @@ func queryTuples(
267303
var structuredCtx map[string]any
268304
if caveatCtx != nil {
269305
if err := json.Unmarshal(*caveatCtx, &structuredCtx); err != nil {
270-
return nil, fmt.Errorf("failed to unmarhsla relation tuple caveat context: %w", err)
306+
return nil, fmt.Errorf("failed to unmarshal relation tuple caveat context: %w", err)
271307
}
272308
}
273309

internal/datastore/ydb/readwrite.go

+75-39
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ var _ datastore.ReadWriteTransaction = (*ydbReadWriter)(nil)
2727

2828
type ydbReadWriter struct {
2929
*ydbReader
30-
bulkLoadBatchSize int
31-
newRevision revisions.TimestampRevision
30+
bulkLoadBatchSize int
31+
newRevision revisions.TimestampRevision
32+
enableUniquenessCheck bool
3233
}
3334

3435
// WriteCaveats stores the provided caveats.
@@ -62,11 +63,9 @@ func (rw *ydbReadWriter) DeleteCaveats(ctx context.Context, names []string) erro
6263

6364
// WriteRelationships takes a list of tuple mutations and applies them to the datastore.
6465
func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*core.RelationTupleUpdate) error {
65-
insertBuilder := insertRelationsBuilder
6666
deleteBuilder := deleteRelationBuilder
6767

6868
var (
69-
err error
7069
insertionTuples []*core.RelationTuple
7170
touchTuples []*core.RelationTuple
7271

@@ -80,10 +79,6 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
8079
switch mut.GetOperation() {
8180
case core.RelationTupleUpdate_CREATE:
8281
insertionTuples = append(insertionTuples, tpl)
83-
insertBuilder, err = appendForInsertion(insertBuilder, tpl, rw.newRevision)
84-
if err != nil {
85-
return fmt.Errorf("failed to append tuple for insertion: %w", err)
86-
}
8782

8883
case core.RelationTupleUpdate_TOUCH:
8984
touchTuples = append(touchTuples, tpl)
@@ -97,7 +92,7 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
9792
}
9893

9994
// Perform SELECT queries first as a part of uniqueness check.
100-
if len(insertionTuples) > 0 {
95+
if len(insertionTuples) > 0 && rw.enableUniquenessCheck {
10196
dups, err := rw.selectTuples(ctx, insertionTuples)
10297
if err != nil {
10398
return fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
@@ -121,10 +116,6 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
121116
pk := tuple.StringWithoutCaveat(touchTuples[i])
122117
dup, ok := duplicatePKToRel[pk]
123118
if !ok {
124-
insertBuilder, err = appendForInsertion(insertBuilder, touchTuples[i], rw.newRevision)
125-
if err != nil {
126-
return fmt.Errorf("failed to append tuple for insertion: %w", err)
127-
}
128119
i++
129120
continue
130121
}
@@ -136,10 +127,6 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
136127
}
137128

138129
deleteClauses = append(deleteClauses, exactRelationshipClause(dup))
139-
insertBuilder, err = appendForInsertion(insertBuilder, touchTuples[i], rw.newRevision)
140-
if err != nil {
141-
return fmt.Errorf("failed to append tuple for insertion: %w", err)
142-
}
143130
i++
144131
}
145132
}
@@ -160,9 +147,18 @@ func (rw *ydbReadWriter) WriteRelationships(ctx context.Context, mutations []*co
160147

161148
// Run CREATE and TOUCH insertions, if any.
162149
if len(insertionTuples) > 0 || len(touchTuples) > 0 {
163-
if err := executeQuery(ctx, rw.tablePathPrefix, rw.executor, insertBuilder); err != nil {
150+
in := append(insertionTuples, touchTuples...)
151+
query := ydbCommon.AddTablePrefix(insertAsTableRelationsYQL, rw.tablePathPrefix)
152+
153+
res, err := rw.executor.Execute(
154+
ctx,
155+
query,
156+
table.NewQueryParameters(table.ValueParam("$values", relationsToListValue(in, rw.newRevision))),
157+
)
158+
if err != nil {
164159
return fmt.Errorf("failed to insert tuples: %w", err)
165160
}
161+
defer res.Close()
166162
}
167163

168164
return nil
@@ -289,12 +285,14 @@ func (rw *ydbReadWriter) BulkLoad(ctx context.Context, iter datastore.BulkWriteR
289285
}
290286

291287
if len(insertionTuples) > 0 {
292-
dups, err := rw.selectTuples(ctx, insertionTuples)
293-
if err != nil {
294-
return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
295-
}
296-
if len(dups) > 0 {
297-
return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0])
288+
if rw.enableUniquenessCheck {
289+
dups, err := rw.selectTuples(ctx, insertionTuples)
290+
if err != nil {
291+
return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
292+
}
293+
if len(dups) > 0 {
294+
return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0])
295+
}
298296
}
299297

300298
if err := executeQuery(ctx, rw.tablePathPrefix, rw.executor, insertBuilder); err != nil {
@@ -311,28 +309,22 @@ func (rw *ydbReadWriter) BulkLoad(ctx context.Context, iter datastore.BulkWriteR
311309
return uint64(totalCount), nil
312310
}
313311

314-
func (rw *ydbReadWriter) selectTuples(
315-
ctx context.Context,
316-
in []*core.RelationTuple,
317-
) ([]*core.RelationTuple, error) {
312+
func (rw *ydbReadWriter) selectTuples(ctx context.Context, in []*core.RelationTuple) ([]*core.RelationTuple, error) {
318313
ctx, span := tracer.Start(ctx, "selectTuples", trace.WithAttributes(attribute.Int("count", len(in))))
319314
defer span.End()
320315

321316
if len(in) == 0 {
322317
return nil, nil
323318
}
324319

325-
var pred sq.Or
326-
for _, r := range in {
327-
pred = append(pred, exactRelationshipClause(r))
328-
}
329-
330-
sql, args, err := rw.modifier(readRelationBuilder).View(ixUqRelationLiving).Where(pred).ToYQL()
331-
if err != nil {
332-
return nil, fmt.Errorf("failed to build query: %w", err)
333-
}
334-
335-
return queryTuples(ctx, rw.tablePathPrefix, sql, table.NewQueryParameters(args...), span, rw.executor)
320+
return queryTuples(
321+
ctx,
322+
rw.tablePathPrefix,
323+
readLivingRelationYQL,
324+
table.NewQueryParameters(table.ValueParam("$values", relationsToSelectListValueParam(in))),
325+
span,
326+
rw.executor,
327+
)
336328
}
337329

338330
type coreDefinition interface {
@@ -441,6 +433,50 @@ func appendForInsertion(
441433
return b.Values(valuesToWrite...), nil
442434
}
443435

436+
func relationsToListValue(in []*core.RelationTuple, rev revisions.TimestampRevision) types.Value {
437+
out := make([]types.Value, len(in))
438+
for i := range in {
439+
var caveatName *string
440+
var caveatContext *[]byte
441+
if in[i].GetCaveat() != nil {
442+
caveatName = &in[i].GetCaveat().CaveatName
443+
cc, err := in[i].GetCaveat().GetContext().MarshalJSON()
444+
if err != nil {
445+
panic(err)
446+
}
447+
caveatContext = &cc
448+
}
449+
450+
out[i] = types.StructValue(
451+
types.StructFieldValue(colNamespace, types.UTF8Value(in[i].GetResourceAndRelation().GetNamespace())),
452+
types.StructFieldValue(colObjectID, types.UTF8Value(in[i].GetResourceAndRelation().GetObjectId())),
453+
types.StructFieldValue(colRelation, types.UTF8Value(in[i].GetResourceAndRelation().GetRelation())),
454+
types.StructFieldValue(colUsersetNamespace, types.UTF8Value(in[i].GetSubject().GetNamespace())),
455+
types.StructFieldValue(colUsersetObjectID, types.UTF8Value(in[i].GetSubject().GetObjectId())),
456+
types.StructFieldValue(colUsersetRelation, types.UTF8Value(in[i].GetSubject().GetRelation())),
457+
types.StructFieldValue(colCaveatName, types.NullableUTF8Value(caveatName)),
458+
types.StructFieldValue(colCaveatContext, types.NullableJSONDocumentValueFromBytes(caveatContext)),
459+
types.StructFieldValue(colCreatedAtUnixNano, types.Int64Value(rev.TimestampNanoSec())),
460+
)
461+
}
462+
return types.ListValue(out...)
463+
}
464+
465+
func relationsToSelectListValueParam(in []*core.RelationTuple) types.Value {
466+
out := make([]types.Value, len(in))
467+
for i := range in {
468+
out[i] = types.TupleValue(
469+
types.UTF8Value(in[i].GetResourceAndRelation().GetNamespace()),
470+
types.UTF8Value(in[i].GetResourceAndRelation().GetObjectId()),
471+
types.UTF8Value(in[i].GetResourceAndRelation().GetRelation()),
472+
types.UTF8Value(in[i].GetSubject().GetNamespace()),
473+
types.UTF8Value(in[i].GetSubject().GetObjectId()),
474+
types.UTF8Value(in[i].GetSubject().GetRelation()),
475+
)
476+
}
477+
return types.ListValue(out...)
478+
}
479+
444480
func exactRelationshipClause(r *core.RelationTuple) sq.Eq {
445481
return sq.Eq{
446482
colNamespace: r.GetResourceAndRelation().GetNamespace(),

0 commit comments

Comments
 (0)