Skip to content

Commit

Permalink
Implement bulk load (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashayakovtseva committed Mar 7, 2024
1 parent 97cb9dd commit 3b5ea9d
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 23 deletions.
14 changes: 12 additions & 2 deletions internal/datastore/ydb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type ydbConfig struct {
gcInterval time.Duration
gcMaxOperationTime time.Duration

bulkLoadBatchSize int

// todo find a way to use it
maxRetries uint8

Expand All @@ -29,12 +31,13 @@ var defaultConfig = ydbConfig{
tablePathPrefix: "",
watchBufferLength: 0,
watchBufferWriteTimeout: 0,
followerReadDelay: 5 * time.Second,
followerReadDelay: 0 * time.Second,
revisionQuantization: 5 * time.Second,
maxRevisionStalenessPercent: 0.1,
gcWindow: 24 * time.Hour,
gcInterval: 3 * time.Minute,
gcMaxOperationTime: time.Minute,
bulkLoadBatchSize: 1000,
maxRetries: 0,
gcEnabled: false,
enablePrometheusStats: false,
Expand Down Expand Up @@ -103,7 +106,14 @@ func MaxRevisionStalenessPercent(stalenessPercent float64) Option {

// FollowerReadDelay is the time delay to apply to enable historical reads.
//
// This value defaults to 0 seconds.
func FollowerReadDelay(delay time.Duration) Option {
return func(o *ydbConfig) { o.followerReadDelay = delay }
}

// BulkLoadBatchSize is the number of rows BulkLoad will process in a single batch.
//
// This value defaults to 1000. Non-positive limits are ignored and leave the
// previously configured value untouched, because BulkLoad cannot make progress
// with a batch that is never allowed to hold a row.
func BulkLoadBatchSize(limit int) Option {
	return func(o *ydbConfig) {
		if limit <= 0 {
			// keep whatever is already configured rather than storing a
			// value that would stall BulkLoad.
			return
		}
		o.bulkLoadBatchSize = limit
	}
}
58 changes: 55 additions & 3 deletions internal/datastore/ydb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (

// ydbReadWriter extends ydbReader with the write operations of a read-write
// transaction.
type ydbReadWriter struct {
	*ydbReader
	// bulkLoadBatchSize is the maximum number of tuples BulkLoad accumulates
	// before issuing a single insert query.
	bulkLoadBatchSize int
	// newRevision is the revision passed along with every tuple that is
	// prepared for insertion.
	newRevision revisions.TimestampRevision
}

// WriteCaveats stores the provided caveats.
Expand Down Expand Up @@ -241,9 +242,60 @@ func (rw *ydbReadWriter) DeleteNamespaces(ctx context.Context, names ...string)
return nil
}

// BulkLoad takes a relationship source iterator, and writes all the
// relationships to the backing datastore in an optimized fashion. This
// method can and will omit checks and otherwise cut corners in the
// interest of performance, and should not be relied upon for OLTP-style
// workloads.
//
// For YDB this is not an effective way to insert relationships.
// Recommended insert bulk size is < 100k per transaction. Under the hood this
// method works just like WriteRelationships with the CREATE operation,
// splitting the input into batches of at most rw.bulkLoadBatchSize tuples.
//
// It returns the total number of tuples inserted. On any failure (invalid
// batch size, source read error, an already existing tuple, or a failed
// insert query) it returns 0 and a non-nil error.
func (rw *ydbReadWriter) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) {
	// With a non-positive batch size the inner loop below would never consume
	// a tuple, so the outer loop would spin forever on the first one.
	if rw.bulkLoadBatchSize <= 0 {
		return 0, fmt.Errorf("invalid bulk load batch size %d: must be positive", rw.bulkLoadBatchSize)
	}

	var (
		insertionTuples []*core.RelationTuple
		tpl             *core.RelationTuple
		err             error
		totalCount      int
	)

	tpl, err = iter.Next(ctx)
	for tpl != nil && err == nil {
		// reuse the backing array between batches and start a fresh query.
		insertionTuples = insertionTuples[:0]
		insertBuilder := insertRelationsBuilder

		// Fill one batch. When the batch is full the loop exits with tpl
		// already holding the first tuple of the next batch.
		for ; tpl != nil && err == nil && len(insertionTuples) < rw.bulkLoadBatchSize; tpl, err = iter.Next(ctx) {
			// need to copy, see datastore.BulkWriteRelationshipSource docs
			insertionTuples = append(insertionTuples, tpl.CloneVT())
			insertBuilder, err = appendForInsertion(insertBuilder, tpl, rw.newRevision)
			if err != nil {
				return 0, fmt.Errorf("failed to append tuple for insertion: %w", err)
			}
		}
		if err != nil {
			return 0, fmt.Errorf("failed to read source: %w", err)
		}

		if len(insertionTuples) > 0 {
			// CREATE semantics: any tuple reported back by selectTuples is
			// treated as already existing and aborts the load.
			dups, err := rw.selectTuples(ctx, insertionTuples)
			if err != nil {
				return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err)
			}
			if len(dups) > 0 {
				return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0])
			}

			if err := executeQuery(ctx, rw.tablePathPrefix, rw.executor, insertBuilder); err != nil {
				return 0, fmt.Errorf("failed to insert tuples: %w", err)
			}
		}

		totalCount += len(insertionTuples)
	}
	// covers an error returned by the very first iter.Next call.
	if err != nil {
		return 0, fmt.Errorf("failed to read source: %w", err)
	}

	return uint64(totalCount), nil
}

func (rw *ydbReadWriter) selectTuples(
Expand Down
5 changes: 3 additions & 2 deletions internal/datastore/ydb/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ func (y *ydbDatastore) ReadWriteTx(
newRev = revisions.NewForTimestamp(now)

rw := &ydbReadWriter{
ydbReader: newYDBReader(y.config.tablePathPrefix, tx, livingObjectModifier, false),
newRevision: newRev,
ydbReader: newYDBReader(y.config.tablePathPrefix, tx, livingObjectModifier, false),
bulkLoadBatchSize: y.config.bulkLoadBatchSize,
newRevision: newRev,
}

return fn(ctx, rw)
Expand Down
11 changes: 6 additions & 5 deletions internal/testserver/datastore/ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,28 @@ func NewYDBEngineForTest(bridgeNetworkName string) (RunningEngineForTest, func()
}, cleanup, nil
}

func (r ydbTester) NewDatabase(t testing.TB) string {
func (y ydbTester) NewDatabase(t testing.TB) string {
// there's no easy way to create new database in a local YDB,
// so create a new directory instead.

uniquePortion, err := secrets.TokenHex(4)
require.NoError(t, err)

directory := fmt.Sprintf("/%s/%s", ydbDefaultDatabase, uniquePortion)
dsn := fmt.Sprintf("grpc://%s:%d/%s?table_path_prefix=%s", r.hostname, r.port, ydbDefaultDatabase, directory)
dsn := fmt.Sprintf("grpc://%s:%d/%s?table_path_prefix=%s", y.hostname, y.port, ydbDefaultDatabase, directory)

return dsn
}

func (r ydbTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore {
dsn := r.NewDatabase(t)
func (y ydbTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore {
dsn := y.NewDatabase(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()

migrationDriver, err := ydbMigrations.NewYDBDriver(ctx, dsn)
require.NoError(t, err)
t.Cleanup(func() { _ = migrationDriver.Close(ctx) })

err = ydbMigrations.YDBMigrations.Run(ctx, migrationDriver, migrate.Head, migrate.LiveRun)
require.NoError(t, err)
Expand Down
44 changes: 38 additions & 6 deletions pkg/datastore/test/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import (
"strconv"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/authzed/spicedb/internal/testfixtures"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

func BulkUploadTest(t *testing.T, tester DatastoreTester) {
const ydbSelectLimit = 1000

testCases := []int{0, 1, 10, 100, 1_000, 10_000}

for _, tc := range testCases {
Expand All @@ -23,6 +27,7 @@ func BulkUploadTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require)
bulkSource := testfixtures.NewBulkTupleGenerator(
Expand All @@ -46,13 +51,39 @@ func BulkUploadTest(t *testing.T, tester DatastoreTester) {
head, err := ds.HeadRevision(ctx)
require.NoError(err)

iter, err := ds.SnapshotReader(head).QueryRelationships(ctx, datastore.RelationshipsFilter{
ResourceType: testfixtures.DocumentNS.Name,
})
require.NoError(err)
defer iter.Close()
var (
after *core.RelationTuple
isLastCheck bool
)
for left := tc; !isLastCheck; {
if left == 0 {
isLastCheck = true
}

iter, err := ds.SnapshotReader(head).QueryRelationships(ctx, datastore.RelationshipsFilter{
ResourceType: testfixtures.DocumentNS.Name,
},
options.WithLimit(lo.ToPtr(uint64(ydbSelectLimit))),
options.WithSort(options.ByResource),
options.WithAfter(after),
)
require.NoError(err)

expect := ydbSelectLimit
if left < ydbSelectLimit {
expect = left
}

tRequire.VerifyIteratorCount(iter, expect)

if expect > 0 {
after, err = iter.Cursor()
require.NoError(err)
}

tRequire.VerifyIteratorCount(iter, tc)
iter.Close()
left -= expect
}
})
}
}
Expand All @@ -63,6 +94,7 @@ func BulkUploadErrorsTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require)

Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/test/caveat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func CaveatNotFoundTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

ctx := context.Background()

Expand All @@ -43,6 +44,7 @@ func WriteReadDeleteCaveatTest(t *testing.T, tester DatastoreTester) {
req := require.New(t)
ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1)
req.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

skipIfNotCaveatStorer(t, ds)

Expand Down Expand Up @@ -130,6 +132,7 @@ func WriteCaveatedRelationshipTest(t *testing.T, tester DatastoreTester) {
req := require.New(t)
ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1)
req.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

skipIfNotCaveatStorer(t, ds)

Expand Down Expand Up @@ -203,6 +206,7 @@ func CaveatedRelationshipFilterTest(t *testing.T, tester DatastoreTester) {
req := require.New(t)
ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1)
req.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

skipIfNotCaveatStorer(t, ds)

Expand Down Expand Up @@ -245,6 +249,7 @@ func CaveatSnapshotReadsTest(t *testing.T, tester DatastoreTester) {
req := require.New(t)
ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1)
req.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

skipIfNotCaveatStorer(t, ds)

Expand Down Expand Up @@ -278,6 +283,7 @@ func CaveatedRelationshipWatchTest(t *testing.T, tester DatastoreTester) {
req := require.New(t)
ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 16)
req.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

skipIfNotCaveatStorer(t, ds)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/test/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NamespaceNotFoundTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

ctx := context.Background()

Expand All @@ -54,6 +55,7 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) {

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

ctx := context.Background()

Expand Down Expand Up @@ -145,6 +147,7 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)
ctx := context.Background()
Expand Down Expand Up @@ -193,6 +196,7 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) {
func NamespaceMultiDeleteTest(t *testing.T, tester DatastoreTester) {
rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
ctx := context.Background()
Expand Down Expand Up @@ -221,6 +225,7 @@ func EmptyNamespaceDeleteTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require)
ctx := context.Background()
Expand Down Expand Up @@ -266,6 +271,7 @@ definition document {
// Write the namespace definition to the datastore.
ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)
t.Cleanup(func() { _ = ds.Close() })

ctx := context.Background()
updatedRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/datastore/test/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func OrderingTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds}
Expand Down Expand Up @@ -107,6 +108,7 @@ func LimitTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds}
Expand Down Expand Up @@ -202,6 +204,7 @@ var orderedTestCases = []struct {
func OrderedLimitTest(t *testing.T, tester DatastoreTester) {
rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds}
Expand Down Expand Up @@ -255,6 +258,7 @@ func OrderedLimitTest(t *testing.T, tester DatastoreTester) {
func ResumeTest(t *testing.T, tester DatastoreTester) {
rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds}
Expand Down Expand Up @@ -325,6 +329,7 @@ func CursorErrorsTest(t *testing.T, tester DatastoreTester) {

rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(t, err)
t.Cleanup(func() { _ = rawDS.Close() })

ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
ctx := context.Background()
Expand Down
Loading

0 comments on commit 3b5ea9d

Please sign in to comment.