Skip to content

Commit

Permalink
fix: make mcs runnable
Browse files Browse the repository at this point in the history
Signed-off-by: wangyizhi1 <[email protected]>
  • Loading branch information
duanmengkk authored and wangyizhi1 committed Oct 27, 2023
1 parent 204fbf7 commit 5418354
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 137 deletions.
22 changes: 12 additions & 10 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs"
"github.com/kosmos.io/kosmos/pkg/scheme"
"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
"github.com/kosmos.io/kosmos/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,7 @@ func run(ctx context.Context, opts *options.Options) error {
config.QPS = opts.KubernetesOptions.QPS
config.Burst = opts.KubernetesOptions.Burst
}

// init root client
rootClient, err := utils.NewClientFromConfigPath(opts.KubernetesOptions.KubeConfig, configOptFunc)
if err != nil {
Expand Down Expand Up @@ -96,25 +98,25 @@ func run(ctx context.Context, opts *options.Options) error {

// add cluster controller
ClusterController := clusterManager.ClusterController{
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
RootClient: rootClient,
EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName),
ConfigOptFunc: configOptFunc,
Options: opts,
Root: mgr.GetClient(),
RootDynamic: dynamicClient,
RootClient: rootClient,
EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName),
Options: opts,
RootResourceManager: rootResourceManager,
}
if err = ClusterController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err)
}

// add serviceExport controller
ServiceExportController := clusterManager.ServiceExportController{
Master: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(clusterManager.ServiceExportControllerName),
ServiceExportController := mcs.ServiceExportController{
RootClient: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName),
Logger: mgr.GetLogger(),
}
if err = ServiceExportController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", clusterManager.ServiceExportControllerName, err)
return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err)
}

GlobalDaemonSetService := &GlobalDaemonSetService{
Expand Down
22 changes: 12 additions & 10 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options"
clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs"
podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod"
kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/scheme"
Expand All @@ -56,13 +57,13 @@ type ClusterController struct {
EventRecorder record.EventRecorder
Logger logr.Logger
Options *options.Options
ConfigOptFunc func(config *rest.Config)

ControllerManagers map[string]*manager.Manager
ManagerCancelFuncs map[string]*context.CancelFunc
ControllerManagersLock sync.Mutex

mgr *manager.Manager
mgr *manager.Manager
RootResourceManager *utils.ResourceManager
}

func isRootCluster(cluster *clusterlinkv1alpha1.Cluster) bool {
Expand Down Expand Up @@ -254,17 +255,18 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste
return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err)
}

serviceImportController := &controllers.ServiceImportController{
Client: mgr.GetClient(),
Master: c.Root,
EventRecorder: mgr.GetEventRecorderFor(controllers.MemberServiceImportControllerName),
Logger: mgr.GetLogger(),
ClusterNodeName: cluster.Name,
KosmosClient: kosmosClient,
serviceImportController := &mcs.ServiceImportController{
LeafClient: mgr.GetClient(),
RootClient: c.Root,
RootKosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName),
Logger: mgr.GetLogger(),
LeafNodeName: cluster.Name,
RootResourceManager: c.RootResourceManager,
}

if err := serviceImportController.AddController(mgr); err != nil {
return fmt.Errorf("error starting %s: %v", controllers.MemberServiceImportControllerName, err)
return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err)
}

// TODO Consider moving up to the same level as cluster-controller, add controllers after mgr is started may cause problems ?
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package clusterManager
package mcs

import (
"context"
Expand Down Expand Up @@ -29,9 +29,9 @@ import (

const ServiceExportControllerName = "service-export-controller"

// ServiceExportController watches serviceExport in master and annotated the endpointSlice
// ServiceExportController watches serviceExport in root cluster and annotated the endpointSlice
type ServiceExportController struct {
Master client.Client
RootClient client.Client
EventRecorder record.EventRecorder
Logger logr.Logger
}
Expand All @@ -43,7 +43,7 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, request reconci
}()

serviceExport := &mcsv1alpha1.ServiceExport{}
if err := c.Master.Get(ctx, request.NamespacedName, serviceExport); err != nil {
if err := c.RootClient.Get(ctx, request.NamespacedName, serviceExport); err != nil {
// The serviceExport no longer exist, in which case we stop processing.
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
Expand Down Expand Up @@ -115,7 +115,7 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export *
},
)
epsList := &discoveryv1.EndpointSliceList{}
err = c.Master.List(ctx, epsList, &client.ListOptions{
err = c.RootClient.List(ctx, epsList, &client.ListOptions{
Namespace: export.Namespace,
LabelSelector: selector,
})
Expand All @@ -132,7 +132,7 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export *
continue
}
helper.RemoveAnnotation(newEps, utils.ServiceExportLabelKey)
err = c.updateEndpointSlice(ctx, newEps, c.Master)
err = c.updateEndpointSlice(ctx, newEps, c.RootClient)
if err != nil {
klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err)
return err
Expand All @@ -142,9 +142,9 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export *
return nil
}

func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, master client.Client) error {
func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, rootClient client.Client) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
updateErr := master.Update(ctx, eps)
updateErr := rootClient.Update(ctx, eps)
if updateErr == nil {
return nil
}
Expand All @@ -154,7 +154,7 @@ func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *
Namespace: eps.Namespace,
Name: eps.Name,
}
getErr := master.Get(ctx, key, newEps)
getErr := rootClient.Get(ctx, key, newEps)
if getErr == nil {
//Make a copy, so we don't mutate the shared cache
eps = newEps.DeepCopy()
Expand All @@ -174,7 +174,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export
},
)
epsList := &discoveryv1.EndpointSliceList{}
err = c.Master.List(ctx, epsList, &client.ListOptions{
err = c.RootClient.List(ctx, epsList, &client.ListOptions{
Namespace: export.Namespace,
LabelSelector: selector,
})
Expand All @@ -191,7 +191,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export
continue
}
helper.AddEndpointSliceAnnotation(newEps, utils.ServiceExportLabelKey, utils.MCSLabelValue)
err = c.updateEndpointSlice(ctx, newEps, c.Master)
err = c.updateEndpointSlice(ctx, newEps, c.RootClient)
if err != nil {
klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err)
return err
Expand Down
Loading

0 comments on commit 5418354

Please sign in to comment.