Skip to content

Commit b7a02b7

Browse files
authored
Merge pull request kosmos-io#287 from qiuming520/main
feature: support one-to-many model for the leaf node feature
2 parents 244dd33 + 6fa9352 commit b7a02b7

13 files changed

+862
-297
lines changed

pkg/clustertree/cluster-manager/cluster_controller.go

+11-9
Original file line number | Diff line number | Diff line change
@@ -187,10 +187,10 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
187187
return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name)
188188
}
189189

190-
leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.Root, mgr.GetClient(), c.RootClientset, leafClient)
190+
leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient)
191191
c.LeafModelHandler = leafModelHandler
192192

193-
nodes, err := c.createNode(ctx, cluster, leafClient)
193+
nodes, leafNodeSelectors, err := c.createNode(ctx, cluster, leafClient)
194194
if err != nil {
195195
return reconcile.Result{RequeueAfter: RequeueTime}, fmt.Errorf("create node with err %v, cluster %s", err, cluster.Name)
196196
}
@@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
206206
c.ManagerCancelFuncs[cluster.Name] = &cancel
207207
c.ControllerManagersLock.Unlock()
208208

209-
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafClient, kosmosClient, config); err != nil {
209+
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
210210
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
211211
}
212212

@@ -240,6 +240,7 @@ func (c *ClusterController) setupControllers(
240240
cluster *kosmosv1alpha1.Cluster,
241241
nodes []*corev1.Node,
242242
clientDynamic *dynamic.DynamicClient,
243+
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
243244
leafClientset kubernetes.Interface,
244245
kosmosClient kosmosversioned.Interface,
245246
leafRestConfig *rest.Config) error {
@@ -262,14 +263,15 @@ func (c *ClusterController) setupControllers(
262263
Root: c.Root,
263264
RootClientset: c.RootClientset,
264265
Nodes: nodes,
266+
LeafNodeSelectors: leafNodeSelector,
265267
LeafModelHandler: c.LeafModelHandler,
266268
Cluster: cluster,
267269
}
268270
if err := nodeResourcesController.SetupWithManager(mgr); err != nil {
269271
return fmt.Errorf("error starting %s: %v", controllers.NodeResourcesControllerName, err)
270272
}
271273

272-
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, c.RootClientset, c.LeafModelHandler)
274+
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
273275
if err := mgr.Add(nodeLeaseController); err != nil {
274276
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
275277
}
@@ -334,19 +336,19 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, isOne2O
334336
return nil
335337
}
336338

337-
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) {
339+
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
338340
serverVersion, err := leafClient.Discovery().ServerVersion()
339341
if err != nil {
340342
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
341-
return nil, err
343+
return nil, nil, err
342344
}
343345

344-
nodes, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion)
346+
nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateRootNode(ctx, c.Options.ListenPort, serverVersion.GitVersion)
345347
if err != nil {
346348
klog.Errorf("create node for cluster %s failed, err: %v", cluster.Name, err)
347-
return nil, err
349+
return nil, nil, err
348350
}
349-
return nodes, nil
351+
return nodes, leafNodeSelectors, nil
350352
}
351353

352354
func (c *ClusterController) deleteNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster) error {

pkg/clustertree/cluster-manager/controllers/node_lease_controller.go

+16-13
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ import (
1717
"k8s.io/utils/pointer"
1818
"sigs.k8s.io/controller-runtime/pkg/client"
1919

20+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
2021
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
2122
)
2223

@@ -38,19 +39,21 @@ type NodeLeaseController struct {
3839
leaseInterval time.Duration
3940
statusInterval time.Duration
4041

41-
nodes []*corev1.Node
42-
nodeLock sync.Mutex
42+
nodes []*corev1.Node
43+
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
44+
nodeLock sync.Mutex
4345
}
4446

45-
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
47+
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
4648
c := &NodeLeaseController{
47-
leafClient: leafClient,
48-
rootClient: rootClient,
49-
root: root,
50-
nodes: nodes,
51-
LeafModelHandler: LeafModelHandler,
52-
leaseInterval: getRenewInterval(),
53-
statusInterval: DefaultNodeStatusUpdateInterval,
49+
leafClient: leafClient,
50+
rootClient: rootClient,
51+
root: root,
52+
nodes: nodes,
53+
LeafModelHandler: LeafModelHandler,
54+
LeafNodeSelectors: LeafNodeSelectors,
55+
leaseInterval: getRenewInterval(),
56+
statusInterval: DefaultNodeStatusUpdateInterval,
5457
}
5558
return c
5659
}
@@ -71,15 +74,15 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) {
7174
}
7275
c.nodeLock.Unlock()
7376

74-
err := c.updateNodeStatus(ctx, nodes)
77+
err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
7578
if err != nil {
7679
klog.Errorf(err.Error())
7780
}
7881
}
7982

8083
// nolint
81-
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node) error {
82-
err := c.LeafModelHandler.UpdateNodeStatus(ctx, n)
84+
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
85+
err := c.LeafModelHandler.UpdateRootNodeStatus(ctx, n, leafNodeSelector)
8386
if err != nil {
8487
klog.Errorf("Could not update node status in root cluster,Error: %v", err)
8588
}

pkg/clustertree/cluster-manager/controllers/node_resources_controller.go

+9-8
Original file line number | Diff line number | Diff line change
@@ -39,10 +39,11 @@ type NodeResourcesController struct {
3939
GlobalLeafManager leafUtils.LeafResourceManager
4040
RootClientset kubernetes.Interface
4141

42-
Nodes []*corev1.Node
43-
LeafModelHandler leafUtils.LeafModelHandler
44-
Cluster *kosmosv1alpha1.Cluster
45-
EventRecorder record.EventRecorder
42+
Nodes []*corev1.Node
43+
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
44+
LeafModelHandler leafUtils.LeafModelHandler
45+
Cluster *kosmosv1alpha1.Cluster
46+
EventRecorder record.EventRecorder
4647
}
4748

4849
var predicatesFunc = predicate.Funcs{
@@ -110,15 +111,15 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
110111
}, fmt.Errorf("cannot get node while update nodeInRoot resources %s, err: %v", rootNode.Name, err)
111112
}
112113

113-
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode)
114+
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
114115
if err != nil {
115116
klog.Errorf("Could not get node in leaf cluster %s,Error: %v", c.Cluster.Name, err)
116117
return controllerruntime.Result{
117118
RequeueAfter: RequeueTime,
118119
}, err
119120
}
120121

121-
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode)
122+
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
122123
if err != nil {
123124
klog.Errorf("Could not list pod in leaf cluster %s,Error: %v", c.Cluster.Name, err)
124125
return controllerruntime.Result{
@@ -130,7 +131,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
130131
clone.Status.Conditions = utils.NodeConditions()
131132

132133
// Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot
133-
if c.LeafModelHandler.GetLeafModelType() == leafUtils.DispersionModel {
134+
if c.LeafModelHandler.GetLeafMode() == leafUtils.Node {
134135
getNode := func(nodes *corev1.NodeList) *corev1.Node {
135136
for _, nodeInLeaf := range nodes.Items {
136137
if nodeInLeaf.Name == rootNode.Name {
@@ -156,7 +157,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
156157
}
157158
}
158159
}
159-
160+
// TODO: aggregation of Labels and Annotations for classificationModel
160161
clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
161162
clone.Status.Allocatable = clusterResources
162163
clone.Status.Capacity = clusterResources

pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go

+19-14
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2828

2929
"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
30+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
3031
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset"
3132
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
3233
"github.com/kosmos.io/kosmos/pkg/utils"
@@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
198199
// create pod in leaf
199200
if err != nil {
200201
if errors.IsNotFound(err) {
201-
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil {
202+
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
202203
klog.Errorf("create pod inleaf error, err: %s", err)
203204
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
204205
} else {
@@ -212,7 +213,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
212213

213214
// update pod in leaf
214215
if podutils.ShouldEnqueue(leafPod, &rootpod) {
215-
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod); err != nil {
216+
if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
216217
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
217218
}
218219
}
@@ -698,7 +699,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
698699
return nil
699700
}
700701

701-
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error {
702+
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
702703
if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil {
703704
// span.SetStatus(err)
704705
return err
@@ -709,7 +710,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
709710
return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName)
710711
}
711712

712-
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL)
713+
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
713714
klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name)
714715

715716
// create ns
@@ -763,24 +764,28 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
763764
return nil
764765
}
765766

766-
func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootpod *corev1.Pod, leafpod *corev1.Pod) error {
767+
func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootPod *corev1.Pod, leafPod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
767768
// TODO: update env
768769
// TODO: update config secret pv pvc ...
769-
klog.V(4).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name)
770+
klog.V(4).Infof("Updating pod %v/%+v", rootPod.Namespace, rootPod.Name)
770771

771-
if !podutils.IsKosmosPod(leafpod) {
772+
if !podutils.IsKosmosPod(leafPod) {
772773
klog.V(4).Info("Pod is not created by kosmos tree, ignore")
773774
return nil
774775
}
775776
// not used
776-
podutils.FitLabels(leafpod.ObjectMeta.Labels, lr.IgnoreLabels)
777-
podCopy := leafpod.DeepCopy()
777+
podutils.FitLabels(leafPod.ObjectMeta.Labels, lr.IgnoreLabels)
778+
podCopy := leafPod.DeepCopy()
778779
// util.GetUpdatedPod update PodCopy container image, annotations, labels.
779780
// recover toleration, affinity, tripped ignore labels.
780-
podutils.GetUpdatedPod(podCopy, rootpod, lr.IgnoreLabels)
781-
if reflect.DeepEqual(leafpod.Spec, podCopy.Spec) &&
782-
reflect.DeepEqual(leafpod.Annotations, podCopy.Annotations) &&
783-
reflect.DeepEqual(leafpod.Labels, podCopy.Labels) {
781+
clusterNodeInfo := r.GlobalLeafManager.GetClusterNode(rootPod.Spec.NodeName)
782+
if clusterNodeInfo == nil {
783+
return fmt.Errorf("clusternode info is nil , name: %s", rootPod.Spec.NodeName)
784+
}
785+
podutils.GetUpdatedPod(podCopy, rootPod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
786+
if reflect.DeepEqual(leafPod.Spec, podCopy.Spec) &&
787+
reflect.DeepEqual(leafPod.Annotations, podCopy.Annotations) &&
788+
reflect.DeepEqual(leafPod.Labels, podCopy.Labels) {
784789
return nil
785790
}
786791

@@ -796,7 +801,7 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf
796801
if err != nil {
797802
return fmt.Errorf("could not update pod: %v", err)
798803
}
799-
klog.V(4).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name)
804+
klog.V(4).Infof("Update pod %v/%+v success ", rootPod.Namespace, rootPod.Name)
800805
return nil
801806
}
802807

0 commit comments

Comments
 (0)