Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ type TrafficRouterPlugin interface {
RemoveManagedRoutes(ro *v1alpha1.Rollout) RpcError
// Type returns the type of the traffic routing reconciler
Type() string
// CanScaleDown checks if it is safe to scale down the ReplicaSet identified by the given pod template hash.
// This allows traffic routing plugins to delay scale-down until external systems (e.g., long-running
// connections, worker versioning, message queue consumers) have completed draining.
// Returns:
// - ScaleDownVerified: scale-down is safe
// - ScaleDownNotVerified: scale-down is NOT safe yet, controller should retry later
// - ScaleDownNotImplemented: plugin does not implement this check, default behavior applies
CanScaleDown(rollout *v1alpha1.Rollout, podTemplateHash string) (RpcScaleDownVerified, RpcError)
}

type StepPlugin interface {
Expand Down
11 changes: 10 additions & 1 deletion rollout/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ func (c *rolloutContext) scaleDownOldReplicaSetsForCanary(oldRSs []*appsv1.Repli
// and doesn't yet have scale down deadline. This happens when a user changes their
// mind in the middle of an V1 -> V2 update, and then applies a V3. We are deciding
// what to do with the defunct, intermediate V2 ReplicaSet right now.
// It is safe to scale the intermediate RS down, since no traffic is directed to it.
// Check if traffic routers allow scale-down (e.g., for drain completion).
// While no new traffic is directed to this RS, external systems may still
// have work running on it that needs to complete.
podTemplateHash := replicasetutil.GetPodTemplateHash(targetRS)
canScaleDown, _ := c.canScaleDownRS(podTemplateHash)
if canScaleDown != nil && !*canScaleDown {
c.log.Infof("Intermediate RS '%s' scale-down blocked by traffic router plugin", targetRS.Name)
desiredReplicaCount = *targetRS.Spec.Replicas
continue
}
c.log.Infof("scaling down intermediate RS '%s'", targetRS.Name)
}
}
Expand Down
30 changes: 30 additions & 0 deletions rollout/mocks/TrafficRoutingReconciler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"k8s.io/kubernetes/pkg/controller"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
"github.com/argoproj/argo-rollouts/utils/defaults"
"github.com/argoproj/argo-rollouts/utils/record"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
serviceutil "github.com/argoproj/argo-rollouts/utils/service"
timeutil "github.com/argoproj/argo-rollouts/utils/time"
Expand Down Expand Up @@ -322,6 +324,16 @@ func (c *rolloutContext) scaleDownDelayHelper(rs *appsv1.ReplicaSet, annotatione
annotationedRSs++
if annotationedRSs > scaleDownRevisionLimit {
c.log.Infof("At ScaleDownDelayRevisionLimit (%d) and scaling down the rest", scaleDownRevisionLimit)
// Even when exceeding revision limit, check if traffic routers block scale-down
podTemplateHash := replicasetutil.GetPodTemplateHash(rs)
canScaleDown, _ := c.canScaleDownRS(podTemplateHash)
if canScaleDown != nil && !*canScaleDown {
c.log.Infof("RS '%s' scale-down blocked by traffic router plugin (exceeds revision limit)", rs.Name)
logCtx := logutil.WithRollout(c.rollout)
logCtx.Info("rollout enqueue due to traffic router blocking scale-down")
c.enqueueRolloutAfter(c.rollout, defaults.GetRolloutVerifyRetryInterval())
desiredReplicaCount = rolloutReplicas
}
} else {
remainingTime, err := replicasetutil.GetTimeRemainingBeforeScaleDownDeadline(rs)
if err != nil {
Expand All @@ -334,13 +346,76 @@ func (c *rolloutContext) scaleDownDelayHelper(rs *appsv1.ReplicaSet, annotatione
c.enqueueRolloutAfter(c.rollout, *remainingTime)
}
desiredReplicaCount = rolloutReplicas
} else {
// Scale-down deadline has passed, but check if traffic routers block scale-down
podTemplateHash := replicasetutil.GetPodTemplateHash(rs)
canScaleDown, _ := c.canScaleDownRS(podTemplateHash)
if canScaleDown != nil && !*canScaleDown {
c.log.Infof("RS '%s' scale-down blocked by traffic router plugin", rs.Name)
// Requeue to check again later
logCtx := logutil.WithRollout(c.rollout)
logCtx.Info("rollout enqueue due to traffic router blocking scale-down")
c.enqueueRolloutAfter(c.rollout, defaults.GetRolloutVerifyRetryInterval())
desiredReplicaCount = rolloutReplicas
}
}
}
}

return annotationedRSs, desiredReplicaCount, nil
}

// canScaleDownRS checks if any traffic routing plugins block scale-down for the given pod template hash.
// Returns:
// - nil: no traffic routers are configured, or all return "not implemented" (default behavior)
// - *true: all traffic routers that implement the check report scale-down is safe
// - *false: at least one traffic router reports scale-down is NOT safe
func (c *rolloutContext) canScaleDownRS(podTemplateHash string) (*bool, error) {
if c.rollout.Spec.Strategy.Canary == nil || c.rollout.Spec.Strategy.Canary.TrafficRouting == nil {
return nil, nil
}

reconcilers, err := c.newTrafficRoutingReconciler(c)
if err != nil {
c.recorder.Warnf(c.rollout, record.EventOptions{EventReason: conditions.ScaleDownCheckErrorReason}, conditions.ScaleDownCheckErrorMessage, err)
return nil, nil
}
if len(reconcilers) == 0 {
return nil, nil
}

var hasImplementation bool
for _, reconciler := range reconcilers {
canScaleDown, err := reconciler.CanScaleDown(podTemplateHash)
if err != nil {
if canScaleDown != nil {
// Plugin returned a result with an error - report it as an event
c.recorder.Warnf(c.rollout, record.EventOptions{EventReason: conditions.ScaleDownCheckErrorReason}, conditions.ScaleDownCheckErrorMessage, err)
}
// For backwards compatibility (old plugins) or errors, continue with next reconciler
continue
}
if canScaleDown == nil {
// This traffic router doesn't implement the check, skip
continue
}
hasImplementation = true
if !*canScaleDown {
// At least one traffic router blocks scale-down
return canScaleDown, nil
}
}

if !hasImplementation {
// No traffic router implements the check, use default behavior
return nil, nil
}

// All implementing traffic routers allow scale-down
canScaleDown := true
return &canScaleDown, nil
}

// isReplicaSetReferenced returns if the given ReplicaSet is still being referenced by any of
// the current, stable, blue-green services. Used to determine if the ReplicaSet can
// safely be scaled to zero, or deleted.
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/alb/alb.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as ALB does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

// SetWeight modifies ALB Ingress resources to reach desired state
func (r *Reconciler) SetWeight(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) error {
if ingresses := r.cfg.Rollout.Spec.Strategy.Canary.TrafficRouting.ALB.Ingresses; ingresses != nil {
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/ambassador/ambassador.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as Ambassador does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

func setMappingWeight(obj *unstructured.Unstructured, weight int32) {
unstructured.SetNestedField(obj.Object, int64(weight), "spec", "weight")
}
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as Apisix does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

func (r *Reconciler) SetMirrorRoute(setMirrorRoute *v1alpha1.SetMirrorRoute) error {
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/appmesh/appmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as AppMesh does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

func getPodSelectorMatchLabels(vnode *unstructured.Unstructured) (map[string]any, error) {
m, found, err := unstructured.NestedMap(vnode.Object, "spec", "podSelector", "matchLabels")
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/istio/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as Istio does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

// SetWeight modifies Istio resources to reach desired state
func (r *Reconciler) SetWeight(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) error {
ctx := context.TODO()
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as Nginx does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

func (r *Reconciler) buildCanaryIngress(stableIngress *networkingv1.Ingress, name string, desiredWeight int32) (*ingressutil.Ingress, error) {
stableIngressName := r.cfg.Rollout.Spec.Strategy.Canary.TrafficRouting.Nginx.StableIngress
stableServiceName := r.cfg.Rollout.Spec.Strategy.Canary.StableService
Expand Down
9 changes: 9 additions & 0 deletions rollout/trafficrouting/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,12 @@ func (r *Reconciler) RemoveManagedRoutes() error {
}
return nil
}

// CanScaleDown checks if it is safe to scale down the ReplicaSet identified by the given pod template hash
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
canScaleDown, errResp := r.TrafficRouterPlugin.CanScaleDown(r.Rollout, podTemplateHash)
if errResp.HasError() {
return canScaleDown.CanScaleDown(), fmt.Errorf("failed to check can scale down via plugin: %w", errResp)
}
return canScaleDown.CanScaleDown(), nil
}
39 changes: 39 additions & 0 deletions rollout/trafficrouting/plugin/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,28 @@ type RemoveManagedRoutesArgs struct {
Rollout v1alpha1.Rollout
}

type CanScaleDownArgs struct {
Rollout v1alpha1.Rollout
PodTemplateHash string
}

type VerifyWeightResponse struct {
Verified types.RpcVerified
Err types.RpcError
}

type CanScaleDownResponse struct {
CanScaleDown types.RpcScaleDownVerified
Err types.RpcError
}

func init() {
gob.RegisterName("UpdateHashArgs", new(UpdateHashArgs))
gob.RegisterName("SetWeightAndVerifyWeightArgs", new(SetWeightAndVerifyWeightArgs))
gob.RegisterName("SetHeaderArgs", new(SetHeaderArgs))
gob.RegisterName("SetMirrorArgs", new(SetMirrorArgs))
gob.RegisterName("RemoveManagedRoutesArgs", new(RemoveManagedRoutesArgs))
gob.RegisterName("CanScaleDownArgs", new(CanScaleDownArgs))
}

// TrafficRouterPlugin is the interface that we're exposing as a plugin. It needs to match metricproviders.Providers but we can
Expand Down Expand Up @@ -172,6 +183,20 @@ func (g *TrafficRouterPluginRPC) RemoveManagedRoutes(rollout *v1alpha1.Rollout)
return resp
}

// CanScaleDown checks if it is safe to scale down the ReplicaSet identified by the given pod template hash
func (g *TrafficRouterPluginRPC) CanScaleDown(rollout *v1alpha1.Rollout, podTemplateHash string) (types.RpcScaleDownVerified, types.RpcError) {
var resp CanScaleDownResponse
var args any = CanScaleDownArgs{
Rollout: *rollout,
PodTemplateHash: podTemplateHash,
}
err := g.client.Call("Plugin.CanScaleDown", &args, &resp)
if err != nil {
return types.ScaleDownNotImplemented, types.RpcError{ErrorString: fmt.Sprintf("CanScaleDown rpc call error: %s", err)}
}
return resp.CanScaleDown, resp.Err
}

// TrafficRouterRPCServer Here is the RPC server that MetricsPluginRPC talks to, conforming to
// the requirements of net/rpc
type TrafficRouterRPCServer struct {
Expand Down Expand Up @@ -257,6 +282,20 @@ func (s *TrafficRouterRPCServer) RemoveManagedRoutes(args any, resp *types.RpcEr
return nil
}

// CanScaleDown checks if it is safe to scale down the ReplicaSet identified by the given pod template hash
func (s *TrafficRouterRPCServer) CanScaleDown(args any, resp *CanScaleDownResponse) error {
canScaleDownArgs, ok := args.(*CanScaleDownArgs)
if !ok {
return fmt.Errorf("invalid args %s", args)
}
canScaleDown, err := s.Impl.CanScaleDown(&canScaleDownArgs.Rollout, canScaleDownArgs.PodTemplateHash)
*resp = CanScaleDownResponse{
CanScaleDown: canScaleDown,
Err: err,
}
return nil
}

// RpcTrafficRouterPlugin This is the implementation of plugin.Plugin so we can serve/consume
//
// This has two methods: Server must return an RPC server for this plugin
Expand Down
4 changes: 4 additions & 0 deletions rollout/trafficrouting/plugin/rpc/rpc_test_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ func (r *testRpcPlugin) RemoveManagedRoutes(ro *v1alpha1.Rollout) types.RpcError
func (r *testRpcPlugin) Type() string {
return "TestRPCPlugin"
}

func (r *testRpcPlugin) CanScaleDown(ro *v1alpha1.Rollout, podTemplateHash string) (types.RpcScaleDownVerified, types.RpcError) {
return types.ScaleDownNotImplemented, types.RpcError{}
}
5 changes: 5 additions & 0 deletions rollout/trafficrouting/smi/smi.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as SMI does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

// SetWeight creates and modifies traffic splits based on the desired weight
func (r *Reconciler) SetWeight(desiredWeight int32, additionalDestinations ...v1alpha1.WeightDestination) error {
// If TrafficSplitName not set, then set to Rollout name
Expand Down
5 changes: 5 additions & 0 deletions rollout/trafficrouting/traefik/traefik.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (r *Reconciler) Type() string {
return Type
}

// CanScaleDown returns nil (not implemented) as Traefik does not support scale-down checks
func (r *Reconciler) CanScaleDown(podTemplateHash string) (*bool, error) {
return nil, nil
}

func (r *Reconciler) SetMirrorRoute(setMirrorRoute *v1alpha1.SetMirrorRoute) error {
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions rollout/trafficrouting/trafficroutingutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ type TrafficRoutingReconciler interface {
RemoveManagedRoutes() error
// Type returns the type of the traffic routing reconciler
Type() string
// CanScaleDown checks if it is safe to scale down the ReplicaSet identified by the given pod template hash.
// This allows traffic routing plugins to delay scale-down until external systems have completed draining.
// Returns:
// - *true: scale-down is safe
// - *false: scale-down is NOT safe yet
// - nil: not implemented, default behavior applies
CanScaleDown(podTemplateHash string) (*bool, error)
}
Loading
Loading