Skip to content

Commit 94535ca

Browse files
authored
Merge pull request kosmos-io#178 from wangyizhi1/main
fix: clustertree bugs
2 parents 4e857b9 + c5e2bb5 commit 94535ca

File tree

10 files changed

+148
-91
lines changed

10 files changed

+148
-91
lines changed

cmd/clustertree/cluster-manager/app/manager.go

+20-15
Original file line numberDiff line numberDiff line change
@@ -117,30 +117,35 @@ func run(ctx context.Context, opts *options.Options) error {
117117
return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err)
118118
}
119119

120-
// add serviceExport controller
121-
ServiceExportController := mcs.ServiceExportController{
122-
RootClient: mgr.GetClient(),
123-
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
124-
Logger: mgr.GetLogger(),
125-
}
126-
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
127-
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
120+
if opts.MultiClusterService {
121+
// add serviceExport controller
122+
ServiceExportController := mcs.ServiceExportController{
123+
RootClient: mgr.GetClient(),
124+
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
125+
Logger: mgr.GetLogger(),
126+
}
127+
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
128+
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
129+
}
128130
}
129131

130-
GlobalDaemonSetService := &GlobalDaemonSetService{
131-
opts: opts,
132-
ctx: ctx,
133-
defaultWorkNum: 1,
134-
}
135-
if err = GlobalDaemonSetService.SetupWithManager(mgr); err != nil {
136-
return fmt.Errorf("error starting global daemonset : %v", err)
132+
if opts.DaemonSetController {
133+
daemonSetController := &GlobalDaemonSetService{
134+
opts: opts,
135+
ctx: ctx,
136+
defaultWorkNum: 1,
137+
}
138+
if err = daemonSetController.SetupWithManager(mgr); err != nil {
139+
return fmt.Errorf("error starting global daemonset : %v", err)
140+
}
137141
}
138142

139143
// init rootPodController
140144
RootPodReconciler := podcontrollers.RootPodReconciler{
141145
GlobalLeafManager: globalleafManager,
142146
RootClient: mgr.GetClient(),
143147
DynamicRootClient: dynamicClient,
148+
Options: opts,
144149
}
145150
if err := RootPodReconciler.SetupWithManager(mgr); err != nil {
146151
return fmt.Errorf("error starting RootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)

cmd/clustertree/cluster-manager/app/options/options.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ const (
1414
)
1515

1616
type Options struct {
17-
LeaderElection componentbaseconfig.LeaderElectionConfiguration
18-
KubernetesOptions KubernetesOptions
19-
ListenPort int32
17+
LeaderElection componentbaseconfig.LeaderElectionConfiguration
18+
KubernetesOptions KubernetesOptions
19+
ListenPort int32
20+
DaemonSetController bool
21+
MultiClusterService bool
2022
}
2123

2224
type KubernetesOptions struct {
@@ -43,4 +45,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
4345
flags.StringVar(&o.KubernetesOptions.KubeConfig, "kubeconfig", "", "Path for kubernetes kubeconfig file, if left blank, will use in cluster way.")
4446
flags.StringVar(&o.KubernetesOptions.Master, "master", "", "Used to generate kubeconfig for downloading, if not specified, will use host in kubeconfig.")
4547
flags.Int32Var(&o.ListenPort, "listen-port", 10250, "Listen port for requests from the kube-apiserver.")
48+
flags.BoolVar(&o.DaemonSetController, "daemonset-controller", false, "Turn on or off daemonset controller.")
49+
flags.BoolVar(&o.MultiClusterService, "multi-cluster-service", false, "Turn on or off mcs support.")
4650
}

pkg/clustertree/cluster-manager/cluster_controller.go

+26-28
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ type ClusterController struct {
6666
ControllerManagersLock sync.Mutex
6767

6868
RootResourceManager *utils.ResourceManager
69-
mgr manager.Manager
7069

7170
GlobalLeafManager leafUtils.LeafResourceManager
7271
}
@@ -116,9 +115,6 @@ func (c *ClusterController) SetupWithManager(mgr manager.Manager) error {
116115
c.ManagerCancelFuncs = make(map[string]*context.CancelFunc)
117116
c.ControllerManagers = make(map[string]manager.Manager)
118117
c.Logger = mgr.GetLogger()
119-
120-
// TODO this may not be a good idea
121-
c.mgr = mgr
122118
return controllerruntime.NewControllerManagedBy(mgr).
123119
Named(ControllerName).
124120
WithOptions(controller.Options{}).
@@ -246,12 +242,13 @@ func (c *ClusterController) clearClusterControllers(cluster *clusterlinkv1alpha1
246242
}
247243

248244
func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clusterlinkv1alpha1.Cluster, node *corev1.Node, clientDynamic *dynamic.DynamicClient, leafClient kubernetes.Interface, kosmosClient kosmosversioned.Interface) error {
249-
c.GlobalLeafManager.AddLeafResource(cluster.Name, &leafUtils.LeafResource{
245+
nodeName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name)
246+
c.GlobalLeafManager.AddLeafResource(nodeName, &leafUtils.LeafResource{
250247
Client: mgr.GetClient(),
251248
DynamicClient: clientDynamic,
252249
Clientset: leafClient,
253-
NodeName: cluster.Name,
254-
Namespace: cluster.Spec.Namespace,
250+
NodeName: nodeName,
251+
Namespace: "",
255252
IgnoreLabels: strings.Split("", ","),
256253
EnableServiceAccount: true,
257254
})
@@ -271,23 +268,24 @@ func (c *ClusterController) setupControllers(mgr manager.Manager, cluster *clust
271268
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
272269
}
273270

274-
serviceImportController := &mcs.ServiceImportController{
275-
LeafClient: mgr.GetClient(),
276-
RootClient: c.Root,
277-
RootKosmosClient: kosmosClient,
278-
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
279-
Logger: mgr.GetLogger(),
280-
LeafNodeName: cluster.Name,
281-
RootResourceManager: c.RootResourceManager,
282-
}
283-
284-
if err := serviceImportController.AddController(mgr); err != nil {
285-
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
271+
if c.Options.MultiClusterService {
272+
serviceImportController := &mcs.ServiceImportController{
273+
LeafClient: mgr.GetClient(),
274+
RootClient: c.Root,
275+
RootKosmosClient: kosmosClient,
276+
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
277+
Logger: mgr.GetLogger(),
278+
LeafNodeName: nodeName,
279+
RootResourceManager: c.RootResourceManager,
280+
}
281+
if err := serviceImportController.AddController(mgr); err != nil {
282+
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
283+
}
286284
}
287285

288286
leafPodController := podcontrollers.LeafPodReconciler{
289287
RootClient: c.Root,
290-
Namespace: cluster.Spec.Namespace,
288+
Namespace: "",
291289
}
292290

293291
if err := leafPodController.SetupWithManager(mgr); err != nil {
@@ -313,34 +311,34 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, node *c
313311
return fmt.Errorf("error starting leaf pvc controller %v", err)
314312
}
315313

316-
leafPVontroller := pv.LeafPVController{
314+
leafPVController := pv.LeafPVController{
317315
LeafClient: mgr.GetClient(),
318316
RootClient: c.Root,
319317
RootClientSet: c.RootClient,
320318
NodeName: node.Name,
321319
}
322-
if err := leafPVontroller.SetupWithManager(mgr); err != nil {
320+
if err := leafPVController.SetupWithManager(mgr); err != nil {
323321
return fmt.Errorf("error starting leaf pv controller %v", err)
324322
}
325-
326323
return nil
327324
}
328325

329326
func (c *ClusterController) createNode(ctx context.Context, cluster *clusterlinkv1alpha1.Cluster, leafClient kubernetes.Interface) (*corev1.Node, error) {
327+
nodeName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name)
330328
serverVersion, err := leafClient.Discovery().ServerVersion()
331329
if err != nil {
332-
klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name)
330+
klog.Errorf("create node failed, can not connect to leaf %s", nodeName)
333331
return nil, err
334332
}
335333

336-
node, err := c.RootClient.CoreV1().Nodes().Get(ctx, cluster.Name, metav1.GetOptions{})
334+
node, err := c.RootClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
337335
if err != nil && !errors.IsNotFound(err) {
338-
klog.Errorf("create node failed, can not get node %s", cluster.Name)
336+
klog.Errorf("create node failed, can not get node %s", nodeName)
339337
return nil, err
340338
}
341339

342340
if err != nil && errors.IsNotFound(err) {
343-
node = utils.BuildNodeTemplate(cluster)
341+
node = utils.BuildNodeTemplate(nodeName)
344342
node.Status.NodeInfo.KubeletVersion = serverVersion.GitVersion
345343
node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{
346344
KubeletEndpoint: corev1.DaemonEndpoint{
@@ -349,7 +347,7 @@ func (c *ClusterController) createNode(ctx context.Context, cluster *clusterlink
349347
}
350348
node, err = c.RootClient.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
351349
if err != nil && !errors.IsAlreadyExists(err) {
352-
klog.Errorf("create node %s failed, err: %v", cluster.Name, err)
350+
klog.Errorf("create node %s failed, err: %v", nodeName, err)
353351
return nil, err
354352
}
355353
}

pkg/clustertree/cluster-manager/controllers/pod/leaf_pod_controller.go

+21-9
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/pkg/errors"
99
corev1 "k8s.io/api/core/v1"
1010
apierrors "k8s.io/apimachinery/pkg/api/errors"
11-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1211
"k8s.io/klog/v2"
1312
ctrl "sigs.k8s.io/controller-runtime"
1413
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -74,11 +73,22 @@ func (dopt *rootDeleteOption) ApplyToDelete(opt *client.DeleteOptions) {
7473
}
7574

7675
func NewRootDeleteOption(pod *corev1.Pod) client.DeleteOption {
77-
gracePeriodSeconds := pod.DeletionGracePeriodSeconds
76+
// TODO
77+
//gracePeriodSeconds := pod.DeletionGracePeriodSeconds
78+
//
79+
//current := metav1.NewTime(time.Now())
80+
//if pod.DeletionTimestamp.Before(&current) {
81+
// gracePeriodSeconds = new(int64)
82+
//}
83+
return &rootDeleteOption{
84+
GracePeriodSeconds: new(int64),
85+
}
86+
}
7887

79-
current := metav1.NewTime(time.Now())
80-
if pod.DeletionTimestamp.Before(&current) {
81-
gracePeriodSeconds = new(int64)
88+
func NewLeafDeleteOption(pod *corev1.Pod) client.DeleteOption {
89+
gracePeriodSeconds := new(int64)
90+
if pod.DeletionGracePeriodSeconds != nil {
91+
gracePeriodSeconds = pod.DeletionGracePeriodSeconds
8292
}
8393

8494
return &rootDeleteOption{
@@ -124,7 +134,9 @@ func (r *LeafPodReconciler) SetupWithManager(mgr manager.Manager) error {
124134
if len(r.Namespace) > 0 && r.Namespace != obj.GetNamespace() {
125135
return false
126136
}
127-
return true
137+
138+
p := obj.(*corev1.Pod)
139+
return podutils.IsKosmosPod(p)
128140
}
129141

130142
return ctrl.NewControllerManagedBy(mgr).
@@ -133,14 +145,14 @@ func (r *LeafPodReconciler) SetupWithManager(mgr manager.Manager) error {
133145
For(&corev1.Pod{}, builder.WithPredicates(predicate.Funcs{
134146
CreateFunc: func(createEvent event.CreateEvent) bool {
135147
// ignore create event
136-
return false
148+
return skipFunc(createEvent.Object)
137149
},
138150
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
151+
pod1 := updateEvent.ObjectOld.(*corev1.Pod)
152+
pod2 := updateEvent.ObjectNew.(*corev1.Pod)
139153
if !skipFunc(updateEvent.ObjectNew) {
140154
return false
141155
}
142-
pod1 := updateEvent.ObjectOld.(*corev1.Pod)
143-
pod2 := updateEvent.ObjectNew.(*corev1.Pod)
144156
return !cmp.Equal(pod1.Status, pod2.Status)
145157
},
146158
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {

0 commit comments

Comments
 (0)