From 277573fec11173f2f483d24ff34da203a49f2d54 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 12 Feb 2024 14:52:27 +0800 Subject: [PATCH 1/3] Use yq Signed-off-by: sashayakovtseva --- internal/datastore/ydb/reader.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/datastore/ydb/reader.go b/internal/datastore/ydb/reader.go index 10ff4cdcd2..5cdb6a7010 100644 --- a/internal/datastore/ydb/reader.go +++ b/internal/datastore/ydb/reader.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - sq "github.com/Masterminds/squirrel" yq "github.com/flymedllva/ydb-go-qb/yqb" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -108,7 +107,7 @@ func (r *ydbReader) loadNamespace( r.tablePathPrefix, r.executor, func(builder yq.SelectBuilder) yq.SelectBuilder { - return r.modifier(builder).Where(sq.Eq{colNamespace: namespace}) + return r.modifier(builder).Where(yq.Eq{colNamespace: namespace}) }, ) if err != nil { From d55d5faa568f1ca6e6fd830352db9bddeea33f92 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 12 Feb 2024 17:51:44 +0800 Subject: [PATCH 2/3] Implement caveats reader Signed-off-by: sashayakovtseva --- internal/datastore/ydb/query.go | 5 ++ internal/datastore/ydb/reader.go | 119 +++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 7 deletions(-) diff --git a/internal/datastore/ydb/query.go b/internal/datastore/ydb/query.go index c10e52fe70..1cb593f344 100644 --- a/internal/datastore/ydb/query.go +++ b/internal/datastore/ydb/query.go @@ -22,6 +22,11 @@ const ( tableNamespaceConfig = "namespace_config" colSerializedConfig = "serialized_config" colNamespace = "namespace" + + // caveat + tableCaveat = "caveat" + coltName = "name" + colDefinition = "definition" ) var ( diff --git a/internal/datastore/ydb/reader.go b/internal/datastore/ydb/reader.go index 5cdb6a7010..915fcafc77 100644 --- a/internal/datastore/ydb/reader.go +++ b/internal/datastore/ydb/reader.go @@ -20,19 +20,49 @@ type ydbReader struct { modifier queryModifier } -func (r *ydbReader) ReadCaveatByName(ctx context.Context, name string) (caveat *core.CaveatDefinition, lastWritten datastore.Revision, err error) { - // TODO implement me - panic("implement me") +func (r *ydbReader) ReadCaveatByName( + ctx context.Context, + name string, +) (*core.CaveatDefinition, datastore.Revision, error) { + loaded, version, err := r.loadCaveat(ctx, name) + if err != nil { + return nil, datastore.NoRevision, fmt.Errorf("filed to read caveat: %w", err) + } + return loaded, version, nil } func (r *ydbReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) { - // TODO implement me - panic("implement me") + caveatsWithRevisions, err := loadAllCaveats(ctx, r.tablePathPrefix, r.executor, r.modifier) + if err != nil { + return nil, fmt.Errorf("failed to list caveats: %w", err) + } + + return caveatsWithRevisions, err } func (r *ydbReader) LookupCaveatsWithNames(ctx context.Context, names []string) ([]datastore.RevisionedCaveat, error) { - // TODO implement me - panic("implement me") + if len(names) == 0 { + return nil, nil + } + + var clause yq.Or + for _, nsName := range names { + clause = append(clause, yq.Eq{coltName: nsName}) + } + + caveatsWithRevisions, err := loadAllCaveats( + ctx, + r.tablePathPrefix, + r.executor, + func(builder yq.SelectBuilder) yq.SelectBuilder { + return r.modifier(builder).Where(clause) + }, + ) + if err != nil { + return nil, fmt.Errorf("failed to list caveats with names: %w", err) + } + + return caveatsWithRevisions, err } func (r *ydbReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, options ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) { @@ -164,3 +194,78 @@ func loadAllNamespaces( return nsDefs, nil } + +var readCaveatBuilder = yq.Select(colDefinition, colCreatedAtUnixNano).From(tableCaveat) + +func (r *ydbReader) loadCaveat( + ctx context.Context, + name string, +) (*core.CaveatDefinition, revisions.TimestampRevision, error) { + ctx, span := tracer.Start(ctx, "loadCaveat") + defer span.End() + + defs, err := loadAllCaveats( + ctx, + r.tablePathPrefix, + r.executor, + func(builder yq.SelectBuilder) yq.SelectBuilder { + return r.modifier(builder).Where(yq.Eq{coltName: name}) + }, + ) + if err != nil { + return nil, revisions.TimestampRevision(0), err + } + + if len(defs) < 1 { + return nil, revisions.TimestampRevision(0), datastore.NewCaveatNameNotFoundErr(name) + } + + return defs[0].Definition, defs[0].LastWrittenRevision.(revisions.TimestampRevision), nil +} + +func loadAllCaveats( + ctx context.Context, + tablePathPrefix string, + executor queryExecutor, + modifier queryModifier, +) ([]datastore.RevisionedCaveat, error) { + sql, args, err := modifier(readCaveatBuilder).ToYdbSql() + if err != nil { + return nil, err + } + + sql = common.AddTablePrefix(sql, tablePathPrefix) + res, err := executor.Execute(ctx, sql, table.NewQueryParameters(args...)) + if err != nil { + return nil, err + } + defer res.Close() + + var caveatDefs []datastore.RevisionedCaveat + for res.NextResultSet(ctx) { + for res.NextRow() { + var definition []byte + var createdAtUnixNano int64 + if err := res.Scan(&definition, &createdAtUnixNano); err != nil { + return nil, err + } + + var loaded core.CaveatDefinition + if err := loaded.UnmarshalVT(definition); err != nil { + return nil, fmt.Errorf("failed to read caveat definition: %w", err) + } + + revision := revisions.NewForTimestamp(createdAtUnixNano) + caveatDefs = append(caveatDefs, datastore.RevisionedCaveat{ + Definition: &loaded, + LastWrittenRevision: revision, + }) + } + } + + if err := res.Err(); err != nil { + return nil, err + } + + return caveatDefs, nil +} From 4c15b31ab38b03681e19f5f18443b803f64179e9 Mon Sep 17 00:00:00 2001 From: sashayakovtseva Date: Mon, 12 Feb 2024 18:35:00 +0800 Subject: [PATCH 3/3] Add caveats reader tests Signed-off-by: sashayakovtseva --- internal/datastore/ydb/reader_test.go | 208 +++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 4 deletions(-) diff --git a/internal/datastore/ydb/reader_test.go b/internal/datastore/ydb/reader_test.go index e4ff933796..9f681ba585 100644 --- a/internal/datastore/ydb/reader_test.go +++ b/internal/datastore/ydb/reader_test.go @@ -123,8 +123,9 @@ func TestYDBReaderNamespaces(t *testing.T) { lastWrittenRevision datastore.Revision } - matchNamspace := func(t *testing.T, expect expectNs, actual datastore.RevisionedNamespace) { + matchNamespace := func(t *testing.T, expect expectNs, actual datastore.RevisionedNamespace) { require.Equal(t, expect.lastWrittenRevision, actual.LastWrittenRevision) + require.Equal(t, expect.name, actual.Definition.GetName()) actualRelations := lo.Map(actual.Definition.GetRelation(), func(item *core.Relation, index int) string { return item.GetName() }) @@ -138,7 +139,7 @@ func TestYDBReaderNamespaces(t *testing.T) { for _, ns := range nss { expectNs, ok := expectNs[ns.Definition.GetName()] require.True(t, ok) - matchNamspace(t, expectNs, ns) + matchNamespace(t, expectNs, ns) } } @@ -154,7 +155,7 @@ func TestYDBReaderNamespaces(t *testing.T) { for _, ns := range nss { expectNs, ok := expect[ns.Definition.GetName()] require.True(t, ok) - matchNamspace(t, expectNs, ns) + matchNamespace(t, expectNs, ns) } } @@ -165,7 +166,7 @@ func TestYDBReaderNamespaces(t *testing.T) { return } require.NoError(t, err) - matchNamspace(t, expect, datastore.RevisionedNamespace{ + matchNamespace(t, expect, datastore.RevisionedNamespace{ Definition: ns, LastWrittenRevision: lastWritten, }) @@ -222,3 +223,202 @@ func TestYDBReaderNamespaces(t *testing.T) { testLookupNamespacesWithNames(t, r, []string{"user", "document"}, nil) }) } + +func TestYDBReaderCaveats(t *testing.T) { + engine := testserverDatastore.RunYDBForTesting(t, "") + + ds := engine.NewDatastore(t, func(engine, dsn string) datastore.Datastore { + ds, err := NewYDBDatastore(context.Background(), dsn) + require.NoError(t, err) + + yDS, err := newYDBDatastore(context.Background(), dsn) + require.NoError(t, err) + t.Cleanup(func() { yDS.Close() }) + + err = yDS.driver.Table().Do(context.Background(), func(ctx context.Context, s table.Session) error { + testCaveat := []struct { + name string + configBase64 string + createdAt int64 + deletedAt *int64 + }{ + { + name: "one", + configBase64: "CgNvbmUScRIDb25lCmoSDAgCEggaBmVxdWFscxIHCAESAwoBdhoGCAMSAhgCGgYIAhICGAEaBggB" + + "EgIYAiIbEAIyFxIEXz09XxoHEAEiAwoBdhoGEAMaAhgBKhwSA29uZRoDFB0eIgQIAhAYIgQIAxAb" + + "IgQIARAWGgoKAXYSBQoDaW50KgA=", + createdAt: 1, + deletedAt: proto.Int64(7), + }, + { + name: "two", + configBase64: "CgN0d28SdBIDdHdvCm0SBwgBEgMKAXYSDAgCEggaBmVxdWFscxoGCAMSAhgCGgYIAhICGAEaBggB" + + "EgIYAiIbEAIyFxIEXz09XxoHEAEiAwoBdhoGEAMaAhgCKh8SA3R3bxoGMDEyMzw9IgQIARA1IgQI" + + "AhA3IgQIAxA6GgoKAXYSBQoDaW50KgIIAw==", + createdAt: 1, + deletedAt: proto.Int64(7), + }, + { + name: "three", + configBase64: "CgV0aHJlZRJ7EgV0aHJlZQpyEgcIARIDCgF2EgwIAhIIGgZlcXVhbHMaBggDEgIYAhoGCAISAhgB" + + "GgYIARICGAIiGxACMhcSBF89PV8aBxABIgMKAXYaBhADGgIYAyokEgV0aHJlZRoJTk9QUVJTVF1e" + + "IgQIARBWIgQIAhBYIgQIAxBbGgoKAXYSBQoDaW50KgIIBg==", + createdAt: 10, + }, + { + name: "four", + configBase64: "CgRmb3VyEnwSBGZvdXIKdBIHCAESAwoBdhIMCAISCBoGZXF1YWxzGgYIARICGAIaBggDEgIYAhoG" + + "CAISAhgBIhsQAjIXEgRfPT1fGgcQASIDCgF2GgYQAxoCGAQqJhIEZm91choMa2xtbm9wcXJzdH1+" + + "IgQIARB2IgQIAhB4IgQIAxB7GgoKAXYSBQoDaW50KgIICQ==", + createdAt: 10, + }, + } + + stmt, err := s.Prepare( + ctx, + common.AddTablePrefix(` + DECLARE $name AS Utf8; + DECLARE $config AS String; + DECLARE $createdAt AS Int64; + DECLARE $deletedAt AS Optional; + INSERT INTO + caveat(name, definition, created_at_unix_nano, deleted_at_unix_nano) + VALUES + ($name, $config, $createdAt, $deletedAt); + `, yDS.config.tablePathPrefix), + ) + if err != nil { + return err + } + + for i := 0; i < len(testCaveat); i++ { + conf, err := base64.StdEncoding.DecodeString(testCaveat[i].configBase64) + if err != nil { + return err + } + + _, _, err = stmt.Execute( + ctx, + table.DefaultTxControl(), + table.NewQueryParameters( + table.ValueParam("$name", types.UTF8Value(testCaveat[i].name)), + table.ValueParam("$config", types.BytesValue(conf)), + table.ValueParam("$createdAt", types.Int64Value(testCaveat[i].createdAt)), + table.ValueParam("$deletedAt", types.NullableInt64Value(testCaveat[i].deletedAt)), + ), + ) + if err != nil { + return err + } + } + + return nil + }) + require.NoError(t, err) + + return ds + }) + t.Cleanup(func() { ds.Close() }) + + type expectCaveat struct { + name string + lastWrittenRevision datastore.Revision + } + + matchCaveat := func(t *testing.T, expect expectCaveat, actual datastore.RevisionedCaveat) { + require.Equal(t, expect.lastWrittenRevision, actual.LastWrittenRevision) + require.Equal(t, expect.name, actual.Definition.GetName()) + } + + testListAllCaveats := func(t *testing.T, r datastore.Reader, expectCaveat map[string]expectCaveat) { + vv, err := r.ListAllCaveats(context.Background()) + require.NoError(t, err) + require.Len(t, vv, len(expectCaveat)) + for _, v := range vv { + expectCaveat, ok := expectCaveat[v.Definition.GetName()] + require.True(t, ok) + matchCaveat(t, expectCaveat, v) + } + } + + testLookupCaveatsWithNames := func( + t *testing.T, + r datastore.Reader, + in []string, + expect map[string]expectCaveat, + ) { + vv, err := r.LookupCaveatsWithNames(context.Background(), in) + require.NoError(t, err) + require.Len(t, vv, len(expect)) + for _, v := range vv { + expectCaveat, ok := expect[v.Definition.GetName()] + require.True(t, ok) + matchCaveat(t, expectCaveat, v) + } + } + + testReadCaveatByName := func(t *testing.T, r datastore.Reader, expect expectCaveat) { + v, lastWritten, err := r.ReadCaveatByName(context.Background(), expect.name) + if expect.name == "" { + require.ErrorAs(t, err, new(datastore.ErrNotFound)) + return + } + require.NoError(t, err) + matchCaveat(t, expect, datastore.RevisionedCaveat{ + Definition: v, + LastWrittenRevision: lastWritten, + }) + } + + t.Run("removed caveats not garbage collected", func(t *testing.T) { + testRevision := revisions.NewForTimestamp(6) + expectedNs := map[string]expectCaveat{ + "one": { + name: "one", + lastWrittenRevision: revisions.NewForTimestamp(1), + }, + "two": { + name: "two", + lastWrittenRevision: revisions.NewForTimestamp(1), + }, + } + r := ds.SnapshotReader(testRevision) + + testListAllCaveats(t, r, expectedNs) + testReadCaveatByName(t, r, expectedNs["onw"]) + testLookupCaveatsWithNames(t, r, + []string{"one", "unknown"}, + map[string]expectCaveat{"one": expectedNs["one"]}, + ) + }) + + t.Run("latest caveats", func(t *testing.T) { + testRevision := revisions.NewForTimestamp(10) + expectedNs := map[string]expectCaveat{ + "three": { + name: "three", + lastWrittenRevision: revisions.NewForTimestamp(10), + }, + "four": { + name: "four", + lastWrittenRevision: revisions.NewForTimestamp(10), + }, + } + r := ds.SnapshotReader(testRevision) + testListAllCaveats(t, r, expectedNs) + testReadCaveatByName(t, r, expectedNs["four"]) + testLookupCaveatsWithNames(t, r, + []string{"four", "unknown"}, + map[string]expectCaveat{"four": expectedNs["four"]}, + ) + }) + + t.Run("no caveats", func(t *testing.T) { + testRevision := revisions.NewForTimestamp(7) + expectedNs := map[string]expectCaveat{} + r := ds.SnapshotReader(testRevision) + testListAllCaveats(t, r, expectedNs) + testReadCaveatByName(t, r, expectedNs["one"]) + testLookupCaveatsWithNames(t, r, []string{"one", "two"}, nil) + }) +}