Skip to content

Commit 00efe11

Browse files
committed
feature: support one-to-many model for the leaf node feature
Signed-off-by: qiuming520 <[email protected]>
1 parent 99bf98a commit 00efe11

File tree

8 files changed

+363
-88
lines changed

8 files changed

+363
-88
lines changed

pkg/clustertree/cluster-manager/cluster_controller.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
190190
leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.Root, mgr.GetClient(), c.RootClientset, leafClient)
191191
c.LeafModelHandler = leafModelHandler
192192

193-
nodes, err := c.createNode(ctx, cluster, leafClient)
193+
nodes, leafNodeSelectors, err := c.createNode(ctx, cluster, leafClient)
194194
if err != nil {
195195
return reconcile.Result{RequeueAfter: RequeueTime}, fmt.Errorf("create node with err %v, cluster %s", err, cluster.Name)
196196
}
@@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
206206
c.ManagerCancelFuncs[cluster.Name] = &cancel
207207
c.ControllerManagersLock.Unlock()
208208

209-
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafClient, kosmosClient, config); err != nil {
209+
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
210210
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
211211
}
212212

@@ -240,6 +240,7 @@ func (c *ClusterController) setupControllers(
240240
cluster *kosmosv1alpha1.Cluster,
241241
nodes []*corev1.Node,
242242
clientDynamic *dynamic.DynamicClient,
243+
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
243244
leafClientset kubernetes.Interface,
244245
kosmosClient kosmosversioned.Interface,
245246
leafRestConfig *rest.Config) error {
@@ -262,14 +263,15 @@ func (c *ClusterController) setupControllers(
262263
Root: c.Root,
263264
RootClientset: c.RootClientset,
264265
Nodes: nodes,
266+
LeafNodeSelectors: leafNodeSelector,
265267
LeafModelHandler: c.LeafModelHandler,
266268
Cluster: cluster,
267269
}
268270
if err := nodeResourcesController.SetupWithManager(mgr); err != nil {
269271
return fmt.Errorf("error starting %s: %v", controllers.NodeResourcesControllerName, err)
270272
}
271273

272-
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, c.RootClientset, c.LeafModelHandler)
274+
nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler)
273275
if err := mgr.Add(nodeLeaseController); err != nil {
274276
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
275277
}
@@ -333,19 +335,19 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, isOne2O
333335
return nil
334336
}
335337

336-
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) {
338+
func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
337339
serverVersion, err := leafClient.Discovery().ServerVersion()
338340
if err != nil {
339341
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
340-
return nil, err
342+
return nil, nil, err
341343
}
342344

343-
nodes, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion)
345+
nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion)
344346
if err != nil {
345347
klog.Errorf("create node for cluster %s failed, err: %v", cluster.Name, err)
346-
return nil, err
348+
return nil, nil, err
347349
}
348-
return nodes, nil
350+
return nodes, leafNodeSelectors, nil
349351
}
350352

351353
func (c *ClusterController) deleteNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster) error {

pkg/clustertree/cluster-manager/controllers/node_lease_controller.go

+16-13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"k8s.io/utils/pointer"
1818
"sigs.k8s.io/controller-runtime/pkg/client"
1919

20+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
2021
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
2122
)
2223

@@ -38,19 +39,21 @@ type NodeLeaseController struct {
3839
leaseInterval time.Duration
3940
statusInterval time.Duration
4041

41-
nodes []*corev1.Node
42-
nodeLock sync.Mutex
42+
nodes []*corev1.Node
43+
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
44+
nodeLock sync.Mutex
4345
}
4446

45-
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
47+
func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController {
4648
c := &NodeLeaseController{
47-
leafClient: leafClient,
48-
rootClient: rootClient,
49-
root: root,
50-
nodes: nodes,
51-
LeafModelHandler: LeafModelHandler,
52-
leaseInterval: getRenewInterval(),
53-
statusInterval: DefaultNodeStatusUpdateInterval,
49+
leafClient: leafClient,
50+
rootClient: rootClient,
51+
root: root,
52+
nodes: nodes,
53+
LeafModelHandler: LeafModelHandler,
54+
LeafNodeSelectors: LeafNodeSelectors,
55+
leaseInterval: getRenewInterval(),
56+
statusInterval: DefaultNodeStatusUpdateInterval,
5457
}
5558
return c
5659
}
@@ -71,15 +74,15 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) {
7174
}
7275
c.nodeLock.Unlock()
7376

74-
err := c.updateNodeStatus(ctx, nodes)
77+
err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors)
7578
if err != nil {
7679
klog.Errorf(err.Error())
7780
}
7881
}
7982

8083
// nolint
81-
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node) error {
82-
err := c.LeafModelHandler.UpdateNodeStatus(ctx, n)
84+
func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error {
85+
err := c.LeafModelHandler.UpdateNodeStatus(ctx, n, leafNodeSelector)
8386
if err != nil {
8487
klog.Errorf("Could not update node status in root cluster,Error: %v", err)
8588
}

pkg/clustertree/cluster-manager/controllers/node_resources_controller.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ type NodeResourcesController struct {
3939
GlobalLeafManager leafUtils.LeafResourceManager
4040
RootClientset kubernetes.Interface
4141

42-
Nodes []*corev1.Node
43-
LeafModelHandler leafUtils.LeafModelHandler
44-
Cluster *kosmosv1alpha1.Cluster
45-
EventRecorder record.EventRecorder
42+
Nodes []*corev1.Node
43+
LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector
44+
LeafModelHandler leafUtils.LeafModelHandler
45+
Cluster *kosmosv1alpha1.Cluster
46+
EventRecorder record.EventRecorder
4647
}
4748

4849
var predicatesFunc = predicate.Funcs{
@@ -110,15 +111,15 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
110111
}, fmt.Errorf("cannot get node while update nodeInRoot resources %s, err: %v", rootNode.Name, err)
111112
}
112113

113-
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode)
114+
nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
114115
if err != nil {
115116
klog.Errorf("Could not get node in leaf cluster %s,Error: %v", c.Cluster.Name, err)
116117
return controllerruntime.Result{
117118
RequeueAfter: RequeueTime,
118119
}, err
119120
}
120121

121-
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode)
122+
pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name])
122123
if err != nil {
123124
klog.Errorf("Could not list pod in leaf cluster %s,Error: %v", c.Cluster.Name, err)
124125
return controllerruntime.Result{
@@ -156,6 +157,9 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci
156157
}
157158
}
158159
}
160+
//else if c.LeafModelHandler.GetLeafModelType() == leafUtils.ClassificationModel {
161+
// // TODO aggregate Labels and Annotations for classificationModel
162+
//}
159163

160164
clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods)
161165
clone.Status.Allocatable = clusterResources

pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2828

2929
"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
30+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
3031
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset"
3132
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
3233
"github.com/kosmos.io/kosmos/pkg/utils"
@@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
198199
// create pod in leaf
199200
if err != nil {
200201
if errors.IsNotFound(err) {
201-
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil {
202+
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
202203
klog.Errorf("create pod inleaf error, err: %s", err)
203204
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
204205
} else {
@@ -700,7 +701,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
700701
return nil
701702
}
702703

703-
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error {
704+
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
704705
if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil {
705706
// span.SetStatus(err)
706707
return err
@@ -711,7 +712,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
711712
return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName)
712713
}
713714

714-
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL)
715+
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
715716
klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name)
716717

717718
// create ns

0 commit comments

Comments
 (0)