Skip to content

Commit ce0eaaf

Browse files
committed
feature: support one-to-many model for the leaf node feature
Signed-off-by: qiuming520 <[email protected]>
1 parent 11c74e5 commit ce0eaaf

File tree

5 files changed

+135
-75
lines changed

5 files changed

+135
-75
lines changed

pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2828

2929
"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
30+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
3031
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset"
3132
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
3233
"github.com/kosmos.io/kosmos/pkg/utils"
@@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req
198199
// create pod in leaf
199200
if err != nil {
200201
if errors.IsNotFound(err) {
201-
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil {
202+
if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil {
202203
klog.Errorf("create pod inleaf error, err: %s", err)
203204
return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil
204205
} else {
@@ -700,7 +701,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
700701
return nil
701702
}
702703

703-
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error {
704+
func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error {
704705
if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil {
705706
// span.SetStatus(err)
706707
return err
@@ -711,7 +712,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf
711712
return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName)
712713
}
713714

714-
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL)
715+
basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)
715716
klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name)
716717

717718
// create ns

pkg/clustertree/cluster-manager/utils/leaf_model_handler.go

+38-47
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"k8s.io/apimachinery/pkg/api/errors"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/types"
12-
"k8s.io/apimachinery/pkg/util/uuid"
1312
"k8s.io/client-go/kubernetes"
1413
"k8s.io/client-go/util/retry"
1514
"k8s.io/klog/v2"
@@ -295,47 +294,33 @@ func (h ClassificationModelHandler) GetLeafModelType() LeafModelType {
295294
// GetLeafNodes returns nodes in leaf cluster by the rootNode
296295
func (h ClassificationModelHandler) GetLeafNodes(ctx context.Context, _ *corev1.Node, selector kosmosv1alpha1.NodeSelector) (*corev1.NodeList, error) {
297296
nodesInLeaf, err := h.LeafClientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{
298-
LabelSelector: selector.LabelSelector.String(),
297+
LabelSelector: metav1.FormatLabelSelector(selector.LabelSelector),
299298
})
300299
if err != nil {
301300
return nil, err
302301
}
303302
return nodesInLeaf, nil
304303
}
305304

306-
func join(nodeList *corev1.NodeList, sep string) string {
307-
nodes := nodeList.Items
308-
if len(nodes) == 0 {
309-
return ""
310-
}
311-
312-
result := nodes[0].Name
313-
for _, node := range nodes[1:] {
314-
result += sep + node.Name
315-
}
316-
317-
return result
318-
}
319-
320305
// GetLeafPods returns pods in leaf cluster by the rootNode
321306
func (h ClassificationModelHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (pods *corev1.PodList, err error) {
322307
nodesInLeafs, err := h.GetLeafNodes(ctx, rootNode, selector)
323308
if err != nil {
324309
return nil, err
325310
}
326311

327-
fieldSelector := join(nodesInLeafs, ",")
328-
if fieldSelector == "" {
329-
klog.Warningf("have leaf node in rootNode :v%", rootNode.Name)
330-
return
331-
}
332-
333-
fieldSelector = "spec.nodeName in (" + join(nodesInLeafs, ",") + ")"
334-
pods, err = h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
335-
FieldSelector: fieldSelector,
336-
})
337-
if err != nil {
338-
return nil, err
312+
for _, node := range nodesInLeafs.Items {
313+
podsInNode, err := h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
314+
FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
315+
})
316+
if err != nil {
317+
return nil, err
318+
}
319+
if pods == nil {
320+
pods = podsInNode
321+
} else {
322+
pods.Items = append(pods.Items, podsInNode.Items...)
323+
}
339324
}
340325

341326
return pods, nil
@@ -368,13 +353,16 @@ func (h ClassificationModelHandler) UpdateNodeStatus(ctx context.Context, nodes
368353
}
369354

370355
nodesInLeaf, err := h.LeafClientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{
371-
LabelSelector: selector.LabelSelector.String(),
356+
LabelSelector: metav1.FormatLabelSelector(selector.LabelSelector),
372357
})
373358
if err != nil {
374359
// TODO: If a node is accidentally deleted, recreate it
375360
return fmt.Errorf("cannot get node in leaf cluster while update node status %s, err: %v", nodeCopy.Name, err)
376361
}
377362

363+
if len(nodesInLeaf.Items) == 0 {
364+
return fmt.Errorf("cannot get node in leaf cluster while update node status, leaf node item is 0")
365+
}
378366
rootCopy := nodeRoot.DeepCopy()
379367

380368
// TODO: Aggregation the resources of the leaf nodes
@@ -402,23 +390,25 @@ func (h ClassificationModelHandler) UpdateNodeStatus(ctx context.Context, nodes
402390
// CreateNodeInRoot creates the node in root cluster
403391
func (h ClassificationModelHandler) CreateNodeInRoot(ctx context.Context, cluster *kosmosv1alpha1.Cluster, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) {
404392
nodes := make([]*corev1.Node, 0)
405-
406393
leafNodeSelectors := make(map[string]kosmosv1alpha1.NodeSelector)
407-
for _, leafModel := range cluster.Spec.ClusterTreeOptions.LeafModels {
408-
if reflect.DeepEqual(leafModel.NodeSelector.LabelSelector, metav1.LabelSelector{}) {
394+
395+
for i, leafModel := range cluster.Spec.ClusterTreeOptions.LeafModels {
396+
if !reflect.DeepEqual(leafModel.NodeSelector.LabelSelector, metav1.LabelSelector{}) {
409397
var nodeName string
410398
if leafModel.NodeSelector.NodeName != "" {
411-
nodeName = leafModel.NodeSelector.NodeName
399+
nodeName = fmt.Sprintf("%v%v%v%v", utils.KosmosNodePrefix, leafModel.NodeSelector.NodeName, "-", i)
412400
} else {
413-
nodeName = fmt.Sprintf("%s%s%s%s", utils.KosmosNodePrefix, cluster.Name, "_", string(uuid.NewUUID()))
401+
nodeName = fmt.Sprintf("%v%v%v%v", utils.KosmosNodePrefix, cluster.Name, "-", i)
414402
}
415403

404+
if len(nodeName) > 63 {
405+
nodeName = nodeName[:63]
406+
}
416407
node, err := h.RootClientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
417408
if err != nil {
418409
if !errors.IsNotFound(err) {
419410
return nil, nil, err
420411
}
421-
422412
node = utils.BuildNodeTemplate(nodeName)
423413
nodeAnnotations := node.GetAnnotations()
424414
if nodeAnnotations == nil {
@@ -427,14 +417,14 @@ func (h ClassificationModelHandler) CreateNodeInRoot(ctx context.Context, cluste
427417
nodeAnnotations[utils.KosmosNodeOwnedByClusterAnnotations] = cluster.Name
428418
node.SetAnnotations(nodeAnnotations)
429419

430-
if leafModel.NodeSelector.LabelSelector.MatchLabels != nil {
431-
nodeLables := node.GetLabels()
432-
if nodeLables == nil {
433-
nodeLables = make(map[string]string)
434-
}
435-
nodeLables[utils.KosmosNodeOwnedByClusterAnnotations] = cluster.Name
436-
node.SetLabels(leafModel.NodeSelector.LabelSelector.MatchLabels)
420+
nodeLabels := node.GetLabels()
421+
if nodeLabels == nil {
422+
nodeLabels = make(map[string]string, 1)
437423
}
424+
for key, value := range leafModel.NodeSelector.LabelSelector.MatchLabels {
425+
nodeLabels[key] = value
426+
}
427+
node.SetLabels(nodeLabels)
438428

439429
node.Status.NodeInfo.KubeletVersion = gitVersion
440430
node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{
@@ -451,6 +441,7 @@ func (h ClassificationModelHandler) CreateNodeInRoot(ctx context.Context, cluste
451441
}
452442
}
453443
nodes = append(nodes, node)
444+
leafNodeSelectors[nodeName] = leafModel.NodeSelector
454445
}
455446
}
456447
return nodes, leafNodeSelectors, nil
@@ -459,18 +450,18 @@ func (h ClassificationModelHandler) CreateNodeInRoot(ctx context.Context, cluste
459450
// NewLeafModelHandler create a LeafModelHandler for Cluster
460451
func NewLeafModelHandler(cluster *kosmosv1alpha1.Cluster, root, leafClient client.Client, rootClientset, leafClientset kubernetes.Interface) LeafModelHandler {
461452
leafModels := cluster.Spec.ClusterTreeOptions.LeafModels
462-
if leafModels != nil {
463-
if reflect.DeepEqual(leafModels[0].NodeSelector, kosmosv1alpha1.NodeSelector{}) {
464-
return &DispersionModelHandler{
453+
if leafModels != nil && !reflect.DeepEqual(leafModels[0].NodeSelector, kosmosv1alpha1.NodeSelector{}) {
454+
if leafModels[0].NodeSelector.LabelSelector != nil {
455+
// support nodeSelector mode
456+
return &ClassificationModelHandler{
465457
Cluster: cluster,
466458
LeafClient: leafClient,
467459
RootClient: root,
468460
RootClientset: rootClientset,
469461
LeafClientset: leafClientset,
470462
}
471463
} else {
472-
// support nodeSelector mode
473-
return &ClassificationModelHandler{
464+
return &DispersionModelHandler{
474465
Cluster: cluster,
475466
LeafClient: leafClient,
476467
RootClient: root,

pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ const (
3030
)
3131

3232
type ClusterNode struct {
33-
NodeName string
34-
LeafMode LeafMode
33+
NodeName string
34+
LeafMode LeafMode
35+
LeafNodeSelector kosmosv1alpha1.NodeSelector
3536
}
3637

3738
type LeafResource struct {
@@ -106,8 +107,9 @@ func (l *leafResourceManager) AddLeafResource(lptr *LeafResource, cluster *kosmo
106107
if leafModels != nil && leafModels[i].NodeSelector.LabelSelector != nil {
107108
// TODO: support labelselector
108109
clusterNodes = append(clusterNodes, ClusterNode{
109-
NodeName: trimNamePrefix(n.Name),
110-
LeafMode: Party,
110+
NodeName: trimNamePrefix(n.Name),
111+
LeafMode: Party,
112+
LeafNodeSelector: leafModels[i].NodeSelector,
111113
})
112114
} else if leafModels != nil && len(leafModels[i].NodeSelector.NodeName) > 0 {
113115
clusterNodes = append(clusterNodes, ClusterNode{

pkg/utils/podutils/pod.go

+66-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1111
"k8s.io/klog"
1212

13+
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
14+
clustertreeutil "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
1315
"github.com/kosmos.io/kosmos/pkg/utils"
1416
)
1517

@@ -141,7 +143,7 @@ func FitUnstructuredObjMeta(unstructuredObj *unstructured.Unstructured) {
141143
}
142144
}
143145

144-
func FitPod(pod *corev1.Pod, ignoreLabels []string, cleanNodeName bool) *corev1.Pod {
146+
func FitPod(pod *corev1.Pod, ignoreLabels []string, leafMode clustertreeutil.LeafMode, nodeSelector kosmosv1alpha1.NodeSelector) *corev1.Pod {
145147
vols := []corev1.Volume{}
146148
for _, v := range pod.Spec.Volumes {
147149
if strings.HasPrefix(v.Name, "default-token") {
@@ -170,10 +172,14 @@ func FitPod(pod *corev1.Pod, ignoreLabels []string, cleanNodeName bool) *corev1.
170172
podCopy.Spec.SchedulerName = ""
171173
}
172174

173-
if cleanNodeName {
175+
if leafMode != clustertreeutil.Node {
174176
podCopy.Spec.NodeName = ""
175177
}
176178

179+
if leafMode == clustertreeutil.Party {
180+
podCopy.Spec.Affinity = fitNodeAffinity(pod.Spec.Affinity, nodeSelector)
181+
}
182+
177183
tripped := FitLabels(podCopy.ObjectMeta.Labels, ignoreLabels)
178184
if tripped != nil {
179185
trippedStr, err := json.Marshal(tripped)
@@ -186,6 +192,64 @@ func FitPod(pod *corev1.Pod, ignoreLabels []string, cleanNodeName bool) *corev1.
186192
return podCopy
187193
}
188194

195+
func fitNodeAffinity(affinity *corev1.Affinity, nodeSelector kosmosv1alpha1.NodeSelector) (cpAffinity *corev1.Affinity) {
196+
nodeSelectorTerms := make([]corev1.NodeSelectorTerm, 0)
197+
nodeSelectorTerm := corev1.NodeSelectorTerm{
198+
MatchExpressions: make([]corev1.NodeSelectorRequirement, 0),
199+
}
200+
if nodeSelector.LabelSelector.MatchLabels != nil {
201+
for key, value := range nodeSelector.LabelSelector.MatchLabels {
202+
selector := corev1.NodeSelectorRequirement{
203+
Key: key,
204+
Operator: corev1.NodeSelectorOpIn,
205+
Values: []string{value},
206+
}
207+
nodeSelectorTerm.MatchExpressions = append(nodeSelectorTerm.MatchExpressions, selector)
208+
}
209+
}
210+
211+
if nodeSelector.LabelSelector.MatchExpressions != nil {
212+
for _, item := range nodeSelector.LabelSelector.MatchExpressions {
213+
selector := corev1.NodeSelectorRequirement{
214+
Key: item.Key,
215+
Operator: corev1.NodeSelectorOperator(item.Operator),
216+
Values: item.Values,
217+
}
218+
nodeSelectorTerm.MatchExpressions = append(nodeSelectorTerm.MatchExpressions, selector)
219+
}
220+
}
221+
nodeSelectorTerms = append(nodeSelectorTerms, nodeSelectorTerm)
222+
223+
if affinity == nil {
224+
cpAffinity = &corev1.Affinity{
225+
NodeAffinity: &corev1.NodeAffinity{
226+
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
227+
NodeSelectorTerms: nodeSelectorTerms,
228+
},
229+
},
230+
}
231+
} else {
232+
cpAffinity = affinity.DeepCopy()
233+
if cpAffinity.NodeAffinity == nil {
234+
cpAffinity.NodeAffinity = &corev1.NodeAffinity{
235+
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
236+
NodeSelectorTerms: nodeSelectorTerms,
237+
},
238+
}
239+
} else if cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
240+
cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{
241+
NodeSelectorTerms: nodeSelectorTerms,
242+
}
243+
} else if cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms == nil {
244+
cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms
245+
} else {
246+
//nodeSelectorTerms = append(nodeSelectorTerms, cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms...)
247+
cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms
248+
}
249+
}
250+
return cpAffinity
251+
}
252+
189253
func fitContainers(containers []corev1.Container) []corev1.Container {
190254
var newContainers []corev1.Container
191255

pkg/utils/resources.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -52,26 +52,28 @@ func SubResourceList(base, list corev1.ResourceList) {
5252
// lifted from https://github.com/kubernetes/kubernetes/blob/v1.21.8/staging/src/k8s.io/kubectl/pkg/describe/describe.go#L4051
5353
func GetPodsTotalRequestsAndLimits(podList *corev1.PodList) (reqs corev1.ResourceList, limits corev1.ResourceList) {
5454
reqs, limits = corev1.ResourceList{}, corev1.ResourceList{}
55-
for _, p := range podList.Items {
56-
pod := p
57-
if IsVirtualPod(&pod) {
58-
continue
59-
}
60-
podReqs, podLimits := v1resource.PodRequestsAndLimits(&pod)
61-
for podReqName, podReqValue := range podReqs {
62-
if value, ok := reqs[podReqName]; !ok {
63-
reqs[podReqName] = podReqValue.DeepCopy()
64-
} else {
65-
value.Add(podReqValue)
66-
reqs[podReqName] = value
55+
if podList.Items != nil {
56+
for _, p := range podList.Items {
57+
pod := p
58+
if IsVirtualPod(&pod) {
59+
continue
6760
}
68-
}
69-
for podLimitName, podLimitValue := range podLimits {
70-
if value, ok := limits[podLimitName]; !ok {
71-
limits[podLimitName] = podLimitValue.DeepCopy()
72-
} else {
73-
value.Add(podLimitValue)
74-
limits[podLimitName] = value
61+
podReqs, podLimits := v1resource.PodRequestsAndLimits(&pod)
62+
for podReqName, podReqValue := range podReqs {
63+
if value, ok := reqs[podReqName]; !ok {
64+
reqs[podReqName] = podReqValue.DeepCopy()
65+
} else {
66+
value.Add(podReqValue)
67+
reqs[podReqName] = value
68+
}
69+
}
70+
for podLimitName, podLimitValue := range podLimits {
71+
if value, ok := limits[podLimitName]; !ok {
72+
limits[podLimitName] = podLimitValue.DeepCopy()
73+
} else {
74+
value.Add(podLimitValue)
75+
limits[podLimitName] = value
76+
}
7577
}
7678
}
7779
}

0 commit comments

Comments
 (0)