From 71f742c27947891dda5ac2ae578d77f1d8c8d7a9 Mon Sep 17 00:00:00 2001 From: wangdepeng Date: Thu, 22 Aug 2024 20:01:42 +0800 Subject: [PATCH] feature: fix drain virtual cluster node error and fix api-server-external-service update failed Signed-off-by: wangdepeng --- .../global_node_controller.go | 10 ++++- .../workflow/task/task.go | 15 +++---- .../virtualcluster_init_controller.go | 9 ---- pkg/kubenest/controlplane/endpoint.go | 31 ++++--------- pkg/kubenest/init.go | 2 +- pkg/kubenest/tasks/anp.go | 2 +- pkg/kubenest/util/helper.go | 45 +++++++++++++++++++ pkg/kubenest/util/node.go | 3 +- pkg/kubenest/util/util.go | 9 ---- 9 files changed, 72 insertions(+), 54 deletions(-) diff --git a/pkg/kubenest/controller/global.node.controller/global_node_controller.go b/pkg/kubenest/controller/global.node.controller/global_node_controller.go index 855e3b51f..5932ac752 100644 --- a/pkg/kubenest/controller/global.node.controller/global_node_controller.go +++ b/pkg/kubenest/controller/global.node.controller/global_node_controller.go @@ -79,6 +79,14 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error { return false }, })). + Watches(&source.Kind{Type: &v1alpha1.GlobalNode{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request { + gn := a.(*v1alpha1.GlobalNode) + return []reconcile.Request{ + {NamespacedName: types.NamespacedName{ + Name: gn.Name, + }}, + } + })). // Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())). Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())). 
Complete(r) @@ -132,7 +140,7 @@ func (r *GlobalNodeController) SyncTaint(ctx context.Context, globalNode *v1alph return nil } - if err := util.DrainNode(ctx, targetNode.Name, r.RootClientSet, &targetNode, env.GetDrainWaitSeconds()); err != nil { + if err := util.DrainNode(ctx, targetNode.Name, r.RootClientSet, &targetNode, env.GetDrainWaitSeconds(), true); err != nil { return err } return nil diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go index 8269fd821..e37fb13fe 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go @@ -112,7 +112,8 @@ func NewDrainHostNodeTask() Task { return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) } - if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds()); err != nil { + if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds(), true); err != nil { + klog.Warningf("drain node %s failed: %s, will force delete node", to.NodeInfo.Name, err) return nil, err } return nil, nil @@ -125,13 +126,6 @@ func NewDrainVirtualNodeTask() Task { return Task{ Name: "drain virtual-control-plane node", Retry: true, - // ErrorIgnore: true, - Skip: func(ctx context.Context, opt TaskOpt) bool { - if opt.Opt != nil { - return opt.Opt.KubeInKubeConfig.ForceDestroy - } - return false - }, Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { targetNode, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) if err != nil { @@ -141,8 +135,9 @@ func NewDrainVirtualNodeTask() Task { return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) } - if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, 
env.GetDrainWaitSeconds()); err != nil { - return nil, err + if err := util.DrainNode(ctx, targetNode.Name, to.VirtualK8sClient, targetNode, env.GetDrainWaitSeconds(), false); err != nil { + klog.Warningf("drain node %s failed: %s, will force delete node", to.NodeInfo.Name, err) + return nil, nil } return nil, nil }, diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 9474dd2ff..87986c447 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -654,15 +654,6 @@ func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1al return nil } -func mapContains(big map[string]string, small map[string]string) bool { - for k, v := range small { - if bigV, ok := big[k]; !ok || bigV != v { - return false - } - } - return true -} - func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataKey string) (*HostPortPool, error) { hostPorts, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{}) if err != nil { diff --git a/pkg/kubenest/controlplane/endpoint.go b/pkg/kubenest/controlplane/endpoint.go index f23924305..0c7589e6f 100644 --- a/pkg/kubenest/controlplane/endpoint.go +++ b/pkg/kubenest/controlplane/endpoint.go @@ -61,6 +61,7 @@ func CreateOrUpdateApiServerExternalEndpoint(kubeClient kubernetes.Interface) er // Update the existing endpoint newEndpoint.SetResourceVersion(existingEndpoint.ResourceVersion) + newEndpoint.SetUID(existingEndpoint.UID) _, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Update(context.TODO(), newEndpoint, metav1.UpdateOptions{}) if err != nil { klog.Error("update api-server-external-service endpoint failed", err) @@ -93,31 +94,17 @@ func CreateOrUpdateApiServerExternalService(kubeClient kubernetes.Interface) err if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &svc); err != nil { return 
fmt.Errorf("err when decoding api-server-external-service in virtual cluster: %w", err) } - - // Try to create the service - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{}) + _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{}) if err != nil { - if !apierrors.IsAlreadyExists(err) { - return fmt.Errorf("failed to create api-server-external-service service: %w", err) - } - - // Service already exists, retrieve it - existingSvc, err := kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get existing api-server-external-service service: %w", err) + if !apierrors.IsNotFound(err) { + return fmt.Errorf("error when getting api-server-external-service: %w", err) + } + // The service does not exist yet (NotFound), so create it; an existing service is left untouched + if _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("error when creating api-server-external-service: %w", err) } - - // Update the existing service - svc.ResourceVersion = existingSvc.ResourceVersion - _, err = kubeClient.CoreV1().Services(constants.DefaultNs).Update(context.TODO(), &svc, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update api-server-external-service service: %w", err) - } - klog.V(4).Info("successfully updated api-server-external-service service") - } else { - klog.V(4).Info("successfully created api-server-external-service service") } - + klog.V(4).Info("successfully created api-server-external-service service") return nil } diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index a6111e80a..ec9b2acdf 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -91,7 +91,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase { destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask())
destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask()) destroyPhase.AppendTask(tasks.DeleteEtcdPvcTask()) - destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask()) + //destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask()) destroyPhase.SetDataInitializer(func() (workflow.RunData, error) { return newRunData(opts) diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go index 50d4bb73e..b44eb962c 100644 --- a/pkg/kubenest/tasks/anp.go +++ b/pkg/kubenest/tasks/anp.go @@ -228,7 +228,7 @@ func installAnpAgent(data InitData) error { actionFunc := func(ctx context.Context, c dynamic.Interface, u *unstructured.Unstructured) error { // create the object return apiclient.TryRunCommand(func() error { - return util.ApplyObject(vcClient, u) + return util.ReplaceObject(vcClient, u) }, 3) } return util.ForEachObjectInYAML(context.TODO(), vcClient, []byte(anpAgentManifestBytes), "", actionFunc) diff --git a/pkg/kubenest/util/helper.go b/pkg/kubenest/util/helper.go index 78e78915d..52a2a7a00 100644 --- a/pkg/kubenest/util/helper.go +++ b/pkg/kubenest/util/helper.go @@ -422,3 +422,48 @@ func ForEachObjectInYAML( } } } + +func ReplaceObject(dynamicClient dynamic.Interface, obj *unstructured.Unstructured) error { + gvk := obj.GroupVersionKind() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + namespace := obj.GetNamespace() + name := obj.GetName() + + klog.V(2).Infof("Replace %s, name: %s, namespace: %s", gvr.String(), name, namespace) + + resourceClient := dynamicClient.Resource(gvr).Namespace(namespace) + + // Get the existing resource + _, err := resourceClient.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + // If not found, create the resource + _, err = resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{}) + if err != nil { + return err + } + klog.V(2).Infof("Created %s %s in namespace %s", gvr.String(), name, namespace) + return nil + } + return err + } + + // If found, 
delete the existing resource + err = resourceClient.Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err != nil { + klog.V(2).Infof("Failed to delete existing %s %s: %v", gvr.String(), name, err) + return fmt.Errorf("failed to delete existing %s %s: %v", gvr.String(), name, err) + } + + klog.V(2).Infof("Deleted existing %s %s in namespace %s", gvr.String(), name, namespace) + + // Create the resource with the new object + _, err = resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{}) + if err != nil { + klog.V(2).Infof("Failed to create %s %s: %v", gvr.String(), name, err) + return fmt.Errorf("failed to create %s %s: %v", gvr.String(), name, err) + } + + klog.V(2).Infof("Replaced %s %s in namespace %s", gvr.String(), name, namespace) + return nil +} diff --git a/pkg/kubenest/util/node.go b/pkg/kubenest/util/node.go index ff8f04267..3c7c3a936 100644 --- a/pkg/kubenest/util/node.go +++ b/pkg/kubenest/util/node.go @@ -22,7 +22,7 @@ func IsNodeReady(conditions []v1.NodeCondition) bool { } // DrainNode cordons and drains a node. 
-func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface, node *v1.Node, drainWaitSeconds int) error { +func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface, node *v1.Node, drainWaitSeconds int, isHostCluster bool) error { if client == nil { return fmt.Errorf("K8sClient not set") } @@ -40,6 +40,7 @@ func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface IgnoreAllDaemonSets: true, Out: os.Stdout, ErrOut: os.Stdout, + DisableEviction: !isHostCluster, // We want to proceed even when pods are using emptyDir volumes DeleteEmptyDirData: true, Timeout: time.Duration(drainWaitSeconds) * time.Second, diff --git a/pkg/kubenest/util/util.go b/pkg/kubenest/util/util.go index 34457f3a5..8117c1db3 100644 --- a/pkg/kubenest/util/util.go +++ b/pkg/kubenest/util/util.go @@ -206,15 +206,6 @@ func SecureRandomInt(n int) (int, error) { return int(randInt.Int64()), nil } -func MapContains(big map[string]string, small map[string]string) bool { - for k, v := range small { - if bigV, ok := big[k]; !ok || bigV != v { - return false - } - } - return true -} - func IsIPAvailable(ips, vipPool []string) (string, error) { for _, ip := range ips { if b, err := IsIPInRange(ip, vipPool); b && err == nil {