Skip to content

Commit

Permalink
feat: query plan for subscriptions (#1008)
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisnithin authored Dec 10, 2024
1 parent c4b8c3b commit 34cc4fa
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 3 deletions.
46 changes: 46 additions & 0 deletions v2/pkg/engine/postprocess/postprocess.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package postprocess

import (
"encoding/json"
"fmt"
"slices"

"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
Expand Down Expand Up @@ -142,6 +144,7 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan {
p.processResponseTree[i].ProcessSubscription(t.Response.Response.Data)
}
p.createFetchTree(t.Response.Response)
p.appendTriggerToFetchTree(t.Response)
p.dedupe.ProcessFetchTree(t.Response.Response.Fetches)
p.resolveInputTemplates.ProcessFetchTree(t.Response.Response.Fetches)
p.resolveInputTemplates.ProcessTrigger(&t.Response.Trigger)
Expand Down Expand Up @@ -184,3 +187,46 @@ func (p *Processor) createFetchTree(res *resolve.GraphQLResponse) {
ChildNodes: children,
}
}

func (p *Processor) appendTriggerToFetchTree(res *resolve.GraphQLSubscription) {
var input struct {
Body struct {
Query string `json:"query"`
} `json:"body"`
}

err := json.Unmarshal(res.Trigger.Input, &input)
if err != nil {
fmt.Println("error decoding subscription input", err)
return
}

rootData := res.Response.Data
if rootData == nil || len(rootData.Fields) == 0 {
return
}

info := rootData.Fields[0].Info
if info == nil {
return
}

res.Response.Fetches.Trigger = &resolve.FetchTreeNode{
Kind: resolve.FetchTreeNodeKindTrigger,
Item: &resolve.FetchItem{
Fetch: &resolve.SingleFetch{
FetchDependencies: resolve.FetchDependencies{
FetchID: info.FetchID,
},
Info: &resolve.FetchInfo{
DataSourceID: info.Source.IDs[0],
DataSourceName: info.Source.Names[0],
QueryPlan: &resolve.QueryPlan{
Query: input.Body.Query,
},
},
},
ResponsePath: info.Name,
},
}
}
25 changes: 22 additions & 3 deletions v2/pkg/engine/resolve/fetchtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
)

type FetchTreeNode struct {
Kind FetchTreeNodeKind `json:"kind"`
Item *FetchItem `json:"item"`
ChildNodes []*FetchTreeNode `json:"child_nodes"`
Kind FetchTreeNodeKind `json:"kind"`
// Only set for subscription
Trigger *FetchTreeNode `json:"trigger"`
Item *FetchItem `json:"item"`
ChildNodes []*FetchTreeNode `json:"child_nodes"`
}

type FetchTreeNodeKind string
Expand All @@ -17,6 +19,7 @@ const (
FetchTreeNodeKindSingle FetchTreeNodeKind = "Single"
FetchTreeNodeKindSequence FetchTreeNodeKind = "Sequence"
FetchTreeNodeKindParallel FetchTreeNodeKind = "Parallel"
FetchTreeNodeKindTrigger FetchTreeNodeKind = "Trigger"
)

func Sequence(children ...*FetchTreeNode) *FetchTreeNode {
Expand Down Expand Up @@ -146,6 +149,7 @@ func (n *FetchTreeNode) Trace() *FetchTreeTraceNode {
type FetchTreeQueryPlanNode struct {
Version string `json:"version,omitempty"`
Kind FetchTreeNodeKind `json:"kind"`
Trigger *FetchTreeQueryPlan `json:"trigger,omitempty"`
Children []*FetchTreeQueryPlanNode `json:"children,omitempty"`
Fetch *FetchTreeQueryPlan `json:"fetch,omitempty"`
}
Expand All @@ -165,8 +169,23 @@ func (n *FetchTreeNode) QueryPlan() *FetchTreeQueryPlanNode {
if n == nil {
return nil
}

plan := n.queryPlan()
plan.Version = "1"

if n.Trigger != nil && n.Trigger.Item != nil && n.Trigger.Item.Fetch != nil {
if f, ok := n.Trigger.Item.Fetch.(*SingleFetch); ok {
plan.Trigger = &FetchTreeQueryPlan{
Kind: "Trigger",
Path: n.Trigger.Item.ResponsePath,
SubgraphName: f.Info.DataSourceName,
SubgraphID: f.Info.DataSourceID,
FetchID: f.FetchDependencies.FetchID,
Query: f.Info.QueryPlan.Query,
}
}
}

return plan
}

Expand Down
56 changes: 56 additions & 0 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,34 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
msg := []byte(`{"errors":[{"message":"invalid input"}]}`)
return writeFlushComplete(writer, msg)
}

// If SkipLoader is enabled, we skip retrieving actual data. For example, this is useful when requesting a query plan.
// By returning early, we avoid starting a subscription and resolve with empty data instead.
if ctx.ExecutionOptions.SkipLoader {
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)

err = t.resolvable.InitSubscription(ctx, nil, subscription.Trigger.PostProcessing)
if err != nil {
return err
}

buf := &bytes.Buffer{}
err = t.resolvable.Resolve(ctx.ctx, subscription.Response.Data, subscription.Response.Fetches, buf)
if err != nil {
return err
}

if _, err = writer.Write(buf.Bytes()); err != nil {
return err
}
if err = writer.Flush(); err != nil {
return err
}
writer.Complete()

return nil
}

xxh := pool.Hash64.Get()
defer pool.Hash64.Put(xxh)
err = subscription.Trigger.Source.UniqueRequestID(ctx, input, xxh)
Expand Down Expand Up @@ -921,6 +949,34 @@ func (r *Resolver) AsyncResolveGraphQLSubscription(ctx *Context, subscription *G
msg := []byte(`{"errors":[{"message":"invalid input"}]}`)
return writeFlushComplete(writer, msg)
}

// If SkipLoader is enabled, we skip retrieving actual data. For example, this is useful when requesting a query plan.
// By returning early, we avoid starting a subscription and resolve with empty data instead.
if ctx.ExecutionOptions.SkipLoader {
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)

err = t.resolvable.InitSubscription(ctx, nil, subscription.Trigger.PostProcessing)
if err != nil {
return err
}

buf := &bytes.Buffer{}
err = t.resolvable.Resolve(ctx.ctx, subscription.Response.Data, subscription.Response.Fetches, buf)
if err != nil {
return err
}

if _, err = writer.Write(buf.Bytes()); err != nil {
return err
}
if err = writer.Flush(); err != nil {
return err
}
writer.Complete()

return nil
}

xxh := pool.Hash64.Get()
defer pool.Hash64.Put(xxh)
err = subscription.Trigger.Source.UniqueRequestID(ctx, input, xxh)
Expand Down
Loading

0 comments on commit 34cc4fa

Please sign in to comment.