diff --git a/cmd/clustertree/cluster-manager/app/manager.go b/cmd/clustertree/cluster-manager/app/manager.go index 3e9cd5b90..3d1dd85e5 100644 --- a/cmd/clustertree/cluster-manager/app/manager.go +++ b/cmd/clustertree/cluster-manager/app/manager.go @@ -15,6 +15,7 @@ import ( "github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options" clusterManager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs" "github.com/kosmos.io/kosmos/pkg/scheme" "github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag" "github.com/kosmos.io/kosmos/pkg/utils" @@ -62,6 +63,7 @@ func run(ctx context.Context, opts *options.Options) error { config.QPS = opts.KubernetesOptions.QPS config.Burst = opts.KubernetesOptions.Burst } + // init root client rootClient, err := utils.NewClientFromConfigPath(opts.KubernetesOptions.KubeConfig, configOptFunc) if err != nil { @@ -96,25 +98,25 @@ func run(ctx context.Context, opts *options.Options) error { // add cluster controller ClusterController := clusterManager.ClusterController{ - Root: mgr.GetClient(), - RootDynamic: dynamicClient, - RootClient: rootClient, - EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName), - ConfigOptFunc: configOptFunc, - Options: opts, + Root: mgr.GetClient(), + RootDynamic: dynamicClient, + RootClient: rootClient, + EventRecorder: mgr.GetEventRecorderFor(clusterManager.ControllerName), + Options: opts, + RootResourceManager: rootResourceManager, } if err = ClusterController.SetupWithManager(mgr); err != nil { return fmt.Errorf("error starting %s: %v", clusterManager.ControllerName, err) } // add serviceExport controller - ServiceExportController := clusterManager.ServiceExportController{ - Master: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(clusterManager.ServiceExportControllerName), + ServiceExportController := mcs.ServiceExportController{ + RootClient: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(mcs.ServiceExportControllerName), Logger: mgr.GetLogger(), } if err = ServiceExportController.SetupWithManager(mgr); err != nil { - return fmt.Errorf("error starting %s: %v", clusterManager.ServiceExportControllerName, err) + return fmt.Errorf("error starting %s: %v", mcs.ServiceExportControllerName, err) } GlobalDaemonSetService := &GlobalDaemonSetService{ diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index ddf71e980..0a6649937 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -30,6 +30,7 @@ import ( "github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options" clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers" + "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs" podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod" kosmosversioned "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/scheme" @@ -56,13 +57,13 @@ type ClusterController struct { EventRecorder record.EventRecorder Logger logr.Logger Options *options.Options - ConfigOptFunc func(config *rest.Config) ControllerManagers map[string]*manager.Manager ManagerCancelFuncs map[string]*context.CancelFunc ControllerManagersLock sync.Mutex - mgr *manager.Manager + mgr *manager.Manager + RootResourceManager *utils.ResourceManager } func isRootCluster(cluster *clusterlinkv1alpha1.Cluster) bool { @@ -254,17 +255,18 @@ func (c *ClusterController) setupControllers(m *manager.Manager, cluster *cluste return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err) } - serviceImportController := &controllers.ServiceImportController{ - Client: mgr.GetClient(), - Master: c.Root, - EventRecorder: mgr.GetEventRecorderFor(controllers.MemberServiceImportControllerName), - Logger: mgr.GetLogger(), - ClusterNodeName: cluster.Name, - KosmosClient: kosmosClient, + serviceImportController := &mcs.ServiceImportController{ + LeafClient: mgr.GetClient(), + RootClient: c.Root, + RootKosmosClient: kosmosClient, + EventRecorder: mgr.GetEventRecorderFor(mcs.LeafServiceImportControllerName), + Logger: mgr.GetLogger(), + LeafNodeName: cluster.Name, + RootResourceManager: c.RootResourceManager, } if err := serviceImportController.AddController(mgr); err != nil { - return fmt.Errorf("error starting %s: %v", controllers.MemberServiceImportControllerName, err) + return fmt.Errorf("error starting %s: %v", mcs.LeafServiceImportControllerName, err) } // TODO Consider moving up to the same level as cluster-controller, add controllers after mgr is started may cause problems ? diff --git a/pkg/clustertree/cluster-manager/serviceexport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go similarity index 90% rename from pkg/clustertree/cluster-manager/serviceexport_controller.go rename to pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go index 4b4b262f9..9c522122c 100644 --- a/pkg/clustertree/cluster-manager/serviceexport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceexport_controller.go @@ -1,4 +1,4 @@ -package clusterManager +package mcs import ( "context" @@ -29,9 +29,9 @@ import ( const ServiceExportControllerName = "service-export-controller" -// ServiceExportController watches serviceExport in master and annotated the endpointSlice +// ServiceExportController watches serviceExport in root cluster and annotated the endpointSlice type ServiceExportController struct { - Master client.Client + RootClient client.Client EventRecorder record.EventRecorder Logger logr.Logger } @@ -43,7 +43,7 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, request reconci }() serviceExport := &mcsv1alpha1.ServiceExport{} - if err := c.Master.Get(ctx, request.NamespacedName, serviceExport); err != nil { + if err := c.RootClient.Get(ctx, request.NamespacedName, serviceExport); err != nil { // The serviceExport no longer exist, in which case we stop processing. if apierrors.IsNotFound(err) { return controllerruntime.Result{}, nil @@ -115,7 +115,7 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export * }, ) epsList := &discoveryv1.EndpointSliceList{} - err = c.Master.List(ctx, epsList, &client.ListOptions{ + err = c.RootClient.List(ctx, epsList, &client.ListOptions{ Namespace: export.Namespace, LabelSelector: selector, }) @@ -132,7 +132,7 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export * continue } helper.RemoveAnnotation(newEps, utils.ServiceExportLabelKey) - err = c.updateEndpointSlice(ctx, newEps, c.Master) + err = c.updateEndpointSlice(ctx, newEps, c.RootClient) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err) return err @@ -142,9 +142,9 @@ func (c *ServiceExportController) removeAnnotation(ctx context.Context, export * return nil } -func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, master client.Client) error { +func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps *discoveryv1.EndpointSlice, rootClient client.Client) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := master.Update(ctx, eps) + updateErr := rootClient.Update(ctx, eps) if updateErr == nil { return nil } @@ -154,7 +154,7 @@ func (c *ServiceExportController) updateEndpointSlice(ctx context.Context, eps * Namespace: eps.Namespace, Name: eps.Name, } - getErr := master.Get(ctx, key, newEps) + getErr := rootClient.Get(ctx, key, newEps) if getErr == nil { //Make a copy, so we don't mutate the shared cache eps = newEps.DeepCopy() @@ -174,7 +174,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export }, ) epsList := &discoveryv1.EndpointSliceList{} - err = c.Master.List(ctx, epsList, &client.ListOptions{ + err = c.RootClient.List(ctx, epsList, &client.ListOptions{ Namespace: export.Namespace, LabelSelector: selector, }) @@ -191,7 +191,7 @@ func (c *ServiceExportController) syncServiceExport(ctx context.Context, export continue } helper.AddEndpointSliceAnnotation(newEps, utils.ServiceExportLabelKey, utils.MCSLabelValue) - err = c.updateEndpointSlice(ctx, newEps, c.Master) + err = c.updateEndpointSlice(ctx, newEps, c.RootClient) if err != nil { klog.Errorf("Update endpointSlice (%s/%s) failed, Error: %v", export.Namespace, newEps.Name, err) return err diff --git a/pkg/clustertree/cluster-manager/controllers/serviceimport_controller.go b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go similarity index 76% rename from pkg/clustertree/cluster-manager/controllers/serviceimport_controller.go rename to pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go index 07156e180..d9c69b95a 100644 --- a/pkg/clustertree/cluster-manager/controllers/serviceimport_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/mcs/serviceimport_controller.go @@ -1,4 +1,4 @@ -package controllers +package mcs import ( "context" @@ -28,36 +28,37 @@ import ( "github.com/kosmos.io/kosmos/pkg/utils/keys" ) -const MemberServiceImportControllerName = "member-service-import-controller" +const LeafServiceImportControllerName = "leaf-service-import-controller" -// ServiceImportController watches serviceImport in member node and sync service and endpointSlice in master +// ServiceImportController watches serviceImport in leaf node and sync service and endpointSlice in root cluster type ServiceImportController struct { - Client client.Client - Master client.Client - KosmosClient kosmosversioned.Interface - ClusterNodeName string - EventRecorder record.EventRecorder - Logger logr.Logger - processor utils.AsyncWorker - masterResourceManager *utils.ResourceManager - ctx context.Context + LeafClient client.Client + // TODO should not use client.Client for root cluster,because it will create a new informerFactory + RootClient client.Client + RootKosmosClient kosmosversioned.Interface + LeafNodeName string + EventRecorder record.EventRecorder + Logger logr.Logger + processor utils.AsyncWorker + RootResourceManager *utils.ResourceManager + ctx context.Context } func (c *ServiceImportController) AddController(mgr manager.Manager) error { if err := mgr.Add(c); err != nil { - klog.Errorf("Unable to create %s Error: %v", MemberServiceImportControllerName, err) + klog.Errorf("Unable to create %s Error: %v", LeafServiceImportControllerName, err) } return nil } func (c *ServiceImportController) Start(ctx context.Context) error { - klog.Infof("Starting %s", MemberServiceImportControllerName) - defer klog.Infof("Stop %s as process done.", MemberServiceImportControllerName) + klog.Infof("Starting %s", LeafServiceImportControllerName) + defer klog.Infof("Stop %s as process done.", LeafServiceImportControllerName) opt := utils.Options{ - Name: MemberServiceImportControllerName, + Name: LeafServiceImportControllerName, KeyFunc: func(obj interface{}) (utils.QueueKey, error) { - // 不关心队列中的GVK + // Don't care about the GVK in the queue return keys.NamespaceWideKeyFunc(obj) }, ReconcileFunc: c.Reconcile, @@ -65,7 +66,7 @@ func (c *ServiceImportController) Start(ctx context.Context) error { c.processor = utils.NewAsyncWorker(opt) c.ctx = ctx - serviceImportInformerFactory := externalversions.NewSharedInformerFactory(c.KosmosClient, 0) + serviceImportInformerFactory := externalversions.NewSharedInformerFactory(c.RootKosmosClient, 0) serviceImportInformer := serviceImportInformerFactory.Multicluster().V1alpha1().ServiceImports() _, err := serviceImportInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.OnAdd, @@ -76,7 +77,7 @@ func (c *ServiceImportController) Start(ctx context.Context) error { return err } - _, err = c.masterResourceManager.EndpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err = c.RootResourceManager.EndpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.OnEpsAdd, UpdateFunc: c.OnEpsUpdate, DeleteFunc: c.OnEpsDelete, @@ -100,18 +101,18 @@ func (c *ServiceImportController) Reconcile(key utils.QueueKey) error { klog.Error("invalid key") return fmt.Errorf("invalid key") } - klog.V(4).Infof("============ %s starts to reconcile %s in cluster %s ============", MemberServiceImportControllerName, clusterWideKey.NamespaceKey(), c.ClusterNodeName) + klog.V(4).Infof("============ %s starts to reconcile %s in cluster %s ============", LeafServiceImportControllerName, clusterWideKey.NamespaceKey(), c.LeafNodeName) defer func() { - klog.V(4).Infof("============ %s has been reconciled in cluster %s =============", clusterWideKey.NamespaceKey(), c.ClusterNodeName) + klog.V(4).Infof("============ %s has been reconciled in cluster %s =============", clusterWideKey.NamespaceKey(), c.LeafNodeName) }() serviceImport := &mcsv1alpha1.ServiceImport{} - if err := c.Client.Get(c.ctx, types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil { + if err := c.LeafClient.Get(c.ctx, types.NamespacedName{Namespace: clusterWideKey.Namespace, Name: clusterWideKey.Name}, serviceImport); err != nil { // The serviceImport no longer exist, in which case we stop processing. if apierrors.IsNotFound(err) { return nil } - klog.Errorf("Get %s in cluster %s failed, Error: %v", clusterWideKey.NamespaceKey(), c.ClusterNodeName, err) + klog.Errorf("Get %s in cluster %s failed, Error: %v", clusterWideKey.NamespaceKey(), c.LeafNodeName, err) return err } @@ -132,12 +133,12 @@ func (c *ServiceImportController) Reconcile(key utils.QueueKey) error { func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Context, namespace, name string) error { service := &corev1.Service{} - if err := c.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil { + if err := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, service); err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name) return nil } - klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.ClusterNodeName, err) + klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.LeafNodeName, err) return err } @@ -146,17 +147,17 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con return nil } - if err := c.Client.Delete(ctx, service); err != nil { + if err := c.LeafClient.Delete(ctx, service); err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name) return nil } - klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.ClusterNodeName, err) + klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.LeafNodeName, err) return err } endpointSlice := &discoveryv1.EndpointSlice{} - err := c.Client.DeleteAllOf(ctx, endpointSlice, &client.DeleteAllOfOptions{ + err := c.LeafClient.DeleteAllOf(ctx, endpointSlice, &client.DeleteAllOfOptions{ ListOptions: client.ListOptions{ Namespace: namespace, LabelSelector: labels.SelectorFromSet(map[string]string{ @@ -169,30 +170,30 @@ func (c *ServiceImportController) cleanupServiceAndEndpointSlice(ctx context.Con klog.V(4).Infof("ServiceImport %s/%s is deleting and Service %s/%s is not found, ignore it", namespace, name, namespace, name) return nil } - klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.ClusterNodeName, err) + klog.Errorf("ServiceImport %s/%s is deleting but clean up service in cluster %s failed, Error: %v", namespace, name, c.LeafNodeName, err) return err } return nil } func (c *ServiceImportController) syncServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport) error { - masterService := &corev1.Service{} - if err := c.Master.Get(ctx, types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}, masterService); err != nil { + rootService := &corev1.Service{} + if err := c.RootClient.Get(ctx, types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}, rootService); err != nil { if apierrors.IsNotFound(err) { - klog.V(4).Infof("Service %s/%s is not found in master, ignore it", serviceImport.Namespace, serviceImport.Name) + klog.V(4).Infof("Service %s/%s is not found in root cluster, ignore it", serviceImport.Namespace, serviceImport.Name) return nil } - klog.Errorf("Get Service %s/%s failed from master", serviceImport.Namespace, serviceImport.Name, err) + klog.Errorf("Get Service %s/%s failed from root cluster", serviceImport.Namespace, serviceImport.Name, err) return err } - if err := c.importServiceHandler(ctx, masterService, serviceImport); err != nil { - klog.Errorf("Create or update service %s/%s in client cluster %s failed, error: %v", serviceImport.Namespace, serviceImport.Name, c.ClusterNodeName, err) + if err := c.importServiceHandler(ctx, rootService, serviceImport); err != nil { + klog.Errorf("Create or update service %s/%s in client cluster %s failed, error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } epsList := &discoveryv1.EndpointSliceList{} - err := c.Master.List(ctx, epsList, &client.ListOptions{ + err := c.RootClient.List(ctx, epsList, &client.ListOptions{ LabelSelector: labels.SelectorFromSet(map[string]string{utils.ServiceKey: serviceImport.Name}), Namespace: serviceImport.Namespace, }) @@ -220,7 +221,7 @@ func (c *ServiceImportController) syncServiceImport(ctx context.Context, service addressString := strings.Join(addresses, ",") helper.AddServiceImportAnnotation(serviceImport, utils.ServiceEndpointsKey, addressString) if err = c.updateServiceImport(ctx, serviceImport, addressString); err != nil { - klog.Errorf("Update serviceImport (%s/%s) annotation in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.ClusterNodeName, err) + klog.Errorf("Update serviceImport (%s/%s) annotation in cluster %s failed, Error: %v", serviceImport.Namespace, serviceImport.Name, c.LeafNodeName, err) return err } @@ -241,16 +242,16 @@ func (c *ServiceImportController) importEndpointSliceHandler(ctx context.Context func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice, serviceName string) error { newSlice := retainEndpointSlice(endpointSlice, serviceName) - if err := c.Client.Create(ctx, endpointSlice); err != nil { + if err := c.LeafClient.Create(ctx, newSlice); err != nil { if apierrors.IsAlreadyExists(err) { err = c.updateEndpointSlice(ctx, newSlice) if err != nil { - klog.Errorf("Update endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.ClusterNodeName, err) + klog.Errorf("Update endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.LeafNodeName, err) return err } return nil } - klog.Errorf("Create endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.ClusterNodeName, err) + klog.Errorf("Create endpointSlice(%s/%s) in cluster %s failed, Error: %v", newSlice.Namespace, newSlice.Name, c.LeafNodeName, err) return err } return nil @@ -259,18 +260,18 @@ func (c *ServiceImportController) createOrUpdateEndpointSliceInClient(ctx contex func (c *ServiceImportController) updateEndpointSlice(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) error { newEps := endpointSlice.DeepCopy() return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.Client.Update(ctx, newEps) + updateErr := c.LeafClient.Update(ctx, newEps) if updateErr == nil { return nil } updated := &discoveryv1.EndpointSlice{} - getErr := c.Client.Get(ctx, types.NamespacedName{Namespace: newEps.Namespace, Name: newEps.Name}, updated) + getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newEps.Namespace, Name: newEps.Name}, updated) if getErr == nil { //Make a copy, so we don't mutate the shared cache newEps = updated.DeepCopy() } else { - klog.Errorf("Failed to get updated endpointSlice %s/%s in cluster %s: %v", endpointSlice.Namespace, endpointSlice.Name, c.ClusterNodeName, getErr) + klog.Errorf("Failed to get updated endpointSlice %s/%s in cluster %s: %v", endpointSlice.Namespace, endpointSlice.Name, c.LeafNodeName, getErr) } return updateErr @@ -312,8 +313,8 @@ func clearEndpointSlice(slice *discoveryv1.EndpointSlice, disconnectedAddress [] slice.Endpoints = newEndpoints } -func (c *ServiceImportController) importServiceHandler(ctx context.Context, masterService *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) error { - clientService := generateService(masterService, serviceImport) +func (c *ServiceImportController) importServiceHandler(ctx context.Context, rootService *corev1.Service, serviceImport *mcsv1alpha1.ServiceImport) error { + clientService := generateService(rootService, serviceImport) err := c.createOrUpdateServiceInClient(ctx, clientService) if err != nil { return err @@ -323,24 +324,24 @@ func (c *ServiceImportController) importServiceHandler(ctx context.Context, mast func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Context, service *corev1.Service) error { oldService := &corev1.Service{} - if err := c.Client.Get(ctx, types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, oldService); err != nil { + if err := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, oldService); err != nil { if apierrors.IsNotFound(err) { - if err = c.Client.Create(ctx, service); err != nil { - klog.Errorf("Create serviceImport service(%s/%s) in client cluster %s failed, Error: %v", service.Namespace, service.Name, c.ClusterNodeName, err) + if err = c.LeafClient.Create(ctx, service); err != nil { + klog.Errorf("Create serviceImport service(%s/%s) in client cluster %s failed, Error: %v", service.Namespace, service.Name, c.LeafNodeName, err) return err } else { return nil } } - klog.Errorf("Get service(%s/%s) from in cluster %s failed, Error: %v", service.Namespace, service.Name, c.ClusterNodeName, err) + klog.Errorf("Get service(%s/%s) from in cluster %s failed, Error: %v", service.Namespace, service.Name, c.LeafNodeName, err) return err } retainServiceFields(oldService, service) - if err := c.Client.Update(ctx, service); err != nil { + if err := c.LeafClient.Update(ctx, service); err != nil { if err != nil { - klog.Errorf("Update serviceImport service(%s/%s) in cluster %s failed, Error: %v", service.Namespace, service.Name, c.ClusterNodeName, err) + klog.Errorf("Update serviceImport service(%s/%s) in cluster %s failed, Error: %v", service.Namespace, service.Name, c.LeafNodeName, err) return err } } @@ -350,18 +351,18 @@ func (c *ServiceImportController) createOrUpdateServiceInClient(ctx context.Cont func (c *ServiceImportController) updateServiceImport(ctx context.Context, serviceImport *mcsv1alpha1.ServiceImport, addresses string) error { newImport := serviceImport.DeepCopy() return retry.RetryOnConflict(retry.DefaultRetry, func() error { - updateErr := c.Client.Update(ctx, newImport) + updateErr := c.LeafClient.Update(ctx, newImport) if updateErr == nil { return nil } updated := &mcsv1alpha1.ServiceImport{} - getErr := c.Client.Get(ctx, types.NamespacedName{Namespace: newImport.Namespace, Name: newImport.Name}, updated) + getErr := c.LeafClient.Get(ctx, types.NamespacedName{Namespace: newImport.Namespace, Name: newImport.Name}, updated) if getErr == nil { // Make a copy, so we don't mutate the shared cache newImport = updated.DeepCopy() helper.AddServiceImportAnnotation(newImport, utils.ServiceEndpointsKey, addresses) } else { - klog.Errorf("Failed to get updated serviceImport %s/%s in cluster %s,Error : %v", newImport.Namespace, serviceImport.Name, c.ClusterNodeName, getErr) + klog.Errorf("Failed to get updated serviceImport %s/%s in cluster %s,Error : %v", newImport.Namespace, serviceImport.Name, c.LeafNodeName, getErr) } return updateErr }) @@ -394,7 +395,7 @@ func (c *ServiceImportController) OnDelete(obj interface{}) { func (c *ServiceImportController) OnEpsAdd(obj interface{}) { eps := obj.(*discoveryv1.EndpointSlice) if helper.HasAnnotation(eps.ObjectMeta, utils.ServiceExportLabelKey) { - serviceExportName, _ := helper.GetAnnotationValue(eps.ObjectMeta, utils.ServiceKey) + serviceExportName := helper.GetLabelOrAnnotationValue(eps.GetLabels(), utils.ServiceKey) key := keys.ClusterWideKey{} key.Namespace = eps.Namespace key.Name = serviceExportName @@ -407,7 +408,7 @@ func (c *ServiceImportController) OnEpsUpdate(old interface{}, new interface{}) oldSlice := old.(*discoveryv1.EndpointSlice) isRemoveAnnotationEvent := helper.HasAnnotation(oldSlice.ObjectMeta, utils.ServiceExportLabelKey) && !helper.HasAnnotation(newSlice.ObjectMeta, utils.ServiceExportLabelKey) if helper.HasAnnotation(newSlice.ObjectMeta, utils.ServiceExportLabelKey) || isRemoveAnnotationEvent { - serviceExportName, _ := helper.GetAnnotationValue(newSlice.ObjectMeta, utils.ServiceKey) + serviceExportName := helper.GetLabelOrAnnotationValue(newSlice.GetLabels(), utils.ServiceKey) key := keys.ClusterWideKey{} key.Namespace = newSlice.Namespace key.Name = serviceExportName @@ -418,7 +419,7 @@ func (c *ServiceImportController) OnEpsUpdate(old interface{}, new interface{}) func (c *ServiceImportController) OnEpsDelete(obj interface{}) { eps := obj.(*discoveryv1.EndpointSlice) if helper.HasAnnotation(eps.ObjectMeta, utils.ServiceExportLabelKey) { - serviceExportName, _ := helper.GetAnnotationValue(eps.ObjectMeta, utils.ServiceKey) + serviceExportName := helper.GetLabelOrAnnotationValue(eps.GetLabels(), utils.ServiceKey) key := keys.ClusterWideKey{} key.Namespace = eps.Namespace key.Name = serviceExportName diff --git a/pkg/utils/k8s.go b/pkg/utils/k8s.go index fcc35cc62..15a069e21 100644 --- a/pkg/utils/k8s.go +++ b/pkg/utils/k8s.go @@ -3,9 +3,6 @@ package utils import ( "encoding/json" "fmt" - "os" - "os/signal" - "syscall" jsonpatch "github.com/evanphx/json-patch" jsonpatch1 "github.com/mattbaird/jsonpatch" @@ -63,24 +60,6 @@ func CreateJSONPatch(original, new interface{}) ([]byte, error) { return patchBytes, nil } -var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} -var onlyOneSignalHandler = make(chan struct{}) -var shutdownHandler chan os.Signal - -func SetupSignalHandler() <-chan struct{} { - close(onlyOneSignalHandler) // panics when called twice - shutdownHandler = make(chan os.Signal, 2) - stop := make(chan struct{}) - signal.Notify(shutdownHandler, shutdownSignals...) - go func() { - <-shutdownHandler - close(stop) - <-shutdownHandler - os.Exit(1) // second signal. Exit directly. - }() - return stop -} - type Opts func(*rest.Config) func NewClient(configPath string, opts ...Opts) (kubernetes.Interface, error) { @@ -105,7 +84,7 @@ func NewClient(configPath string, opts ...Opts) (kubernetes.Interface, error) { client, err := kubernetes.NewForConfig(config) if err != nil { - return nil, fmt.Errorf("could not create client for master cluster: %v", err) + return nil, fmt.Errorf("could not create client for root cluster: %v", err) } return client, nil } @@ -134,7 +113,7 @@ func NewClientFromByte(kubeConfig []byte, opts ...Opts) (kubernetes.Interface, e client, err := kubernetes.NewForConfig(config) if err != nil { - return nil, fmt.Errorf("could not create client for master cluster: %v", err) + return nil, fmt.Errorf("could not create client for root cluster: %v", err) } return client, nil } @@ -292,7 +271,7 @@ func NewMetricClient(configPath string, opts ...Opts) (versioned.Interface, erro metricClient, err := versioned.NewForConfig(config) if err != nil { - return nil, fmt.Errorf("could not create client for master cluster: %v", err) + return nil, fmt.Errorf("could not create client for root cluster: %v", err) } return metricClient, nil } @@ -321,7 +300,7 @@ func NewMetricClientFromByte(kubeConfig []byte, opts ...Opts) (versioned.Interfa metricClient, err := versioned.NewForConfig(config) if err != nil { - return nil, fmt.Errorf("could not create client for master cluster: %v", err) + return nil, fmt.Errorf("could not create client for root cluster: %v", err) } return metricClient, nil } diff --git a/pkg/utils/node_client.go b/pkg/utils/node_client.go index 1e87c9352..4460aa8de 100644 --- a/pkg/utils/node_client.go +++ b/pkg/utils/node_client.go @@ -29,12 +29,7 @@ type ClusterDynamicClient struct { } // NewClusterKubeClient create a kube client for a member cluster -func NewClusterKubeClient(client client.Client, ClusterName string, opts Opts) (*ClusterKubeClient, error) { - config, err := buildConfig(client, ClusterName, opts) - if err != nil { - return nil, err - } - +func NewClusterKubeClient(config *rest.Config, ClusterName string, opts Opts) (*ClusterKubeClient, error) { kubeClient, err := kubernetes.NewForConfig(config) if err != nil { return nil, err @@ -47,12 +42,7 @@ func NewClusterKubeClient(client client.Client, ClusterName string, opts Opts) ( } // NewClusterKosmosClient create a dynamic client for a member cluster -func NewClusterKosmosClient(client client.Client, ClusterName string, opts Opts) (*ClusterKosmosClient, error) { - config, err := buildConfig(client, ClusterName, opts) - if err != nil { - return nil, err - } - +func NewClusterKosmosClient(config *rest.Config, ClusterName string, opts Opts) (*ClusterKosmosClient, error) { kosmosClient, err := kosmosversioned.NewForConfig(config) if err != nil { return nil, err @@ -65,12 +55,7 @@ func NewClusterKosmosClient(client client.Client, ClusterName string, opts Opts) } // NewClusterDynamicClient create a kosmos crd client for a member cluster -func NewClusterDynamicClient(client client.Client, ClusterName string, opts Opts) (*ClusterDynamicClient, error) { - config, err := buildConfig(client, ClusterName, opts) - if err != nil { - return nil, err - } - +func NewClusterDynamicClient(config *rest.Config, ClusterName string, opts Opts) (*ClusterDynamicClient, error) { dynamicClient, err := dynamic.NewForConfig(config) if err != nil { return nil, err @@ -82,12 +67,7 @@ func NewClusterDynamicClient(client client.Client, ClusterName string, opts Opts }, nil } -func buildConfig(client client.Client, ClusterName string, opts Opts) (*rest.Config, error) { - cluster, err := GetCluster(client, ClusterName) - if err != nil { - return nil, err - } - +func BuildConfig(cluster *kosmosv1alpha1.Cluster, opts Opts) (*rest.Config, error) { config, err := NewConfigFromBytes(cluster.Spec.Kubeconfig, opts) if err != nil { return nil, err