Skip to content

Commit

Permalink
terminate graph when there is a hard stop request
Browse files Browse the repository at this point in the history
  • Loading branch information
dsa0x committed Jan 8, 2025
1 parent 169d18e commit 562ef80
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
14 changes: 11 additions & 3 deletions internal/backend/local/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ func (runner *TestFileRunner) Test(file *moduletest.File) {

// walk and execute the graph
diags = runner.walkGraph(graph)

// If the graph walk was terminated, we don't want to add the diagnostics.
// The error the user receives will just be:
// Failure! 0 passed, 1 failed.
// exit status 1
if diags.HasErrors() && diags.Err().Error() == dag.GraphTerminatedError.Error() {
log.Printf("[TRACE] TestFileRunner: graph walk terminated for %s due to error: %s", file.Name, dag.GraphTerminatedError)
return
}

file.Diagnostics = file.Diagnostics.Append(diags)
}

Expand Down Expand Up @@ -363,6 +373,7 @@ func (runner *TestFileRunner) walkGraph(g *terraform.Graph) tfdiags.Diagnostics
// just mark the overall file status has having errored to indicate
// it was interrupted.
file.Status = file.Status.Merge(moduletest.Error)
g.AcyclicGraph.Terminate()
return
}

Expand All @@ -371,7 +382,6 @@ func (runner *TestFileRunner) walkGraph(g *terraform.Graph) tfdiags.Diagnostics
// following test as skipped, print the status, and move on.
run.Status = moduletest.Skip
runner.Suite.View.Run(run, file, moduletest.Complete, 0)
// continue
return
}

Expand All @@ -381,7 +391,6 @@ func (runner *TestFileRunner) walkGraph(g *terraform.Graph) tfdiags.Diagnostics
// skipped, print the status, and move on.
run.Status = moduletest.Skip
runner.Suite.View.Run(run, file, moduletest.Complete, 0)
// continue
return
}

Expand All @@ -405,7 +414,6 @@ func (runner *TestFileRunner) walkGraph(g *terraform.Graph) tfdiags.Diagnostics

run.Status = moduletest.Error
file.Status = moduletest.Error
// continue // Abort!
return
}

Expand Down
38 changes: 36 additions & 2 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dag

import (
"context"
"errors"
"fmt"
"sort"
Expand All @@ -15,8 +16,12 @@ import (
// AcyclicGraph is a specialization of Graph that cannot have cycles.
type AcyclicGraph struct {
Graph

walker *Walker
}

var GraphTerminatedError = errors.New("graph walk terminated")

// WalkFunc is the callback used for walking the graph.
type WalkFunc func(Vertex) tfdiags.Diagnostics

Expand Down Expand Up @@ -273,9 +278,38 @@ func (g *AcyclicGraph) Cycles() [][]Vertex {
// This will walk nodes in parallel if it can. The resulting diagnostics
// contains problems from all graphs visited, in no particular order.
func (g *AcyclicGraph) Walk(cb WalkFunc) tfdiags.Diagnostics {
w := &Walker{Callback: cb, Reverse: true}
ctx, cancel := context.WithCancelCause(context.Background())
w := &Walker{Callback: cb, Reverse: true, walkContext: ctx, walkContextCancel: cancel}
g.walker = w

w.Update(g)
return w.Wait()
// Start a goroutine to wait for all vertices to return.
// This allows us to return immediately while the walk completes.
doneCh := make(chan tfdiags.Diagnostics)
go func() {
doneCh <- w.Wait()
close(doneCh)
}()

// we wait for either the walk to complete or the context to be cancelled
for {
select {
case diags := <-doneCh:
return diags
case <-w.walkContext.Done():
err := context.Cause(w.walkContext)
var diags tfdiags.Diagnostics
return diags.Append(err)
}
}
}

// Terminate is a hard stop for the ongoing graph walk. It will stop the walk immediately,
// and return a GraphTerminatedError.
func (g *AcyclicGraph) Terminate() {
if g.walker != nil {
g.walker.walkContextCancel(GraphTerminatedError)
}
}

// simple convenience helper for converting a dag.Set to a []Vertex
Expand Down
2 changes: 1 addition & 1 deletion internal/dag/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func newMarshalGraph(name string, g *Graph) *marshalGraph {

sort.Sort(edges(mg.Edges))

for _, c := range (&AcyclicGraph{*g}).Cycles() {
for _, c := range (&AcyclicGraph{Graph: *g}).Cycles() {
var cycle []*marshalVertex
for _, v := range c {
mv := newMarshalVertex(v)
Expand Down
7 changes: 7 additions & 0 deletions internal/dag/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dag

import (
"context"
"errors"
"log"
"sync"
Expand Down Expand Up @@ -65,6 +66,9 @@ type Walker struct {
diagsMap map[Vertex]tfdiags.Diagnostics
upstreamFailed map[Vertex]struct{}
diagsLock sync.Mutex

walkContext context.Context
walkContextCancel context.CancelCauseFunc
}

func (w *Walker) init() {
Expand Down Expand Up @@ -346,6 +350,9 @@ func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {

case <-depsUpdateCh:
// New deps, reloop
case <-w.walkContext.Done():
// Context cancelled. return immediately.
return
}

// Check if we have updated dependencies. This can happen if the
Expand Down

0 comments on commit 562ef80

Please sign in to comment.