Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions runtime/parser/parse_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rilldata/rill/runtime/drivers/slack"
"github.com/rilldata/rill/runtime/pkg/duration"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -138,11 +139,13 @@ func (p *Parser) parseAlert(node *Node) error {

if !isLegacyQuery {
var refs []ResourceName
resolver, resolverProps, refs, err = p.parseDataYAML(tmp.Data, node.Connector)
var connector string
resolver, resolverProps, connector, refs, err = p.parseDataYAML(tmp.Data, node.Connector)
if err != nil {
return fmt.Errorf(`failed to parse "data": %w`, err)
}
node.Refs = append(node.Refs, refs...)
node.addPostParseHook(connector, p.addConnectorRef(connector))

// Query for: validate only one of user_id, user_email, or attributes is set
n := 0
Expand Down Expand Up @@ -270,7 +273,7 @@ func (p *Parser) parseAlert(node *Node) error {
}

// Track alert
r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions runtime/parser/parse_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"unicode"

"github.com/rilldata/rill/runtime/pkg/openapiutil"
"golang.org/x/exp/maps"
)

// APIYAML is the raw structure of a API resource defined in YAML (does not include common fields)
Expand Down Expand Up @@ -92,11 +93,12 @@ func (p *Parser) parseAPI(node *Node) error {
}

// Parse the resolver and its properties from the DataYAML
resolver, resolverProps, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector)
resolver, resolverProps, connector, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector)
if err != nil {
return err
}
node.Refs = append(node.Refs, resolverRefs...)
node.addPostParseHook(connector, p.addConnectorRef(connector))

securityRules, err := tmp.Security.Proto()
if err != nil {
Expand All @@ -108,7 +110,7 @@ func (p *Parser) parseAPI(node *Node) error {
}
}

r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/parser/parse_canvas.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (p *Parser) parseCanvas(node *Node) error {
}

// Track canvas
r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (p *Parser) parseCanvas(node *Node) error {

// Track inline components
for _, def := range inlineComponentDefs {
r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs...)
r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs, nil)
if err != nil {
// Normally we could return the error, but we can't do that here because we've already inserted the canvas.
// Since the component has been validated with insertDryRun in parseCanvasItemComponent, this error should never happen in practice.
Expand Down
3 changes: 2 additions & 1 deletion runtime/parser/parse_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/santhosh-tekuri/jsonschema/v5"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"

_ "embed"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (p *Parser) parseComponent(node *Node) error {
node.Refs = append(node.Refs, refs...)

// Track component
r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion runtime/parser/parse_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"fmt"

"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func (p *Parser) parseConnector(node *Node) error {
}

// Insert the connector
r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/parser/parse_explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (p *Parser) parseExplore(node *Node) error {
}

// Track explore
r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
26 changes: 23 additions & 3 deletions runtime/parser/parse_metrics_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,26 @@ func (p *Parser) parseMetricsView(node *Node) error {
return err
}
node.Refs = append(node.Refs, securityRefs...)
node.addPostParseHook(node.Connector, func(r *Resource) bool {
// check if the model is actually a resource in which case no need to add a ref to the connector
// model's connector can be different and a ref to model will ensure correct DAG link
if tmp.Model != "" {
_, ok := p.Resources[ResourceName{Kind: ResourceKindModel, Name: tmp.Model}.Normalized()]
if ok {
// clear older refs to connector, if any
for i, ref := range r.Refs {
if ref.Kind == ResourceKindConnector && ref.Name == node.Connector {
// okay to modify r.Refs here as we return immediately after
r.Refs = append(r.Refs[:i], r.Refs[i+1:]...)
return true
}
}
return false
}
}
f := p.addConnectorRef(node.Connector)
return f(r)
})

var cacheTTLDuration time.Duration
if tmp.Cache.KeyTTL != "" {
Expand All @@ -743,7 +763,7 @@ func (p *Parser) parseMetricsView(node *Node) error {
}

// insert metrics view resource immediately after parsing the inline explore as it inserts the explore resource so we should not return an error now
r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
// If we fail to insert the metrics view, we must delete the inline explore if it was created.
if exploreRes != nil {
Expand Down Expand Up @@ -829,7 +849,7 @@ func (p *Parser) parseMetricsView(node *Node) error {
if tmp.DefaultTheme != "" {
refs = append(refs, ResourceName{Kind: ResourceKindTheme, Name: tmp.DefaultTheme})
}
e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs...)
e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs, maps.Values(node.postParseHooks))
if err != nil {
// We mustn't error because we have already emitted one resource.
// Since this probably means an explore has been defined separately, we can just ignore this error.
Expand Down Expand Up @@ -1014,7 +1034,7 @@ func (p *Parser) parseAndInsertInlineExplore(tmp *MetricsViewYAML, mvName string
name = tmp.Explore.Name
}
// Track explore
r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs...)
r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs, nil)
if err != nil {
return false, nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion runtime/parser/parse_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package parser

import (
"strings"

"golang.org/x/exp/maps"
)

// MigrationYAML is the raw structure of a Migration resource defined in YAML (does not include common fields)
Expand All @@ -19,7 +21,7 @@ func (p *Parser) parseMigration(node *Node) error {
}

// Add resource
r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down
31 changes: 20 additions & 11 deletions runtime/parser/parse_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/pkg/duckdbsql"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"
"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -116,6 +117,8 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
inputProps = map[string]any{}
}

node.addPostParseHook(inputConnector, p.addConnectorRef(inputConnector))

// Special handling for adding SQL to the input properties
if sql := strings.TrimSpace(node.SQL); sql != "" {
refs, err := p.inferSQLRefs(node)
Expand Down Expand Up @@ -154,6 +157,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
if outputConnector == "" {
outputConnector = p.defaultOLAPConnector()
}
node.addPostParseHook(outputConnector, p.addConnectorRef(outputConnector))
outputProps := tmp.Output.Properties

// Backwards compatibility: materialize can be specified outside of the output properties
Expand All @@ -178,11 +182,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
var incrementalStateResolverProps *structpb.Struct
if tmp.State != nil {
var refs []ResourceName
incrementalStateResolver, incrementalStateResolverProps, refs, err = p.parseDataYAML(tmp.State, outputConnector)
var connector string
incrementalStateResolver, incrementalStateResolverProps, connector, refs, err = p.parseDataYAML(tmp.State, outputConnector)
if err != nil {
return fmt.Errorf(`failed to parse "state": %w`, err)
}
node.Refs = append(node.Refs, refs...)
node.addPostParseHook(connector, p.addConnectorRef(connector))
}

// Parse partitions resolver
Expand All @@ -196,11 +202,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
}
if tmp.Partitions != nil {
var refs []ResourceName
partitionsResolver, partitionsResolverProps, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector)
var connector string
partitionsResolver, partitionsResolverProps, connector, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector)
if err != nil {
return fmt.Errorf(`failed to parse "partitions": %w`, err)
}
node.Refs = append(node.Refs, refs...)
node.addPostParseHook(connector, p.addConnectorRef(connector))

// As a small convenience, automatically set the watermark field for resolvers where we know a good default
if tmp.PartitionsWatermark == "" {
Expand All @@ -214,12 +222,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
var modelTests []*runtimev1.ModelTest
for i := range tmp.Tests {
t := tmp.Tests[i]
modelTest, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert)
modelTest, connector, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert)
if err != nil {
return fmt.Errorf(`failed to parse test %q: %w`, t.Name, err)
}
modelTests = append(modelTests, modelTest)
node.Refs = append(node.Refs, refs...)
node.addPostParseHook(connector, p.addConnectorRef(connector))
}

var retryDelay *uint32
Expand All @@ -233,7 +242,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
}

// Insert the model
r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs...)
r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks))
if err != nil {
return err
}
Expand Down Expand Up @@ -281,10 +290,10 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error {
}

// parseModelTests parses the model tests from the YAML file
func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, []ResourceName, error) {
func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, string, []ResourceName, error) {
// Validate required name field
if name == "" {
return nil, nil, fmt.Errorf(`test must have a "name" defined`)
return nil, "", nil, fmt.Errorf(`test must have a "name" defined`)
}

hasSQL := data.SQL != ""
Expand All @@ -293,24 +302,24 @@ func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelNam
// Validate that exactly one of "sql" or "assert" is provided
switch {
case hasSQL && hasAssertion:
return nil, nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name)
return nil, "", nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name)
case !hasSQL && !hasAssertion:
return nil, nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name)
return nil, "", nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name)
case hasAssertion:
// Wrap assertion condition in a SQL query following SQLMesh audit pattern
// Query for rows that violate the assertion (bad data)
data.SQL = fmt.Sprintf("SELECT * FROM %s WHERE NOT (%s)", modelName, assert)
}

resolver, props, refs, err := p.parseDataYAML(data, connector)
resolver, props, connector, refs, err := p.parseDataYAML(data, connector)
if err != nil {
return nil, nil, err
return nil, "", nil, err
}
return &runtimev1.ModelTest{
Name: name,
Resolver: resolver,
ResolverProperties: props,
}, refs, nil
}, connector, refs, nil
}

// inferSQLRefs attempts to infer table references from the node's SQL.
Expand Down
11 changes: 11 additions & 0 deletions runtime/parser/parse_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ type Node struct {
SQLPath string
SQLAnnotations map[string]any
SQLUsesTemplating bool

postParseHooks map[string]postParseHook
}

func (n *Node) addPostParseHook(key string, hook postParseHook) {
if n.postParseHooks == nil {
n.postParseHooks = make(map[string]postParseHook)
}
if _, ok := n.postParseHooks[key]; !ok {
n.postParseHooks[key] = hook
}
}

// parseNode multiplexes to the appropriate parse function based on the node kind.
Expand Down
19 changes: 10 additions & 9 deletions runtime/parser/parse_partial_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type DataYAML struct {
// parseDataYAML parses a data resolver and its properties from a DataYAML.
// The contextualConnector argument is optional; if provided and the resolver supports a connector, it becomes the default connector for the resolver.
// It returns the resolver name, its properties, and refs found in the resolver props.
func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, []ResourceName, error) {
func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, string, []ResourceName, error) {
// Parse the resolver and its properties
var count int
var resolver string
var resolver, connector string
var refs []ResourceName
resolverProps := make(map[string]any)

Expand All @@ -35,10 +35,11 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin
resolver = "sql"
resolverProps["sql"] = raw.SQL
if raw.Connector != "" {
resolverProps["connector"] = raw.Connector
connector = raw.Connector
} else if contextualConnector != "" {
resolverProps["connector"] = contextualConnector
connector = contextualConnector
}
resolverProps["connector"] = connector
}

// Handle metrics SQL resolver
Expand Down Expand Up @@ -69,7 +70,7 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin
props = make(map[string]any)
err := raw.Glob.Decode(props)
if err != nil {
return "", nil, nil, fmt.Errorf("failed to parse glob properties: %w", err)
return "", nil, "", nil, fmt.Errorf("failed to parse glob properties: %w", err)
}
}

Expand All @@ -87,17 +88,17 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin

// Validate there was exactly one resolver
if count == 0 {
return "", nil, nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`)
return "", nil, "", nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`)
}
if count > 1 {
return "", nil, nil, fmt.Errorf(`the API definition specifies more than one resolver`)
return "", nil, "", nil, fmt.Errorf(`the API definition specifies more than one resolver`)
}

// Convert resolver properties to structpb.Struct
resolverPropsPB, err := structpb.NewStruct(resolverProps)
if err != nil {
return "", nil, nil, fmt.Errorf("encountered invalid property type: %w", err)
return "", nil, "", nil, fmt.Errorf("encountered invalid property type: %w", err)
}

return resolver, resolverPropsPB, refs, nil
return resolver, resolverPropsPB, connector, refs, nil
}
Loading
Loading