Merge pull request #795 from lcl533/dev_updaterootnodestatus
Add switch function for one2cluster notready status upload
duanmengkk authored Jan 6, 2025
2 parents dc4bc7e + d2e901f commit 849a934
Showing 6 changed files with 351 additions and 16 deletions.
4 changes: 4 additions & 0 deletions cmd/clustertree/cluster-manager/app/options/options.go
@@ -50,6 +50,9 @@ type Options struct {
BackoffOpts flags.BackoffOptions

SyncPeriod time.Duration

// UpdateRootNodeStatusNotready enables uploading the notready status for one2cluster mode in UpdateRootNodeStatus
UpdateRootNodeStatusNotready bool
}

func NewOptions() (*Options, error) {
@@ -87,6 +90,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringSliceVar(&o.AutoCreateMCSPrefix, "auto-mcs-prefix", []string{}, "The prefix of namespace for service to auto create mcs resources")
flags.StringSliceVar(&o.ReservedNamespaces, "reserved-namespaces", []string{"kube-system"}, "The namespaces protected by Kosmos that the controller-manager will skip.")
flags.DurationVar(&o.SyncPeriod, "sync-period", 0, "the sync period for informer to resync.")
flags.BoolVar(&o.UpdateRootNodeStatusNotready, "Update-RootNode-Status-Notready", false, "Turn on or off uploading the notready status for one2cluster mode in UpdateRootNodeStatus")
o.RateLimiterOpts.AddFlags(flags)
o.BackoffOpts.AddFlags(flags)
options.BindLeaderElectionFlags(&o.LeaderElection, flags)
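For reference, a minimal standalone sketch (not part of this commit) of how the new option is exposed as a pflag boolean and parsed; the FlagSet name and the sample command-line argument are illustrative assumptions, while the flag name, default, and BoolVar wiring come from the diff above.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// Options mirrors only the field added in this commit.
type Options struct {
	UpdateRootNodeStatusNotready bool
}

func main() {
	o := &Options{}
	fs := pflag.NewFlagSet("clustertree-cluster-manager", pflag.ExitOnError)
	fs.BoolVar(&o.UpdateRootNodeStatusNotready, "Update-RootNode-Status-Notready", false,
		"Turn on or off uploading the notready status for one2cluster mode in UpdateRootNodeStatus")

	// Illustrative invocation: the switch defaults to false and must be enabled explicitly.
	_ = fs.Parse([]string{"--Update-RootNode-Status-Notready=true"})
	fmt.Println(o.UpdateRootNodeStatusNotready) // true
}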
2 changes: 1 addition & 1 deletion pkg/clustertree/cluster-manager/cluster_controller.go
@@ -194,7 +194,6 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
if err != nil {
return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name)
}

leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient)
c.LeafModelHandler = leafModelHandler

@@ -286,6 +285,7 @@ func (c *ClusterController) setupControllers(
}

nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
nodeLeaseController.Options = c.Options
if err := mgr.Add(nodeLeaseController); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
}
@@ -17,6 +17,7 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
)
@@ -27,7 +28,8 @@ const (
DefaultLeaseDuration = 40
DefaultRenewIntervalFraction = 0.25

DefaultNodeStatusUpdateInterval = 1 * time.Minute
//DefaultNodeStatusUpdateInterval = 1 * time.Minute
leafClusterupdateInterval = 10 * time.Second
)

type NodeLeaseController struct {
@@ -42,6 +44,9 @@ type NodeLeaseController struct {
nodes []*corev1.Node
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
nodeLock sync.Mutex
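// Options carries the cluster-manager options so syncNodeStatus can check UpdateRootNodeStatusNotready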
Options *options.Options
// eventRecorder record.EventRecorder
// eventBroadcaster record.EventBroadcaster
}

func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
@@ -53,7 +58,8 @@ func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client,
LeafModelHandler: LeafModelHandler,
LeafNodeSelectors: LeafNodeSelectors,
leaseInterval: getRenewInterval(),
statusInterval: DefaultNodeStatusUpdateInterval,
//statusInterval: DefaultNodeStatusUpdateInterval,
statusInterval: leafClusterupdateInterval,
}
return c
}
@@ -74,9 +80,18 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) {
}
c.nodeLock.Unlock()

err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
if err != nil {
klog.Errorf(err.Error())
klog.Infof("Starting to update node status to notready in root cluster.")
if c.Options.UpdateRootNodeStatusNotready {
err := c.updateNodeStatusAddNotready(ctx, nodes, c.LeafNodeSelectors)
if err != nil {
klog.Errorf("Could not update node status in root cluster,Error: %v", err)
}
}
if !c.Options.UpdateRootNodeStatusNotready {
err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
if err != nil {
klog.Errorf(err.Error())
}
}
}

@@ -89,6 +104,15 @@ func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.
return nil
}

// nolint
func (c *NodeLeaseController) updateNodeStatusAddNotready(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
err := c.LeafModelHandler.UpdateRootNodeStatusAddNotready(ctx, n, leafNodeSelector)
if err != nil {
klog.Errorf("Could not update node status add notreadyin root cluster,Error: %v", err)
}
return nil
}

func (c *NodeLeaseController) syncLease(ctx context.Context) {
nodes := make([]*corev1.Node, 0)
c.nodeLock.Lock()
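As a side note, a small sketch (not from this commit) of how the timing constants above relate, assuming getRenewInterval follows the usual leaseDuration multiplied by renewIntervalFraction convention; the renewInterval function below is that assumption spelled out, and only the constant values come from the diff.

package main

import (
	"fmt"
	"time"
)

const (
	DefaultLeaseDuration         = 40   // seconds, as in the NodeLeaseController constants
	DefaultRenewIntervalFraction = 0.25 // fraction of the lease duration
	leafClusterupdateInterval    = 10 * time.Second
)

// renewInterval is an assumed stand-in for getRenewInterval:
// 40s * 0.25 = 10s between lease renewals.
func renewInterval() time.Duration {
	return time.Duration(DefaultLeaseDuration * DefaultRenewIntervalFraction * float64(time.Second))
}

func main() {
	fmt.Println(renewInterval())           // 10s
	fmt.Println(leafClusterupdateInterval) // 10s: the new, faster node status sync interval
}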
@@ -128,7 +128,10 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
}

clone := nodeInRoot.DeepCopy()
clone.Status.Conditions = utils.NodeConditions()
// When clone.Status.Conditions is empty, initialize it with utils.NodeConditions()
if len(clone.Status.Conditions) == 0 {
clone.Status.Conditions = utils.NodeConditions()
}

// Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot
if c.LeafModelHandler.GetLeafMode() == leafUtils.Node {
@@ -0,0 +1,94 @@
package utils

import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

type clusterData struct {
// readyCondition is the last observed ready condition of the cluster.
readyCondition corev1.ConditionStatus
// thresholdStartTime is the time that the ready condition changed.
thresholdStartTime time.Time
}

func (c *clusterConditionStore) thresholdAdjustedReadyCondition(cluster *kosmosv1alpha1.Cluster, nodeInRoot *corev1.Node, observedReadyConditions []corev1.NodeCondition, clusterFailureThreshold time.Duration, clusterSuccessThreshold time.Duration) []corev1.NodeCondition {
c.successThreshold = clusterSuccessThreshold
c.failureThreshold = clusterFailureThreshold
// Find the ready condition of the node. TODO: optimize the code format
observedReadyCondition := FindStatusCondition(observedReadyConditions)
if observedReadyCondition == nil {
return observedReadyConditions
}
// Get the current Ready condition recorded on the cluster's root node
curReadyConditions := nodeInRoot.Status.Conditions
curReadyCondition := FindStatusCondition(curReadyConditions)
if curReadyCondition == nil {
// No Ready condition on the root node yet; report the observed conditions directly
return observedReadyConditions
}
// Get the saved data for this cluster
saved := c.get(cluster.Name)
// Check whether this is a newly added cluster
if saved == nil {
// the cluster has just joined
c.update(cluster.Name, &clusterData{
readyCondition: observedReadyCondition.Status,
})
return observedReadyConditions
}
// Check whether the state has changed
now := time.Now()
if saved.readyCondition != observedReadyCondition.Status {
// ready condition status changed, record the threshold start time
saved = &clusterData{
readyCondition: observedReadyCondition.Status,
thresholdStartTime: now,
}
c.update(cluster.Name, saved)
}
// Select the applicable time threshold for the observed status
var threshold time.Duration
if observedReadyCondition.Status == corev1.ConditionTrue {
threshold = c.successThreshold
} else {
threshold = c.failureThreshold
}
if ((observedReadyCondition.Status == corev1.ConditionTrue && curReadyCondition.Status != corev1.ConditionTrue) ||
(observedReadyCondition.Status != corev1.ConditionTrue && curReadyCondition.Status == corev1.ConditionTrue)) &&
now.Before(saved.thresholdStartTime.Add(threshold)) {
// retain the old status until the threshold is exceeded, to ride out transient network instability.
return curReadyConditions
}
return observedReadyConditions
}

// FindStatusCondition returns the Ready condition from conditions, or nil if it is absent.
func FindStatusCondition(conditions []corev1.NodeCondition) *corev1.NodeCondition {
for i := range conditions {
if conditions[i].Type == "Ready" {
return &conditions[i]
}
}
return nil
}

func (c *clusterConditionStore) get(cluster string) *clusterData {
condition, ok := c.clusterDataMap.Load(cluster)
if !ok {
return nil
}
return condition.(*clusterData)
}

func (c *clusterConditionStore) update(cluster string, data *clusterData) {
// update stores the latest observed data for the cluster.
c.clusterDataMap.Store(cluster, data)
}

type clusterConditionStore struct {
clusterDataMap sync.Map
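// successThreshold is the duration of successes for the cluster to be considered healthy.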
successThreshold time.Duration
// failureThreshold is the duration of failure for the cluster to be considered unhealthy.
failureThreshold time.Duration
}
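A hypothetical usage sketch (not part of this commit), written as if it lived in the same utils package, to show the intent of the store above: when a leaf cluster flips to NotReady, the Ready condition already recorded on the root node is retained until the failure threshold elapses. The function name, cluster name, and threshold values are illustrative only.

package utils

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"

	kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

func exampleThresholdSmoothing() {
	store := &clusterConditionStore{}
	cluster := &kosmosv1alpha1.Cluster{}
	cluster.Name = "leaf-1"

	// The root node currently reports Ready=True for this leaf cluster.
	nodeInRoot := &corev1.Node{}
	nodeInRoot.Status.Conditions = []corev1.NodeCondition{{Type: "Ready", Status: corev1.ConditionTrue}}

	// The first observation seeds the store for the cluster.
	ready := []corev1.NodeCondition{{Type: "Ready", Status: corev1.ConditionTrue}}
	_ = store.thresholdAdjustedReadyCondition(cluster, nodeInRoot, ready, 30*time.Second, 30*time.Second)

	// The leaf then reports NotReady; within the 30s failure threshold the
	// old Ready=True conditions from the root node are kept, riding out flaps.
	notReady := []corev1.NodeCondition{{Type: "Ready", Status: corev1.ConditionFalse}}
	result := store.thresholdAdjustedReadyCondition(cluster, nodeInRoot, notReady, 30*time.Second, 30*time.Second)
	fmt.Println(result[0].Status) // True
}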