Skip to content

Commit

Permalink
Merge pull request kosmos-io#693 from village-way/main
Browse files Browse the repository at this point in the history
bugfix: fix drain virtual cluster node error and failed to create api-server-external-service endpoint
  • Loading branch information
duanmengkk authored Aug 26, 2024
2 parents c5b4e0f + 71f742c commit f65a1c4
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ func (r *GlobalNodeController) SetupWithManager(mgr manager.Manager) error {
return false
},
})).
Watches(&source.Kind{Type: &v1alpha1.GlobalNode{}}, handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
gn := a.(*v1alpha1.GlobalNode)
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: gn.Name,
}},
}
})).
// Watches(&source.Kind{Type: &v1.Node{}}, handler.EnqueueRequestsFromMapFunc(r.newNodeMapFunc())).
Watches(&source.Kind{Type: &v1alpha1.VirtualCluster{}}, handler.EnqueueRequestsFromMapFunc(r.newVirtualClusterMapFunc())).
Complete(r)
Expand Down Expand Up @@ -132,7 +140,7 @@ func (r *GlobalNodeController) SyncTaint(ctx context.Context, globalNode *v1alph
return nil
}

if err := util.DrainNode(ctx, targetNode.Name, r.RootClientSet, &targetNode, env.GetDrainWaitSeconds()); err != nil {
if err := util.DrainNode(ctx, targetNode.Name, r.RootClientSet, &targetNode, env.GetDrainWaitSeconds(), true); err != nil {
return err
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func NewDrainHostNodeTask() Task {
return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err)
}

if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds()); err != nil {
if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds(), true); err != nil {
klog.Warningf("drain node %s failed: %s, will force delete node", to.NodeInfo.Name, err)
return nil, err
}
return nil, nil
Expand All @@ -125,13 +126,6 @@ func NewDrainVirtualNodeTask() Task {
return Task{
Name: "drain virtual-control-plane node",
Retry: true,
// ErrorIgnore: true,
Skip: func(ctx context.Context, opt TaskOpt) bool {
if opt.Opt != nil {
return opt.Opt.KubeInKubeConfig.ForceDestroy
}
return false
},
Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) {
targetNode, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -141,8 +135,9 @@ func NewDrainVirtualNodeTask() Task {
return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err)
}

if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds()); err != nil {
return nil, err
if err := util.DrainNode(ctx, targetNode.Name, to.VirtualK8sClient, targetNode, env.GetDrainWaitSeconds(), false); err != nil {
klog.Warningf("drain node %s failed: %s, will force delete node", to.NodeInfo.Name, err)
return nil, nil
}
return nil, nil
},
Expand Down
9 changes: 0 additions & 9 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,15 +654,6 @@ func (c *VirtualClusterInitController) ensureAllPodsRunning(virtualCluster *v1al
return nil
}

func mapContains(big map[string]string, small map[string]string) bool {
for k, v := range small {
if bigV, ok := big[k]; !ok || bigV != v {
return false
}
}
return true
}

func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataKey string) (*HostPortPool, error) {
hostPorts, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{})
if err != nil {
Expand Down
31 changes: 9 additions & 22 deletions pkg/kubenest/controlplane/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func CreateOrUpdateApiServerExternalEndpoint(kubeClient kubernetes.Interface) er

// Update the existing endpoint
newEndpoint.SetResourceVersion(existingEndpoint.ResourceVersion)
newEndpoint.SetUID(existingEndpoint.UID)
_, err = kubeClient.CoreV1().Endpoints(constants.DefaultNs).Update(context.TODO(), newEndpoint, metav1.UpdateOptions{})
if err != nil {
klog.Error("update api-server-external-service endpoint failed", err)
Expand Down Expand Up @@ -93,31 +94,17 @@ func CreateOrUpdateApiServerExternalService(kubeClient kubernetes.Interface) err
if err := yaml.Unmarshal([]byte(apiServerExternalServiceBytes), &svc); err != nil {
return fmt.Errorf("err when decoding api-server-external-service in virtual cluster: %w", err)
}

// Try to create the service
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{})
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create api-server-external-service service: %w", err)
}

// Service already exists, retrieve it
existingSvc, err := kubeClient.CoreV1().Services(constants.DefaultNs).Get(context.TODO(), constants.ApiServerExternalService, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get existing api-server-external-service service: %w", err)
if !apierrors.IsNotFound(err) {
// Try to create the service
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Create(context.TODO(), &svc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("error when creating api-server-external-service: %w", err)
}
}

// Update the existing service
svc.ResourceVersion = existingSvc.ResourceVersion
_, err = kubeClient.CoreV1().Services(constants.DefaultNs).Update(context.TODO(), &svc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update api-server-external-service service: %w", err)
}
klog.V(4).Info("successfully updated api-server-external-service service")
} else {
klog.V(4).Info("successfully created api-server-external-service service")
}

klog.V(4).Info("successfully created api-server-external-service service")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase {
destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask())
destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask())
destroyPhase.AppendTask(tasks.DeleteEtcdPvcTask())
destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask())
//destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask())

destroyPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubenest/tasks/anp.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func installAnpAgent(data InitData) error {
actionFunc := func(ctx context.Context, c dynamic.Interface, u *unstructured.Unstructured) error {
// create the object
return apiclient.TryRunCommand(func() error {
return util.ApplyObject(vcClient, u)
return util.ReplaceObject(vcClient, u)
}, 3)
}
return util.ForEachObjectInYAML(context.TODO(), vcClient, []byte(anpAgentManifestBytes), "", actionFunc)
Expand Down
45 changes: 45 additions & 0 deletions pkg/kubenest/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,48 @@ func ForEachObjectInYAML(
}
}
}

func ReplaceObject(dynamicClient dynamic.Interface, obj *unstructured.Unstructured) error {
gvk := obj.GroupVersionKind()
gvr, _ := meta.UnsafeGuessKindToResource(gvk)
namespace := obj.GetNamespace()
name := obj.GetName()

klog.V(2).Infof("Replace %s, name: %s, namespace: %s", gvr.String(), name, namespace)

resourceClient := dynamicClient.Resource(gvr).Namespace(namespace)

// Get the existing resource
_, err := resourceClient.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
// If not found, create the resource
_, err = resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{})
if err != nil {
return err
}
klog.V(2).Infof("Created %s %s in namespace %s", gvr.String(), name, namespace)
return nil
}
return err
}

// If found, delete the existing resource
err = resourceClient.Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
klog.V(2).Infof("Failed to delete existing %s %s: %v", gvr.String(), name, err)
return fmt.Errorf("failed to delete existing %s %s: %v", gvr.String(), name, err)
}

klog.V(2).Infof("Deleted existing %s %s in namespace %s", gvr.String(), name, namespace)

// Create the resource with the new object
_, err = resourceClient.Create(context.TODO(), obj, metav1.CreateOptions{})
if err != nil {
klog.V(2).Infof("Failed to create %s %s: %v", gvr.String(), name, err)
return fmt.Errorf("failed to create %s %s: %v", gvr.String(), name, err)
}

klog.V(2).Infof("Replaced %s %s in namespace %s", gvr.String(), name, namespace)
return nil
}
3 changes: 2 additions & 1 deletion pkg/kubenest/util/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func IsNodeReady(conditions []v1.NodeCondition) bool {
}

// DrainNode cordons and drains a node.
func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface, node *v1.Node, drainWaitSeconds int) error {
func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface, node *v1.Node, drainWaitSeconds int, isHostCluster bool) error {
if client == nil {
return fmt.Errorf("K8sClient not set")
}
Expand All @@ -40,6 +40,7 @@ func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface
IgnoreAllDaemonSets: true,
Out: os.Stdout,
ErrOut: os.Stdout,
DisableEviction: !isHostCluster,
// We want to proceed even when pods are using emptyDir volumes
DeleteEmptyDirData: true,
Timeout: time.Duration(drainWaitSeconds) * time.Second,
Expand Down
9 changes: 0 additions & 9 deletions pkg/kubenest/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,6 @@ func SecureRandomInt(n int) (int, error) {
return int(randInt.Int64()), nil
}

func MapContains(big map[string]string, small map[string]string) bool {
for k, v := range small {
if bigV, ok := big[k]; !ok || bigV != v {
return false
}
}
return true
}

func IsIPAvailable(ips, vipPool []string) (string, error) {
for _, ip := range ips {
if b, err := IsIPInRange(ip, vipPool); b && err == nil {
Expand Down

0 comments on commit f65a1c4

Please sign in to comment.