From 05c28cd699f59507c947094e4f84784a9f7a9437 Mon Sep 17 00:00:00 2001 From: Sasha Yakovtseva Date: Fri, 1 Mar 2024 09:28:16 +0200 Subject: [PATCH] Implement bulk load (#11) --- internal/datastore/ydb/options.go | 14 ++++++- internal/datastore/ydb/readwrite.go | 58 ++++++++++++++++++++++++++-- internal/datastore/ydb/ydb.go | 5 ++- internal/testserver/datastore/ydb.go | 11 +++--- pkg/datastore/test/bulk.go | 44 ++++++++++++++++++--- pkg/datastore/test/caveat.go | 6 +++ pkg/datastore/test/namespace.go | 6 +++ pkg/datastore/test/pagination.go | 5 +++ pkg/datastore/test/revisions.go | 8 +++- pkg/datastore/test/stats.go | 1 + pkg/datastore/test/transactions.go | 1 + pkg/datastore/test/tuples.go | 18 +++++++-- pkg/datastore/test/watch.go | 7 ++++ 13 files changed, 161 insertions(+), 23 deletions(-) diff --git a/internal/datastore/ydb/options.go b/internal/datastore/ydb/options.go index 59baf6fba6..2affdfedab 100644 --- a/internal/datastore/ydb/options.go +++ b/internal/datastore/ydb/options.go @@ -18,6 +18,8 @@ type ydbConfig struct { gcInterval time.Duration gcMaxOperationTime time.Duration + bulkLoadBatchSize int + // todo find a way to use it maxRetries uint8 @@ -29,12 +31,13 @@ var defaultConfig = ydbConfig{ tablePathPrefix: "", watchBufferLength: 0, watchBufferWriteTimeout: 0, - followerReadDelay: 5 * time.Second, + followerReadDelay: 0 * time.Second, revisionQuantization: 5 * time.Second, maxRevisionStalenessPercent: 0.1, gcWindow: 24 * time.Hour, gcInterval: 3 * time.Minute, gcMaxOperationTime: time.Minute, + bulkLoadBatchSize: 1000, maxRetries: 0, gcEnabled: false, enablePrometheusStats: false, @@ -103,7 +106,14 @@ func MaxRevisionStalenessPercent(stalenessPercent float64) Option { // FollowerReadDelay is the time delay to apply to enable historical reads. // -// This value defaults to 5 seconds. +// This value defaults to 0 seconds. func FollowerReadDelay(delay time.Duration) Option { return func(o *ydbConfig) { o.followerReadDelay = delay } } + +// BulkLoadBatchSize is the number of rows BulkLoad will process in a single batch. +// +// This value defaults to 1000. +func BulkLoadBatchSize(limit int) Option { + return func(o *ydbConfig) { o.bulkLoadBatchSize = limit } +} diff --git a/internal/datastore/ydb/readwrite.go b/internal/datastore/ydb/readwrite.go index 4936511410..4150befe7f 100644 --- a/internal/datastore/ydb/readwrite.go +++ b/internal/datastore/ydb/readwrite.go @@ -23,7 +23,8 @@ import ( type ydbReadWriter struct { *ydbReader - newRevision revisions.TimestampRevision + bulkLoadBatchSize int + newRevision revisions.TimestampRevision } // WriteCaveats stores the provided caveats. @@ -241,9 +242,60 @@ func (rw *ydbReadWriter) DeleteNamespaces(ctx context.Context, names ...string) return nil } +// BulkLoad takes a relationship source iterator, and writes all the +// relationships to the backing datastore in an optimized fashion. This +// method can and will omit checks and otherwise cut corners in the +// interest of performance, and should not be relied upon for OLTP-style +// workloads. +// For YDB this is not an effective way insert relationships. +// Recommended insert bulk size is < 100k per transaction. Under the hood this method +// works just like WriteRelationships with CREATE operation splitting input into a batches. func (rw *ydbReadWriter) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { - // TODO implement me - panic("implement me") + var ( + insertionTuples []*core.RelationTuple + tpl *core.RelationTuple + err error + totalCount int + ) + + tpl, err = iter.Next(ctx) + for tpl != nil && err == nil { + insertionTuples = insertionTuples[:0] + insertBuilder := insertRelationsBuilder + + for ; tpl != nil && err == nil && len(insertionTuples) < rw.bulkLoadBatchSize; tpl, err = iter.Next(ctx) { + // need to copy, see datastore.BulkWriteRelationshipSource docs + insertionTuples = append(insertionTuples, tpl.CloneVT()) + insertBuilder, err = appendForInsertion(insertBuilder, tpl, rw.newRevision) + if err != nil { + return 0, fmt.Errorf("failed to append tuple for insertion: %w", err) + } + } + if err != nil { + return 0, fmt.Errorf("failed to read source: %w", err) + } + + if len(insertionTuples) > 0 { + dups, err := rw.selectTuples(ctx, insertionTuples) + if err != nil { + return 0, fmt.Errorf("failed to ensure CREATE tuples uniqueness: %w", err) + } + if len(dups) > 0 { + return 0, datastoreCommon.NewCreateRelationshipExistsError(dups[0]) + } + + if err := executeQuery(ctx, rw.tablePathPrefix, rw.executor, insertBuilder); err != nil { + return 0, fmt.Errorf("failed to insert tuples: %w", err) + } + } + + totalCount += len(insertionTuples) + } + if err != nil { + return 0, fmt.Errorf("failed to read source: %w", err) + } + + return uint64(totalCount), nil } func (rw *ydbReadWriter) selectTuples( diff --git a/internal/datastore/ydb/ydb.go b/internal/datastore/ydb/ydb.go index 28617a406b..bf011198bc 100644 --- a/internal/datastore/ydb/ydb.go +++ b/internal/datastore/ydb/ydb.go @@ -183,8 +183,9 @@ func (y *ydbDatastore) ReadWriteTx( newRev = revisions.NewForTimestamp(now) rw := &ydbReadWriter{ - ydbReader: newYDBReader(y.config.tablePathPrefix, tx, livingObjectModifier, false), - newRevision: newRev, + ydbReader: newYDBReader(y.config.tablePathPrefix, tx, livingObjectModifier, false), + bulkLoadBatchSize: y.config.bulkLoadBatchSize, + newRevision: newRev, } return fn(ctx, rw) diff --git a/internal/testserver/datastore/ydb.go b/internal/testserver/datastore/ydb.go index b67f40f597..f6c2db0b93 100644 --- a/internal/testserver/datastore/ydb.go +++ b/internal/testserver/datastore/ydb.go @@ -135,7 +135,7 @@ func NewYDBEngineForTest(bridgeNetworkName string) (RunningEngineForTest, func() }, cleanup, nil } -func (r ydbTester) NewDatabase(t testing.TB) string { +func (y ydbTester) NewDatabase(t testing.TB) string { // there's no easy way to create new database in a local YDB, // so create a new directory instead. @@ -143,19 +143,20 @@ func (r ydbTester) NewDatabase(t testing.TB) string { require.NoError(t, err) directory := fmt.Sprintf("/%s/%s", ydbDefaultDatabase, uniquePortion) - dsn := fmt.Sprintf("grpc://%s:%d/%s?table_path_prefix=%s", r.hostname, r.port, ydbDefaultDatabase, directory) + dsn := fmt.Sprintf("grpc://%s:%d/%s?table_path_prefix=%s", y.hostname, y.port, ydbDefaultDatabase, directory) return dsn } -func (r ydbTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore { - dsn := r.NewDatabase(t) +func (y ydbTester) NewDatastore(t testing.TB, initFunc InitFunc) datastore.Datastore { + dsn := y.NewDatabase(t) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() migrationDriver, err := ydbMigrations.NewYDBDriver(ctx, dsn) require.NoError(t, err) + t.Cleanup(func() { _ = migrationDriver.Close(ctx) }) err = ydbMigrations.YDBMigrations.Run(ctx, migrationDriver, migrate.Head, migrate.LiveRun) require.NoError(t, err) diff --git a/pkg/datastore/test/bulk.go b/pkg/datastore/test/bulk.go index 0424805ace..8c15833e98 100644 --- a/pkg/datastore/test/bulk.go +++ b/pkg/datastore/test/bulk.go @@ -6,14 +6,18 @@ import ( "strconv" "testing" + "github.com/samber/lo" "github.com/stretchr/testify/require" "github.com/authzed/spicedb/internal/testfixtures" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) func BulkUploadTest(t *testing.T, tester DatastoreTester) { + const ydbSelectLimit = 1000 + testCases := []int{0, 1, 10, 100, 1_000, 10_000} for _, tc := range testCases { @@ -23,6 +27,7 @@ func BulkUploadTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) bulkSource := testfixtures.NewBulkTupleGenerator( @@ -46,13 +51,39 @@ func BulkUploadTest(t *testing.T, tester DatastoreTester) { head, err := ds.HeadRevision(ctx) require.NoError(err) - iter, err := ds.SnapshotReader(head).QueryRelationships(ctx, datastore.RelationshipsFilter{ - ResourceType: testfixtures.DocumentNS.Name, - }) - require.NoError(err) - defer iter.Close() + var ( + after *core.RelationTuple + isLastCheck bool + ) + for left := tc; !isLastCheck; { + if left == 0 { + isLastCheck = true + } + + iter, err := ds.SnapshotReader(head).QueryRelationships(ctx, datastore.RelationshipsFilter{ + ResourceType: testfixtures.DocumentNS.Name, + }, + options.WithLimit(lo.ToPtr(uint64(ydbSelectLimit))), + options.WithSort(options.ByResource), + options.WithAfter(after), + ) + require.NoError(err) + + expect := ydbSelectLimit + if left < ydbSelectLimit { + expect = left + } + + tRequire.VerifyIteratorCount(iter, expect) + + if expect > 0 { + after, err = iter.Cursor() + require.NoError(err) + } - tRequire.VerifyIteratorCount(iter, tc) + iter.Close() + left -= expect + } }) } } @@ -63,6 +94,7 @@ func BulkUploadErrorsTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithSchema(rawDS, require) diff --git a/pkg/datastore/test/caveat.go b/pkg/datastore/test/caveat.go index c50af406a6..c6e461d0ec 100644 --- a/pkg/datastore/test/caveat.go +++ b/pkg/datastore/test/caveat.go @@ -29,6 +29,7 @@ func CaveatNotFoundTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() @@ -43,6 +44,7 @@ func WriteReadDeleteCaveatTest(t *testing.T, tester DatastoreTester) { req := require.New(t) ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1) req.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) skipIfNotCaveatStorer(t, ds) @@ -130,6 +132,7 @@ func WriteCaveatedRelationshipTest(t *testing.T, tester DatastoreTester) { req := require.New(t) ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1) req.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) skipIfNotCaveatStorer(t, ds) @@ -203,6 +206,7 @@ func CaveatedRelationshipFilterTest(t *testing.T, tester DatastoreTester) { req := require.New(t) ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1) req.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) skipIfNotCaveatStorer(t, ds) @@ -245,6 +249,7 @@ func CaveatSnapshotReadsTest(t *testing.T, tester DatastoreTester) { req := require.New(t) ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 1) req.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) skipIfNotCaveatStorer(t, ds) @@ -278,6 +283,7 @@ func CaveatedRelationshipWatchTest(t *testing.T, tester DatastoreTester) { req := require.New(t) ds, err := tester.New(0*time.Second, veryLargeGCInterval, veryLargeGCWindow, 16) req.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) skipIfNotCaveatStorer(t, ds) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/datastore/test/namespace.go b/pkg/datastore/test/namespace.go index b3daacfe31..c3e6eb13fb 100644 --- a/pkg/datastore/test/namespace.go +++ b/pkg/datastore/test/namespace.go @@ -37,6 +37,7 @@ func NamespaceNotFoundTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() @@ -54,6 +55,7 @@ func NamespaceWriteTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() @@ -145,6 +147,7 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -193,6 +196,7 @@ func NamespaceDeleteTest(t *testing.T, tester DatastoreTester) { func NamespaceMultiDeleteTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) ctx := context.Background() @@ -221,6 +225,7 @@ func EmptyNamespaceDeleteTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -266,6 +271,7 @@ definition document { // Write the namespace definition to the datastore. ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() updatedRevision, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { diff --git a/pkg/datastore/test/pagination.go b/pkg/datastore/test/pagination.go index 5ddf4b4912..80f99b16fe 100644 --- a/pkg/datastore/test/pagination.go +++ b/pkg/datastore/test/pagination.go @@ -32,6 +32,7 @@ func OrderingTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds} @@ -107,6 +108,7 @@ func LimitTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds} @@ -202,6 +204,7 @@ var orderedTestCases = []struct { func OrderedLimitTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds} @@ -255,6 +258,7 @@ func OrderedLimitTest(t *testing.T, tester DatastoreTester) { func ResumeTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) tRequire := testfixtures.TupleChecker{Require: require.New(t), DS: ds} @@ -325,6 +329,7 @@ func CursorErrorsTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(t, err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, rev := testfixtures.StandardDatastoreWithData(rawDS, require.New(t)) ctx := context.Background() diff --git a/pkg/datastore/test/revisions.go b/pkg/datastore/test/revisions.go index 1c42d69932..45ae706258 100644 --- a/pkg/datastore/test/revisions.go +++ b/pkg/datastore/test/revisions.go @@ -34,6 +34,7 @@ func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(tc.quantizationRange, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() veryFirstRevision, err := ds.OptimizedRevision(ctx) @@ -59,6 +60,7 @@ func RevisionQuantizationTest(t *testing.T, tester DatastoreTester) { time.Sleep(tc.quantizationRange) // Now we should ONLY get revisions later than the now revision + // IF follower read delay is set to 0 for start := time.Now(); time.Since(start) < 10*time.Millisecond; { testRevision, err := ds.OptimizedRevision(ctx) require.NoError(err) @@ -75,6 +77,7 @@ func RevisionSerializationTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -98,6 +101,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, 10*time.Millisecond, 300*time.Millisecond, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -141,7 +145,7 @@ func RevisionGCTest(t *testing.T, tester DatastoreTester) { // require.Error(ds.CheckRevision(ctx, head), "expected head revision to be valid if out of GC window") // // latest state of the system is invalid if head revision is out of GC window - //_, _, err = ds.SnapshotReader(head).ReadNamespaceByName(ctx, "foo/bar") + // _, _, err = ds.SnapshotReader(head).ReadNamespaceByName(ctx, "foo/bar") // require.Error(err, "expected previously written schema to exist at out-of-GC window head") // check freshly fetched head revision is valid after GC window elapsed @@ -177,6 +181,7 @@ func SequentialRevisionsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, 10*time.Second, 300*time.Minute, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -200,6 +205,7 @@ func ConcurrentRevisionsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, 10*time.Second, 300*time.Minute, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/pkg/datastore/test/stats.go b/pkg/datastore/test/stats.go index aa0cb785cb..c2209ce967 100644 --- a/pkg/datastore/test/stats.go +++ b/pkg/datastore/test/stats.go @@ -18,6 +18,7 @@ func StatsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) ds, _ = testfixtures.StandardDatastoreWithData(ds, require) diff --git a/pkg/datastore/test/transactions.go b/pkg/datastore/test/transactions.go index b08d11472b..b2221ff66d 100644 --- a/pkg/datastore/test/transactions.go +++ b/pkg/datastore/test/transactions.go @@ -33,6 +33,7 @@ func RetryTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds := rawDS.(TestableDatastore) diff --git a/pkg/datastore/test/tuples.go b/pkg/datastore/test/tuples.go index 108685fff6..0fc9357dbe 100644 --- a/pkg/datastore/test/tuples.go +++ b/pkg/datastore/test/tuples.go @@ -46,7 +46,7 @@ func SimpleTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) - defer ds.Close() + t.Cleanup(func() { _ = ds.Close() }) ctx := context.Background() @@ -264,7 +264,7 @@ func ObjectIDsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) - defer ds.Close() + t.Cleanup(func() { _ = ds.Close() }) tpl := makeTestTuple(tc, tc) require.NoError(tpl.Validate()) @@ -379,7 +379,7 @@ func DeleteRelationshipsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) - defer ds.Close() + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -427,7 +427,7 @@ func InvalidReadsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, testGCDuration, 1) require.NoError(err) - defer ds.Close() + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -471,6 +471,7 @@ func DeleteNotExistantTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -492,6 +493,7 @@ func DeleteAlreadyDeletedTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -527,6 +529,7 @@ func WriteDeleteWriteTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -554,6 +557,7 @@ func CreateAlreadyExistingTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -583,6 +587,7 @@ func TouchAlreadyExistingTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -613,6 +618,7 @@ func CreateDeleteTouchTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -754,6 +760,7 @@ func CreateTouchDeleteTouchTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -788,6 +795,7 @@ func TouchAlreadyExistingCaveatedTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -813,6 +821,7 @@ func MultipleReadsInRWTTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() @@ -842,6 +851,7 @@ func ConcurrentWriteSerializationTest(t *testing.T, tester DatastoreTester) { rawDS, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = rawDS.Close() }) ds, _ := testfixtures.StandardDatastoreWithData(rawDS, require) ctx := context.Background() diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 0191796bbc..04877d1be5 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -60,6 +60,7 @@ func WatchTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -191,6 +192,7 @@ func WatchCancelTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) startWatchRevision := setupDatastore(ds, require) @@ -237,6 +239,7 @@ func WatchWithTouchTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -342,6 +345,7 @@ func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -434,6 +438,7 @@ func WatchSchemaTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -494,6 +499,7 @@ func WatchAllTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require) @@ -626,6 +632,7 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) require.NoError(err) + t.Cleanup(func() { _ = ds.Close() }) setupDatastore(ds, require)