From 9721e44e8e2f00b767972cbd69f857eb3ce153c5 Mon Sep 17 00:00:00 2001 From: Paco Xu Date: Thu, 16 Jun 2022 16:47:43 +0800 Subject: [PATCH] cross version upgration support like: v1.20 to v1.24 (#73) * kubelet restart depends on kubelet-reloader, we need to run it after upgrade-kubeadm * add support for cross version upgrade like v1.21.0 to v1.24.0 * add master toleration for v1.22 and before * fix: no need to upgrade to current minor version * UT: test cross version check * fix cross task order by rename --<0x>- --- controllers/util.go | 5 ++ operations/upgrade.go | 151 ++++++++++++++++++++++++------------- operations/util.go | 8 ++ operations/version.go | 103 +++++++++++++++++++++++++ operations/version_test.go | 67 ++++++++++++++++ 5 files changed, 283 insertions(+), 51 deletions(-) create mode 100644 operations/version.go create mode 100644 operations/version_test.go diff --git a/controllers/util.go b/controllers/util.go index 4e6cfa6..7558017 100644 --- a/controllers/util.go +++ b/controllers/util.go @@ -122,6 +122,11 @@ func createDaemonSet(c client.Client, operation *operatorv1.Operation, namespace Key: "node-role.kubernetes.io/control-plane", Effect: corev1.TaintEffectNoSchedule, }, + // to work for v1.22- + { + Key: "node-role.kubernetes.io/master", + Effect: corev1.TaintEffectNoSchedule, + }, }, Containers: []corev1.Container{ { diff --git a/operations/upgrade.go b/operations/upgrade.go index f5ff12c..df45787 100644 --- a/operations/upgrade.go +++ b/operations/upgrade.go @@ -38,60 +38,110 @@ func setupUpgrade() map[string]string { func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperationSpec, c client.Client) *operatorv1.RuntimeTaskGroupList { log := ctrl.Log.WithName("operations").WithName("Upgrade").WithValues("task", operation.Name) - - // TODO support upgrade to v1.n-1~v1.n of current kubernetes server version. - // If the current kubernetes server version is v1.n-2 which is below the target version, we need to generate a further upgrade plan - var items []operatorv1.RuntimeTaskGroup - dryRun := operation.Spec.GetTypedOperationExecutionMode() == operatorv1.OperationExecutionModeDryRun - serverNeedUpgrade := true serverVersion, err := getServerVersion() if err != nil { + log.Error(err, "get server version failed") + return nil + } + isServerSupported, isServerCrossVersion, isServerCanSkip := upgradeCheck(serverVersion, spec.KubernetesVersion) + if !isServerSupported { + log.Info("Upgrade is not supported", "serverVersion", serverVersion, "kubernetesVersion", spec.KubernetesVersion) + // TODO current not supported operation will succeed immeditely. + return nil + } + log.Info("Upgrade is supported", "serverVersion", serverVersion, "kubernetesVersion", spec.KubernetesVersion) + nodes, err := listNodesBySelector(c, getAllSelector()) + if err != nil { + log.Error(err, "list node failed") + return nil + } + var isClientSupported, isClientCrossVersion, isClientCanSkip bool = true, false, true + var clientServerMatch bool = true + log.Info("nodes list", "nodes", len(nodes.Items)) + for _, n := range nodes.Items { + supported, cross, skip := upgradeCheck(n.Status.NodeInfo.KubeletVersion, spec.KubernetesVersion) + isClientSupported = isClientSupported && supported + isClientCrossVersion = isClientCrossVersion || cross + isClientCanSkip = isClientCanSkip && skip + clientServerMatch = clientServerMatch && n.Status.NodeInfo.KubeletVersion == serverVersion + if n.Status.NodeInfo.KubeletVersion != serverVersion { + log.Info("node is not match server version", "node", n.Name, "serverVersion", serverVersion, "kubeletVersion", n.Status.NodeInfo.KubeletVersion) + } + } + if !isClientSupported { + log.Info("Upgrade is not supported", "clientVersion", spec.KubernetesVersion) return nil } - // TODO is it the right way to check if the `kubeadm upgrade apply` is successful? - // check ClusterConfiguration.kubernetesVersion in kubeadm-config configmap? - if operation.Spec.Upgrade.KubernetesVersion == serverVersion { - //skip upgrade if the current kubernetes server version is the same as the target version - serverNeedUpgrade = false - } - - // nodeNeedUpgrade := true - // nodeVersion, err := getNodeVersion(c, getNodeName()) - // if err != nil { - // return nil - // } - // if operation.Spec.Upgrade.KubernetesVersion == nodeVersion { - // //skip upgrade node if the current kubernetes server version is the same as the target version - // nodeNeedUpgrade = false - // } - - if serverNeedUpgrade { - t1 := createUpgradeApplyTaskGroup(operation, "01", "upgrade-apply") + log.Info("show all client version check results", "isClientSupported", isClientSupported, "isClientCrossVersion", isClientCrossVersion, "isClientCanSkip", isClientCanSkip, "clientServerMatch", clientServerMatch) + log.Info("show all server version check results", "isServerSupported", isServerSupported, "isServerCrossVersion", isServerCrossVersion, "isServerCanSkip", isServerCanSkip) + if isClientCanSkip && isServerCanSkip { + // skip upgrade directly + return &operatorv1.RuntimeTaskGroupList{ + Items: items, + } + } else if isClientCrossVersion || isServerCrossVersion { + // support upgrade to v1.n-1~v1.n of current kubernetes server version. + // If the current kubernetes server version is v1.n-2 which is below the target version, we need to generate a further upgrade plan + log.Info("Upgrade is not supported, need cross version for client or server", "targetVersion", spec.KubernetesVersion, "serverVersion", serverVersion) + if !clientServerMatch { + // upgrade nodes to the target version + log.Info("[cross-upgrade] add items to make server client match", "serverVersion", serverVersion) + items = append(items, planNextUpgrade(operation, serverVersion, c, true)...) + } + crossVersions := getCrossVersions(serverVersion, spec.KubernetesVersion) + for _, v := range crossVersions { + log.Info("[cross-upgrade] add items to upgrade to a middle version", "version", v) + items = append(items, planNextUpgrade(operation, v, c, false)...) + } + log.Info("[cross-upgrade] add items to upgrade to the target version", "version", spec.KubernetesVersion) + items = append(items, planNextUpgrade(operation, operation.Spec.Upgrade.KubernetesVersion, c, false)...) + + } else { + log.Info("add items to upgrade to the target version", "version", spec.KubernetesVersion) + items = append(items, planNextUpgrade(operation, operation.Spec.Upgrade.KubernetesVersion, c, isServerCanSkip)...) + } + + return &operatorv1.RuntimeTaskGroupList{ + Items: items, + } +} + +// the version may not be operation.Spec.Upgrade.KubernetesVersion for cross upgrade +func planNextUpgrade(operation *operatorv1.Operation, version string, c client.Client, isServerCanSkip bool) []operatorv1.RuntimeTaskGroup { + log := ctrl.Log.WithName("operations").WithName("Upgrade").WithValues("task", operation.Name) + log.Info("add task for upgrading", "version", version, "isServerCanSkip", isServerCanSkip) + + var items []operatorv1.RuntimeTaskGroup + dryRun := operation.Spec.GetTypedOperationExecutionMode() == operatorv1.OperationExecutionModeDryRun + + if !isServerCanSkip { + t1 := createUpgradeApplyTaskGroup(operation, fmt.Sprintf("%s-01", version), "upgrade-apply") setCP1Selector(&t1) // run `upgrade apply`` on the first node of all control plane t1.Spec.NodeFilter = string(operatorv1.RuntimeTaskGroupNodeFilterHead) t1.Spec.Template.Spec.Commands = append(t1.Spec.Template.Spec.Commands, operatorv1.CommandDescriptor{ UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, - Local: operation.Spec.Upgrade.Local, - }, - }, - operatorv1.CommandDescriptor{ - UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, Local: operation.Spec.Upgrade.Local, }, }, operatorv1.CommandDescriptor{ KubeadmUpgradeApply: &operatorv1.KubeadmUpgradeApplyCommandSpec{ DryRun: dryRun, - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, SkipKubeProxy: operation.Spec.Upgrade.UpgradeKubeProxyAtLast, }, }, + // as it depends on kubelet-reloader, we need to run it after upgrade-kubeadm apply + operatorv1.CommandDescriptor{ + UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{ + KubernetesVersion: version, + Local: operation.Spec.Upgrade.Local, + }, + }, ) log.Info("add upgrade-apply task group", "task", t1.Name) items = append(items, t1) @@ -99,31 +149,32 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat // this can be skipped if there is only one control-plane node. // currently it depends on the selector - t2 := createBasicTaskGroup(operation, "02", "upgrade-cp") + t2 := createBasicTaskGroup(operation, fmt.Sprintf("%s-02", version), "upgrade-cp") setCPSelector(&t2) cpNodes, err := listNodesBySelector(c, &t2.Spec.NodeSelector) if err != nil { log.Info("failed to list nodes.", "error", err) - return nil + return items } if len(cpNodes.Items) > 1 { t2.Spec.Template.Spec.Commands = append(t2.Spec.Template.Spec.Commands, operatorv1.CommandDescriptor{ UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, Local: operation.Spec.Upgrade.Local, }, }, operatorv1.CommandDescriptor{ - UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, - Local: operation.Spec.Upgrade.Local, + KubeadmUpgradeNode: &operatorv1.KubeadmUpgradeNodeCommandSpec{ + DryRun: operatorv1.OperationExecutionMode(operation.Spec.ExecutionMode) == operatorv1.OperationExecutionModeDryRun, }, }, + // as it depends on kubelet-reloader, we need to run it after upgrade-kubeadm operatorv1.CommandDescriptor{ - KubeadmUpgradeNode: &operatorv1.KubeadmUpgradeNodeCommandSpec{ - DryRun: operatorv1.OperationExecutionMode(operation.Spec.ExecutionMode) == operatorv1.OperationExecutionModeDryRun, + UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{ + KubernetesVersion: version, + Local: operation.Spec.Upgrade.Local, }, }, ) @@ -132,11 +183,11 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat } if operation.Spec.Upgrade.UpgradeKubeProxyAtLast { - t3 := createBasicTaskGroup(operation, "03", "upgrade-kube-proxy") + t3 := createBasicTaskGroup(operation, fmt.Sprintf("%s-03", version), "upgrade-kube-proxy") t3.Spec.Template.Spec.Commands = append(t3.Spec.Template.Spec.Commands, operatorv1.CommandDescriptor{ KubeadmUpgradeKubeProxy: &operatorv1.KubeadmUpgradeKubeProxySpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, }, }, ) @@ -146,13 +197,14 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat // this can be skipped if there are no worker nodes. // currently it depends on the selector - t4 := createBasicTaskGroup(operation, "04", "upgrade-w") + t4 := createBasicTaskGroup(operation, fmt.Sprintf("%s-04", version), "upgrade-worker") setWSelector(&t4) workerNodes, err := listNodesBySelector(c, &t4.Spec.NodeSelector) if err != nil { fmt.Printf("failed to list nodes: %v", err) - return nil + return items } + log.Info("workerNodes check", "workerNum", len(workerNodes.Items)) if len(workerNodes.Items) > 0 { t4.Spec.Template.Spec.Commands = append(t4.Spec.Template.Spec.Commands, operatorv1.CommandDescriptor{ @@ -160,13 +212,13 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat }, operatorv1.CommandDescriptor{ UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, Local: operation.Spec.Upgrade.Local, }, }, operatorv1.CommandDescriptor{ UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{ - KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion, + KubernetesVersion: version, Local: operation.Spec.Upgrade.Local, }, }, @@ -182,10 +234,7 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat log.Info("add upgrade-w task group", "task", t4.Name) items = append(items, t4) } - - return &operatorv1.RuntimeTaskGroupList{ - Items: items, - } + return items } // check the current kubernetes server version diff --git a/operations/util.go b/operations/util.go index 7c6c85f..77a8d54 100644 --- a/operations/util.go +++ b/operations/util.go @@ -143,6 +143,14 @@ func setWSelector(t *operatorv1.RuntimeTaskGroup) { } } +func getAllSelector() *metav1.LabelSelector { + return &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "kubernetes.io/os": "linux", + }, + } +} + func fixupCustomTaskGroup(operation *operatorv1.Operation, taskgroup operatorv1.RuntimeTaskGroup, taskdeploymentOrder string) operatorv1.RuntimeTaskGroup { gv := operatorv1.GroupVersion diff --git a/operations/version.go b/operations/version.go new file mode 100644 index 0000000..b78d731 --- /dev/null +++ b/operations/version.go @@ -0,0 +1,103 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/util/version" +) + +func upgradeCheck(current, target string) (isSupported, isCrossVersion, canSkip bool) { + currentVer, err := version.ParseSemantic(current) + if err != nil { + return false, false, false + } + targetVer, err := version.ParseSemantic(target) + if err != nil { + return false, false, false + } + return upgradeCheckVersion(currentVer, targetVer) +} + +func upgradeCheckVersion(current, target *version.Version) (isSupported, isCrossVersion, canSkip bool) { + // no need to check major as only major 1 is supported + // if current.Major() != target.Major() { + // return false + // } + if current.Minor() == target.Minor() { + if current.Patch() == target.Patch() { + // just skip as the version is the same + return true, false, true + } + // upgrade to a patched version + return true, false, false + } else if current.Minor() < target.Minor() { + if current.Minor()+1 == target.Minor() { + // upgrade to a minor version + return true, false, false + } + // upgrade multi-minor version, need to split into multiple upgrade tasks + return true, true, false + } + // downgrade is not supported + if current.Minor()-1 == target.Minor() { + // TODO downgrade to a minor version + // this is just for test purpose, need to define if it should be supported in the future + return true, false, false + } + return false, false, false + +} + +func isSupported(ver string) bool { + v, err := version.ParseSemantic(ver) + if err != nil { + return false + } + return isSupportedVersion(v) +} + +func isSupportedVersion(ver *version.Version) bool { + // TODO a table of supported versions needs to be created in the docs + if ver.Major() != 1 && ver.Minor() < 17 { + return false + } + return true +} + +// before this, we should make sure the version is supported +func getCrossVersions(current, target string) []string { + versions := []string{} + cur, err := version.ParseSemantic(current) + if err != nil { + return versions + } + tar, err := version.ParseSemantic(target) + if err != nil { + return versions + } + _, isCross, _ := upgradeCheckVersion(cur, tar) + if !isCross { + return versions + } + tarMinor := tar.Minor() + for i := cur.Minor() + 1; i < tarMinor; i++ { + versions = append(versions, fmt.Sprintf("v1.%d.0", i)) + } + return versions +} diff --git a/operations/version_test.go b/operations/version_test.go new file mode 100644 index 0000000..3d11a78 --- /dev/null +++ b/operations/version_test.go @@ -0,0 +1,67 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "fmt" + "reflect" + "testing" +) + +func TestGetCrossVersions(t *testing.T) { + var tests = []struct { + current, target string + expected []string + }{ + { + current: "v1.0.0", + target: "v1.0.0", + expected: []string{}, + }, + { + current: "v1.0.0", + target: "v1.1.0", + expected: []string{}, + }, + { + current: "v1.0.0", + target: "v1.2.1", + expected: []string{ + "v1.1.0", + }, + }, + { + current: "v1.0.0", + target: "v1.5.2", + expected: []string{ + "v1.1.0", + "v1.2.0", + "v1.3.0", + "v1.4.0", + }, + }, + } + for _, rt := range tests { + t.Run(fmt.Sprintf("%s-%s", rt.current, rt.target), func(t *testing.T) { + actual := getCrossVersions(rt.current, rt.target) + if !reflect.DeepEqual(actual, rt.expected) { + t.Errorf("getCrossVersions(%s, %s) == %v, want %v", rt.current, rt.target, actual, rt.expected) + } + }) + } + +}