Add graph-based pod stop #25169

Open · wants to merge 2 commits into base: main
54 changes: 29 additions & 25 deletions libpod/container_api.go
@@ -85,34 +85,23 @@ func (c *Container) initUnlocked(ctx context.Context, recursive bool) error {
// Start requires that all dependency containers (e.g. pod infra containers) are
// running before starting the container. The recursive parameter, if set, will start all
// dependencies before starting this container.
func (c *Container) Start(ctx context.Context, recursive bool) (finalErr error) {
if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()

// defer's are executed LIFO so we are locked here
// as long as we call this after the defer unlock()
defer func() {
if finalErr != nil {
if err := saveContainerError(c, finalErr); err != nil {
logrus.Debug(err)
}
}
}()

if err := c.syncContainer(); err != nil {
return err
func (c *Container) Start(ctx context.Context, recursive bool) error {
// Have to lock the pod the container is a part of.
// This prevents running `podman start` at the same time a
// `podman pod stop` is running, which could lead to weird races.
// Pod locks come before container locks, so do this first.
if c.config.Pod != "" {
// If we get an error, the pod was probably removed.
// So we get an expected ErrCtrRemoved instead of ErrPodRemoved,
// just ignore this and move on to syncing the container.
pod, _ := c.runtime.state.Pod(c.config.Pod)
if pod != nil {
pod.lock.Lock()
defer pod.lock.Unlock()
}
}
if err := c.prepareToStart(ctx, recursive); err != nil {
return err
}

// Start the container
if err := c.start(); err != nil {
return err
}
return c.waitForHealthy(ctx)
return c.startNoPodLock(ctx, recursive)
}

// Update updates the given container.
@@ -294,6 +283,21 @@ func (c *Container) Stop() error {
// manually. If timeout is 0, SIGKILL will be used immediately to kill the
// container.
func (c *Container) StopWithTimeout(timeout uint) (finalErr error) {
// Have to lock the pod the container is a part of.
// This prevents running `podman stop` at the same time a
// `podman pod start` is running, which could lead to weird races.
// Pod locks come before container locks, so do this first.
if c.config.Pod != "" {
// If we get an error, the pod was probably removed.
// So we get an expected ErrCtrRemoved instead of ErrPodRemoved,
// just ignore this and move on to syncing the container.
pod, _ := c.runtime.state.Pod(c.config.Pod)
if pod != nil {
pod.lock.Lock()
defer pod.lock.Unlock()
}
}

if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()
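Both hunks above add the same guard: when the container is part of a pod, take the pod lock before the container lock, so `podman start` and `podman pod stop` (or `podman stop` and `podman pod start`) cannot interleave their state changes. A minimal, self-contained sketch of that lock-ordering rule; the pod, container, and withLocks names below are stand-ins, not Podman's actual types:

package main

import (
	"fmt"
	"sync"
)

// Stand-ins for libpod's Pod and Container; only the locks matter here.
type pod struct{ mu sync.Mutex }

type container struct {
	mu  sync.Mutex
	pod *pod // nil if the container is not in a pod
}

// withLocks acquires the pod lock (if any) strictly before the container
// lock, mirroring the ordering rule Start and StopWithTimeout enforce above.
func withLocks(c *container, fn func()) {
	if c.pod != nil {
		c.pod.mu.Lock()
		defer c.pod.mu.Unlock()
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	fn()
}

func main() {
	p := &pod{}
	c := &container{pod: p}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Both goroutines respect pod-before-container ordering, so they
			// serialize cleanly instead of deadlocking or racing.
			withLocks(c, func() { fmt.Println("operation", i) })
		}(i)
	}
	wg.Wait()
}

Because every caller acquires locks in the same pod-then-container order, the container-level and pod-level code paths cannot deadlock against each other.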
243 changes: 196 additions & 47 deletions libpod/container_graph.go
@@ -4,14 +4,18 @@ package libpod

import (
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/pkg/parallel"
"github.com/sirupsen/logrus"
)

type containerNode struct {
lock sync.Mutex
id string
container *Container
dependsOn []*containerNode
@@ -284,99 +288,244 @@ func startNode(ctx context.Context, node *containerNode, setError bool, ctrError
}
}

// Visit a node on the container graph and remove it, or set an error if it
// failed to remove. Only intended for use in pod removal; do *not* use when
// removing individual containers.
// All containers are assumed to be *UNLOCKED* on running this function.
// Container locks will be acquired as necessary.
// Pod and infraID are optional. If a pod is given it must be *LOCKED*.
func removeNode(ctx context.Context, node *containerNode, pod *Pod, force bool, timeout *uint, setError bool, ctrErrors map[string]error, ctrsVisited map[string]bool, ctrNamedVolumes map[string]*ContainerNamedVolume) {
// Contains all details required for traversing the container graph.
type nodeTraversal struct {
// Protects reads and writes to the two maps.
lock sync.Mutex
// Optional, but *MUST* be locked if set.
// Should NOT be changed once a traversal is started.
pod *Pod
// Function to execute on the individual container being acted on.
// Should NOT be changed once a traversal is started.
actionFunc func(ctr *Container, pod *Pod) error
// Shared list of errors for all containers currently acted on.
ctrErrors map[string]error
// Shared list of what containers have been visited.
ctrsVisited map[string]bool
}

// Perform a traversal of the graph in an inwards direction - meaning from nodes
// that nothing else depends on, recursing inwards to the nodes they depend on.
// Safe to run in parallel on multiple nodes.
func traverseNodeInwards(node *containerNode, nodeDetails *nodeTraversal, setError bool) {
node.lock.Lock()

// If we already visited this node, we're done.
if ctrsVisited[node.id] {
nodeDetails.lock.Lock()
visited := nodeDetails.ctrsVisited[node.id]
nodeDetails.lock.Unlock()
if visited {
node.lock.Unlock()
return
}

// Someone who depends on us failed.
// Mark us as failed and recurse.
if setError {
ctrsVisited[node.id] = true
ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s could not be removed: %w", node.id, define.ErrCtrStateInvalid)
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s could not be stopped: %w", node.id, define.ErrCtrStateInvalid)
nodeDetails.lock.Unlock()

node.lock.Unlock()

// Hit anyone who depends on us, set errors there as well.
for _, successor := range node.dependsOn {
removeNode(ctx, successor, pod, force, timeout, true, ctrErrors, ctrsVisited, ctrNamedVolumes)
traverseNodeInwards(successor, nodeDetails, true)
}

return
}

// Does anyone still depend on us?
// Cannot remove if true. Once all our dependencies have been removed,
// we will be removed.
// Cannot stop if true. Once all our dependencies have been stopped,
// we will be stopped.
for _, dep := range node.dependedOn {
// The container that depends on us hasn't been removed yet.
// OK to continue on
if ok := ctrsVisited[dep.id]; !ok {
nodeDetails.lock.Lock()
ok := nodeDetails.ctrsVisited[dep.id]
nodeDetails.lock.Unlock()
if !ok {
node.lock.Unlock()
return
}
}

// Going to try to remove the node, mark us as visited
ctrsVisited[node.id] = true

ctrErrored := false
if err := nodeDetails.actionFunc(node.container, nodeDetails.pod); err != nil {
ctrErrored = true
nodeDetails.lock.Lock()
nodeDetails.ctrErrors[node.id] = err
nodeDetails.lock.Unlock()
}

// Verify that all that depend on us are gone.
// Graph traversal should guarantee this is true, but this isn't that
// expensive, and it's better to be safe.
for _, dep := range node.dependedOn {
if _, err := node.container.runtime.GetContainer(dep.id); err == nil {
ctrErrored = true
ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s still exists: %w", node.id, define.ErrDepExists)
}
// Mark as visited *only after* finished with operation.
// This ensures that the operation has completed, one way or the other.
// If an error was set, only do this after the viral ctrErrored
// propagates in traverseNodeInwards below.
// Same with the node lock - we don't want to release it until we are
// marked as visited.
if !ctrErrored {
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.lock.Unlock()

node.lock.Unlock()
}

// Lock the container
node.container.lock.Lock()
// Recurse to anyone who we depend on and work on them
for _, successor := range node.dependsOn {
traverseNodeInwards(successor, nodeDetails, ctrErrored)
}

// Gate all subsequent bits behind a ctrErrored check - we don't want to
// proceed if a previous step failed.
if !ctrErrored {
if err := node.container.syncContainer(); err != nil {
ctrErrored = true
ctrErrors[node.id] = err
// If we propagated an error, finally mark us as visited here, after
// all nodes we traverse to have already been marked failed.
// If we don't do this, there is a race condition where a node could try
// and perform its operation before it was marked failed by the
// traverseNodeInwards triggered by this process.
if ctrErrored {
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.lock.Unlock()

node.lock.Unlock()
}
}
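traverseNodeInwards works from the containers that nothing depends on toward their dependencies: a node acts only once every container depending on it has been visited, marks itself visited only after its action finishes, and propagates failure inwards so the dependencies of a failed container are marked failed rather than acted on. A toy, single-threaded sketch of that visit order (the node type, visitInwards, and the container names are illustrative stand-ins; the locking and error propagation are omitted):

package main

import "fmt"

// node is a stand-in for containerNode: dependsOn points at what this
// container needs, dependedOn at the containers that need it.
type node struct {
	id         string
	dependsOn  []*node
	dependedOn []*node
}

// visitInwards mirrors the visit order of traverseNodeInwards (minus the
// locking and error propagation): act on a node only once everything that
// depends on it has been visited, then recurse into its own dependencies.
func visitInwards(n *node, visited map[string]bool, action func(*node)) {
	if visited[n.id] {
		return
	}
	for _, dep := range n.dependedOn {
		if !visited[dep.id] {
			// A dependent is still live; its own traversal will come back to us.
			return
		}
	}
	action(n)
	visited[n.id] = true // marked only after the action, as in this PR
	for _, succ := range n.dependsOn {
		visitInwards(succ, visited, action)
	}
}

func main() {
	// app depends on db, db depends on infra (e.g. a pod infra container).
	infra := &node{id: "infra"}
	db := &node{id: "db", dependsOn: []*node{infra}}
	app := &node{id: "app", dependsOn: []*node{db}}
	infra.dependedOn = []*node{db}
	db.dependedOn = []*node{app}

	visited := map[string]bool{}
	// Start from the node nothing depends on, as stopContainerGraph does
	// with graph.notDependedOnNodes.
	visitInwards(app, visited, func(n *node) { fmt.Println("stop", n.id) })
	// Prints: stop app, stop db, stop infra.
}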

// Stop all containers in the given graph, assumed to be the graph of a pod's containers.
// Pod is mandatory and should be locked.
func stopContainerGraph(ctx context.Context, graph *ContainerGraph, pod *Pod, timeout *uint, cleanup bool) (map[string]error, error) {
// Are there actually any containers in the graph?
// If not, return immediately.
if len(graph.nodes) == 0 {
return map[string]error{}, nil
}

nodeDetails := new(nodeTraversal)
nodeDetails.pod = pod
nodeDetails.ctrErrors = make(map[string]error)
nodeDetails.ctrsVisited = make(map[string]bool)

traversalFunc := func(ctr *Container, pod *Pod) error {
ctr.lock.Lock()

if err := ctr.syncContainer(); err != nil {
ctr.lock.Unlock()
return err
}

realTimeout := ctr.config.StopTimeout
if timeout != nil {
realTimeout = *timeout
}

if err := ctr.stop(realTimeout); err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
ctr.lock.Unlock()
return err
}

ctr.lock.Unlock()

if cleanup {
return ctr.Cleanup(ctx, false)
}

return nil
}
nodeDetails.actionFunc = traversalFunc

if !ctrErrored {
for _, vol := range node.container.config.NamedVolumes {
doneChans := make([]<-chan error, 0, len(graph.notDependedOnNodes))

// Parallel enqueue jobs for all our starting nodes.
if len(graph.notDependedOnNodes) == 0 {
return nil, fmt.Errorf("no containers in pod %s are not dependencies of other containers, unable to stop", pod.ID())
}
for _, node := range graph.notDependedOnNodes {
doneChan := parallel.Enqueue(ctx, func() error {
traverseNodeInwards(node, nodeDetails, false)
return nil
})
doneChans = append(doneChans, doneChan)
}

// We don't care about the return values, these functions always return nil
// But we do need all of the parallel jobs to terminate.
for _, doneChan := range doneChans {
<-doneChan
}

return nodeDetails.ctrErrors, nil
}
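stopContainerGraph and removeContainerGraph both fan out with parallel.Enqueue, one job per node that nothing depends on, then block on every returned channel before handing back the shared error map. A rough, self-contained sketch of that fan-out/fan-in shape; enqueue, roots, and the printed messages are illustrative stand-ins for Podman's pkg/parallel machinery:

package main

import (
	"context"
	"fmt"
)

// enqueue is a stand-in for pkg/parallel.Enqueue: run the job and hand back
// a channel that yields its error. (The real Enqueue also bounds concurrency
// with a semaphore; this sketch does not, and ignores ctx.)
func enqueue(ctx context.Context, job func() error) <-chan error {
	done := make(chan error, 1)
	go func() {
		done <- job()
		close(done)
	}()
	return done
}

func main() {
	ctx := context.Background()
	// Hypothetical "roots": containers that nothing else depends on.
	roots := []string{"app", "web", "worker"}

	doneChans := make([]<-chan error, 0, len(roots))
	for _, r := range roots {
		r := r
		doneChans = append(doneChans, enqueue(ctx, func() error {
			// The real job is traverseNodeInwards, which records per-container
			// errors in the shared, mutex-protected maps instead of returning them.
			fmt.Println("traverse inwards from", r)
			return nil
		}))
	}

	// Fan-in: wait for every traversal to finish before reading the shared maps.
	for _, ch := range doneChans {
		<-ch
	}
	fmt.Println("all traversals done")
}

Draining the channels is purely a synchronization barrier here, since the real traversal records per-container errors in nodeDetails.ctrErrors rather than returning them.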

// Remove all containers in the given graph
// Pod is optional, and must be locked if given.
func removeContainerGraph(ctx context.Context, graph *ContainerGraph, pod *Pod, timeout *uint, force bool) (map[string]*ContainerNamedVolume, map[string]bool, map[string]error, error) {
// Are there actually any containers in the graph?
// If not, return immediately.
if len(graph.nodes) == 0 {
return nil, nil, nil, nil
}

nodeDetails := new(nodeTraversal)
nodeDetails.pod = pod
nodeDetails.ctrErrors = make(map[string]error)
nodeDetails.ctrsVisited = make(map[string]bool)

ctrNamedVolumes := make(map[string]*ContainerNamedVolume)

traversalFunc := func(ctr *Container, pod *Pod) error {
ctr.lock.Lock()
defer ctr.lock.Unlock()

if err := ctr.syncContainer(); err != nil {
return err
}

for _, vol := range ctr.config.NamedVolumes {
ctrNamedVolumes[vol.Name] = vol
}

if pod != nil && pod.state.InfraContainerID == node.id {
if pod != nil && pod.state.InfraContainerID == ctr.ID() {
pod.state.InfraContainerID = ""
if err := pod.save(); err != nil {
ctrErrored = true
ctrErrors[node.id] = fmt.Errorf("error removing infra container %s from pod %s: %w", node.id, pod.ID(), err)
return fmt.Errorf("error removing infra container %s from pod %s: %w", ctr.ID(), pod.ID(), err)
}
}
}

if !ctrErrored {
opts := ctrRmOpts{
Force: force,
RemovePod: true,
Timeout: timeout,
}

if _, _, err := node.container.runtime.removeContainer(ctx, node.container, opts); err != nil {
ctrErrored = true
ctrErrors[node.id] = err
if _, _, err := ctr.runtime.removeContainer(ctx, ctr, opts); err != nil {
return err
}

return nil
}
nodeDetails.actionFunc = traversalFunc

node.container.lock.Unlock()
doneChans := make([]<-chan error, 0, len(graph.notDependedOnNodes))

// Recurse to anyone who we depend on and remove them
for _, successor := range node.dependsOn {
removeNode(ctx, successor, pod, force, timeout, ctrErrored, ctrErrors, ctrsVisited, ctrNamedVolumes)
// Parallel enqueue jobs for all our starting nodes.
if len(graph.notDependedOnNodes) == 0 {
return nil, nil, nil, fmt.Errorf("no containers in graph are not dependencies of other containers, unable to remove")
}
for _, node := range graph.notDependedOnNodes {
doneChan := parallel.Enqueue(ctx, func() error {
traverseNodeInwards(node, nodeDetails, false)
return nil
})
doneChans = append(doneChans, doneChan)
}

// We don't care about the return values, these functions always return nil
// But we do need all of the parallel jobs to terminate.
for _, doneChan := range doneChans {
<-doneChan
}

return ctrNamedVolumes, nodeDetails.ctrsVisited, nodeDetails.ctrErrors, nil
}