Skip to content

Commit

Permalink
Implement caveats reader (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashayakovtseva authored Feb 16, 2024
1 parent 682711a commit 660f281
Showing 3 changed files with 322 additions and 13 deletions.
5 changes: 5 additions & 0 deletions internal/datastore/ydb/query.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,11 @@ const (
tableNamespaceConfig = "namespace_config"
colSerializedConfig = "serialized_config"
colNamespace = "namespace"

// caveat
tableCaveat = "caveat"
coltName = "name"
colDefinition = "definition"
)

var (
122 changes: 113 additions & 9 deletions internal/datastore/ydb/reader.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ import (
"context"
"fmt"

sq "github.com/Masterminds/squirrel"
yq "github.com/flymedllva/ydb-go-qb/yqb"
"github.com/ydb-platform/ydb-go-sdk/v3/table"

@@ -21,19 +20,49 @@ type ydbReader struct {
modifier queryModifier
}

func (r *ydbReader) ReadCaveatByName(ctx context.Context, name string) (caveat *core.CaveatDefinition, lastWritten datastore.Revision, err error) {
// TODO implement me
panic("implement me")
func (r *ydbReader) ReadCaveatByName(
ctx context.Context,
name string,
) (*core.CaveatDefinition, datastore.Revision, error) {
loaded, version, err := r.loadCaveat(ctx, name)
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf("filed to read caveat: %w", err)
}
return loaded, version, nil
}

func (r *ydbReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) {
// TODO implement me
panic("implement me")
caveatsWithRevisions, err := loadAllCaveats(ctx, r.tablePathPrefix, r.executor, r.modifier)
if err != nil {
return nil, fmt.Errorf("failed to list caveats: %w", err)
}

return caveatsWithRevisions, err
}

func (r *ydbReader) LookupCaveatsWithNames(ctx context.Context, names []string) ([]datastore.RevisionedCaveat, error) {
// TODO implement me
panic("implement me")
if len(names) == 0 {
return nil, nil
}

var clause yq.Or
for _, nsName := range names {
clause = append(clause, yq.Eq{coltName: nsName})
}

caveatsWithRevisions, err := loadAllCaveats(
ctx,
r.tablePathPrefix,
r.executor,
func(builder yq.SelectBuilder) yq.SelectBuilder {
return r.modifier(builder).Where(clause)
},
)
if err != nil {
return nil, fmt.Errorf("failed to list caveats with names: %w", err)
}

return caveatsWithRevisions, err
}

func (r *ydbReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, options ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) {
@@ -108,7 +137,7 @@ func (r *ydbReader) loadNamespace(
r.tablePathPrefix,
r.executor,
func(builder yq.SelectBuilder) yq.SelectBuilder {
return r.modifier(builder).Where(sq.Eq{colNamespace: namespace})
return r.modifier(builder).Where(yq.Eq{colNamespace: namespace})
},
)
if err != nil {
@@ -165,3 +194,78 @@ func loadAllNamespaces(

return nsDefs, nil
}

var readCaveatBuilder = yq.Select(colDefinition, colCreatedAtUnixNano).From(tableCaveat)

func (r *ydbReader) loadCaveat(
ctx context.Context,
name string,
) (*core.CaveatDefinition, revisions.TimestampRevision, error) {
ctx, span := tracer.Start(ctx, "loadCaveat")
defer span.End()

defs, err := loadAllCaveats(
ctx,
r.tablePathPrefix,
r.executor,
func(builder yq.SelectBuilder) yq.SelectBuilder {
return r.modifier(builder).Where(yq.Eq{coltName: name})
},
)
if err != nil {
return nil, revisions.TimestampRevision(0), err
}

if len(defs) < 1 {
return nil, revisions.TimestampRevision(0), datastore.NewCaveatNameNotFoundErr(name)
}

return defs[0].Definition, defs[0].LastWrittenRevision.(revisions.TimestampRevision), nil
}

func loadAllCaveats(
ctx context.Context,
tablePathPrefix string,
executor queryExecutor,
modifier queryModifier,
) ([]datastore.RevisionedCaveat, error) {
sql, args, err := modifier(readCaveatBuilder).ToYdbSql()
if err != nil {
return nil, err
}

sql = common.AddTablePrefix(sql, tablePathPrefix)
res, err := executor.Execute(ctx, sql, table.NewQueryParameters(args...))
if err != nil {
return nil, err
}
defer res.Close()

var caveatDefs []datastore.RevisionedCaveat
for res.NextResultSet(ctx) {
for res.NextRow() {
var definition []byte
var createdAtUnixNano int64
if err := res.Scan(&definition, &createdAtUnixNano); err != nil {
return nil, err
}

var loaded core.CaveatDefinition
if err := loaded.UnmarshalVT(definition); err != nil {
return nil, fmt.Errorf("failed to read caveat definition: %w", err)
}

revision := revisions.NewForTimestamp(createdAtUnixNano)
caveatDefs = append(caveatDefs, datastore.RevisionedCaveat{
Definition: &loaded,
LastWrittenRevision: revision,
})
}
}

if err := res.Err(); err != nil {
return nil, err
}

return caveatDefs, nil
}
Loading

0 comments on commit 660f281

Please sign in to comment.