From 8296fe5656e38948779fc36cd847f46310c37412 Mon Sep 17 00:00:00 2001 From: Viktor Kram <92625690+ViktorKram@users.noreply.github.com> Date: Thu, 18 Jul 2024 16:45:50 +0300 Subject: [PATCH] [controller] Add an auto extension feature for LVMVolumeGroup thin-pools and LVMLogicalVolumes (#74) Signed-off-by: Viktor Kramarenko --- crds/doc-ru-lvmlogicalvolume.yaml | 4 +- crds/doc-ru-lvmvolumegroup.yaml | 6 +- crds/lvmlogicalvolume.yaml | 4 +- crds/lvmvolumegroup.yaml | 6 +- images/agent/cmd/main.go | 5 + images/agent/go.mod | 14 +- images/agent/go.sum | 11 + images/agent/internal/const.go | 2 + images/agent/pkg/cache/cache.go | 20 ++ .../lvm_logical_volume_bench_test.go | 4 +- .../lvm_logical_volume_extender_watcher.go | 255 ++++++++++++++++++ .../controller/lvm_logical_volume_watcher.go | 77 ++---- .../lvm_logical_volume_watcher_func.go | 50 ++-- .../lvm_logical_volume_watcher_test.go | 34 +-- .../controller/lvm_volume_group_discover.go | 38 +-- .../controller/lvm_volume_group_watcher.go | 55 ++-- .../lvm_volume_group_watcher_func.go | 69 +++-- images/agent/pkg/scanner/scanner.go | 9 +- images/agent/pkg/utils/commands.go | 4 +- 19 files changed, 454 insertions(+), 213 deletions(-) create mode 100644 images/agent/pkg/controller/lvm_logical_volume_extender_watcher.go diff --git a/crds/doc-ru-lvmlogicalvolume.yaml b/crds/doc-ru-lvmlogicalvolume.yaml index fb5de893..cb0dacfc 100644 --- a/crds/doc-ru-lvmlogicalvolume.yaml +++ b/crds/doc-ru-lvmlogicalvolume.yaml @@ -18,7 +18,9 @@ spec: Тип LV: Thick или Thin. size: description: | - Желаемый размер LV. + Желаемый размер LV. Может быть указан как числом, так и в процентах от общего размера VG или thin pool (для thin pool общий размер - это реальный размер пула, умноженный на allocationLimit). + + > Обратите внимание, что при указании размера в процентах LV будет автоматически расширена при расширении VG, расширении thin pool или увеличении поля allocationLimit для thin pool. lvmVolumeGroupName: description: | Имя LVMVolumeGroup ресурса, VG которого будет использовано для создания LV. diff --git a/crds/doc-ru-lvmvolumegroup.yaml b/crds/doc-ru-lvmvolumegroup.yaml index 623b0607..f967c8c1 100644 --- a/crds/doc-ru-lvmvolumegroup.yaml +++ b/crds/doc-ru-lvmvolumegroup.yaml @@ -32,12 +32,14 @@ spec: properties: name: description: | - Желаемое имя Thin-pool. + Желаемое имя thin pool. > Неизменяемое поле. size: description: | - Желаемый размер Thin-pool. + Желаемый размер thin pool. Может быть указан как числом, так и в процентах от общего размера VG. + + > Обратите внимание, что при указании размера в процентах thin pool будет автоматически расширен при расширении VG. status: properties: phase: diff --git a/crds/lvmlogicalvolume.yaml b/crds/lvmlogicalvolume.yaml index f410f831..d21d619f 100644 --- a/crds/lvmlogicalvolume.yaml +++ b/crds/lvmlogicalvolume.yaml @@ -66,7 +66,9 @@ spec: size: x-kubernetes-int-or-string: true description: | - Desired LV size. + The desired LV size. May be specified either as an absolute value or as a percentage of the total VG or thin pool space (for a thin pool, the total space is the actual thin pool size multiplied by the allocationLimit value). + + > Note that if you specify a percent size, the LV will be automatically extended when the VG is extended, the thin pool is extended, or the thin pool's allocationLimit value is increased. 
minLength: 1 pattern: '^[0-9]+(\.[0-9]+)?(E|P|T|G|M|k|Ei|Pi|Ti|Gi|Mi|Ki)?$|^[1-9][0-9]?%$|100%' lvmVolumeGroupName: diff --git a/crds/lvmvolumegroup.yaml b/crds/lvmvolumegroup.yaml index 7821cdd2..b8b60aa6 100644 --- a/crds/lvmvolumegroup.yaml +++ b/crds/lvmvolumegroup.yaml @@ -69,14 +69,16 @@ spec: name: type: string description: | - The desired Thin-pool name. + The desired thin pool name. > This field is immutable. size: x-kubernetes-int-or-string: true pattern: '^[0-9]+(\.[0-9]+)?(E|P|T|G|M|k|Ei|Pi|Ti|Gi|Mi|Ki)?$|^[1-9][0-9]?%$|100%' description: | - The desired Thin-pool size. + The desired thin pool size. May be specified either as an absolute value or as a percentage of the total VG space. + + > Note that if you specify a percent size, the thin pool will be automatically extended when the VG is extended. allocationLimit: type: string pattern: '^[1-9][0-9]{2,3}%$' diff --git a/images/agent/cmd/main.go b/images/agent/cmd/main.go index 71dde698..7a028a0f 100644 --- a/images/agent/cmd/main.go +++ b/images/agent/cmd/main.go @@ -146,6 +146,11 @@ func main() { os.Exit(1) } + if err = controller.RunLVMLogicalVolumeExtenderWatcherController(mgr, *cfgParams, *log, metrics, sdsCache); err != nil { + log.Error(err, "[main] unable to controller.RunLVMLogicalVolumeExtenderWatcherController") + os.Exit(1) + } + if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { log.Error(err, "[main] unable to mgr.AddHealthzCheck") os.Exit(1) diff --git a/images/agent/go.mod b/images/agent/go.mod index 1b5693db..6e5a844f 100644 --- a/images/agent/go.mod +++ b/images/agent/go.mod @@ -6,18 +6,18 @@ require ( github.com/deckhouse/sds-node-configurator/api v0.0.0-20240709091744-c9d24f05db41 github.com/go-logr/logr v1.4.1 github.com/google/go-cmp v0.6.0 - github.com/onsi/ginkgo/v2 v2.15.0 - github.com/onsi/gomega v1.31.0 + github.com/onsi/ginkgo/v2 v2.17.1 + github.com/onsi/gomega v1.32.0 github.com/pilebones/go-udev v0.9.0 github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.8.4 k8s.io/api v0.30.2 - k8s.io/apiextensions-apiserver v0.29.4 + k8s.io/apiextensions-apiserver v0.30.1 k8s.io/apimachinery v0.30.2 - k8s.io/client-go v0.29.4 + k8s.io/client-go v0.30.1 k8s.io/klog/v2 v2.120.1 k8s.io/utils v0.0.0-20231127182322-b307cd553661 - sigs.k8s.io/controller-runtime v0.17.3 + sigs.k8s.io/controller-runtime v0.18.4 ) require ( @@ -26,7 +26,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect - github.com/evanphx/json-patch/v5 v5.8.0 // indirect + github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -67,7 +67,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/component-base v0.29.4 // indirect + k8s.io/component-base v0.30.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect diff --git a/images/agent/go.sum b/images/agent/go.sum index a9b5be95..daa2acb7 100644 --- a/images/agent/go.sum +++ b/images/agent/go.sum @@ -19,6 +19,8 @@ github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCv github.com/evanphx/json-patch v5.6.0+incompatible/go.mod 
h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.8.0 h1:lRj6N9Nci7MvzrXuX6HFzU8XjmhPiXPlsKEy1u0KQro= github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= +github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= +github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= @@ -85,8 +87,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= +github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk= +github.com/onsi/gomega v1.32.0/go.mod h1:a4x4gW6Pz2yK1MAmvluYme5lvYTn61afQ2ETw/8n4Lg= github.com/pilebones/go-udev v0.9.0 h1:N1uEO/SxUwtIctc0WLU0t69JeBxIYEYnj8lT/Nabl9Q= github.com/pilebones/go-udev v0.9.0/go.mod h1:T2eI2tUSK0hA2WS5QLjXJUfQkluZQu+18Cqvem3CaXI= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -203,12 +207,17 @@ k8s.io/api v0.30.2 h1:+ZhRj+28QT4UOH+BKznu4CBgPWgkXO7XAvMcMl0qKvI= k8s.io/api v0.30.2/go.mod h1:ULg5g9JvOev2dG0u2hig4Z7tQ2hHIuS+m8MNZ+X6EmI= k8s.io/apiextensions-apiserver v0.29.4 h1:M7hbuHU/ckbibR7yPbe6DyNWgTFKNmZDbdZKD8q1Smk= k8s.io/apiextensions-apiserver v0.29.4/go.mod h1:TTDC9fB+0kHY2rogf5hgBR03KBKCwED+GHUsXGpR7SM= +k8s.io/apiextensions-apiserver v0.30.1 h1:4fAJZ9985BmpJG6PkoxVRpXv9vmPUOVzl614xarePws= +k8s.io/apiextensions-apiserver v0.30.1/go.mod h1:R4GuSrlhgq43oRY9sF2IToFh7PVlF1JjfWdoG3pixk4= k8s.io/apimachinery v0.30.2 h1:fEMcnBj6qkzzPGSVsAZtQThU62SmQ4ZymlXRC5yFSCg= k8s.io/apimachinery v0.30.2/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/client-go v0.29.4 h1:79ytIedxVfyXV8rpH3jCBW0u+un0fxHDwX5F9K8dPR8= k8s.io/client-go v0.29.4/go.mod h1:kC1thZQ4zQWYwldsfI088BbK6RkxK+aF5ebV8y9Q4tk= +k8s.io/client-go v0.30.1 h1:uC/Ir6A3R46wdkgCV3vbLyNOYyCJ8oZnjtJGKfytl/Q= +k8s.io/client-go v0.30.1/go.mod h1:wrAqLNs2trwiCH/wxxmT/x3hKVH9PuV0GGW0oDoHVqc= k8s.io/component-base v0.29.4 h1:xeKzuuHI/1tjleu5jycDAcYbhAxeGHCQBZUY2eRIkOo= k8s.io/component-base v0.29.4/go.mod h1:pYjt+oEZP9gtmwSikwAJgfSBikqKX2gOqRat0QjmQt0= +k8s.io/component-base v0.30.1/go.mod h1:e/X9kDiOebwlI41AvBHuWdqFriSRrX50CdwA9TFaHLI= k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw= k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= @@ -217,6 +226,8 @@ k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6R k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= sigs.k8s.io/controller-runtime v0.17.3/go.mod h1:N0jpP5Lo7lMTF9aL56Z/B2oWBJjey6StQM0jRbKQXtY= 
+sigs.k8s.io/controller-runtime v0.18.4 h1:87+guW1zhvuPLh1PHybKdYFLU0YJp4FhJRmiHvm5BZw= +sigs.k8s.io/controller-runtime v0.18.4/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/images/agent/internal/const.go b/images/agent/internal/const.go index f010a484..5727bc5c 100644 --- a/images/agent/internal/const.go +++ b/images/agent/internal/const.go @@ -41,6 +41,8 @@ const ( AllocationLimitDefaultValue = "150%" + PhaseReady = "Ready" + ReasonValidationFailed = "ValidationFailed" ReasonCreating = "Creating" ReasonUpdating = "Updating" diff --git a/images/agent/pkg/cache/cache.go b/images/agent/pkg/cache/cache.go index 8c042e79..78d3ec30 100644 --- a/images/agent/pkg/cache/cache.go +++ b/images/agent/pkg/cache/cache.go @@ -70,6 +70,26 @@ func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) { return dst, c.lvsErrs } +func (c *Cache) FindLV(vgName, lvName string) *internal.LVData { + for _, lv := range c.lvs { + if lv.VGName == vgName && lv.LVName == lvName { + return &lv + } + } + + return nil +} + +func (c *Cache) FindVG(vgName string) *internal.VGData { + for _, vg := range c.vgs { + if vg.VGName == vgName { + return &vg + } + } + + return nil +} + func (c *Cache) PrintTheCache(log logger.Logger) { log.Cache("*****************CACHE BEGIN*****************") log.Cache("[Devices BEGIN]") diff --git a/images/agent/pkg/controller/lvm_logical_volume_bench_test.go b/images/agent/pkg/controller/lvm_logical_volume_bench_test.go index 819c1dc6..2aff0247 100644 --- a/images/agent/pkg/controller/lvm_logical_volume_bench_test.go +++ b/images/agent/pkg/controller/lvm_logical_volume_bench_test.go @@ -73,7 +73,7 @@ func BenchmarkRunThickLLVCreationSingleThread(b *testing.B) { } if llv.Status != nil && - llv.Status.Phase == StatusPhaseCreated && + llv.Status.Phase == LLVStatusPhaseCreated && !created { succeeded++ llvNames[llvName] = true @@ -138,7 +138,7 @@ func BenchmarkRunThinLLVCreationSingleThread(b *testing.B) { } if llv.Status != nil && - llv.Status.Phase == StatusPhaseCreated && + llv.Status.Phase == LLVStatusPhaseCreated && !visited { succeeded++ llvNames[llvName] = true diff --git a/images/agent/pkg/controller/lvm_logical_volume_extender_watcher.go b/images/agent/pkg/controller/lvm_logical_volume_extender_watcher.go new file mode 100644 index 00000000..bf5d2ec5 --- /dev/null +++ b/images/agent/pkg/controller/lvm_logical_volume_extender_watcher.go @@ -0,0 +1,255 @@ +package controller + +import ( + "agent/config" + "agent/internal" + "agent/pkg/cache" + "agent/pkg/logger" + "agent/pkg/monitoring" + "agent/pkg/utils" + "context" + "errors" + "fmt" + "github.com/deckhouse/sds-node-configurator/api/v1alpha1" + k8serr "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + "time" +) + +const ( + LVMLogicalVolumeExtenderCtrlName = "lvm-logical-volume-extender-controller" +) + +func 
RunLVMLogicalVolumeExtenderWatcherController( + mgr manager.Manager, + cfg config.Options, + log logger.Logger, + metrics monitoring.Metrics, + sdsCache *cache.Cache, +) error { + cl := mgr.GetClient() + mgrCache := mgr.GetCache() + + c, err := controller.New(LVMLogicalVolumeExtenderCtrlName, mgr, controller.Options{ + Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] starts the reconciliation for the LVMVolumeGroup %s", request.NamespacedName.String())) + + log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] tries to get the LVMVolumeGroup %s", request.Name)) + lvg, err := getLVMVolumeGroup(ctx, cl, metrics, request.Name) + if err != nil { + if k8serr.IsNotFound(err) { + log.Error(err, fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] LVMVolumeGroup %s not found (probably was deleted). Stop the reconcile", request.Name)) + return reconcile.Result{}, nil + } + + log.Error(err, fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] unable to get the LVMVolumeGroup %s", request.Name)) + return reconcile.Result{}, err + } + log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] successfully got the LVMVolumeGroup %s", request.Name)) + + if !shouldLLVExtenderReconcileEvent(log, lvg, cfg.NodeName) { + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] no need to reconcile a request for the LVMVolumeGroup %s", lvg.Name)) + return reconcile.Result{}, nil + } + + shouldRequeue := ReconcileLVMLogicalVolumeExtension(ctx, cl, metrics, log, sdsCache, lvg) + if shouldRequeue { + log.Warning(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] Reconciler needs a retry for the LVMVolumeGroup %s. 
Retry in %s", lvg.Name, cfg.VolumeGroupScanIntervalSec.String())) + return reconcile.Result{ + RequeueAfter: cfg.VolumeGroupScanIntervalSec, + }, nil + } + + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] successfully reconciled LVMLogicalVolumes for the LVMVolumeGroup %s", lvg.Name)) + return reconcile.Result{}, nil + }), + }) + if err != nil { + log.Error(err, "[RunLVMLogicalVolumeExtenderWatcherController] unable to create a controller") + return err + } + + err = c.Watch(source.Kind(mgrCache, &v1alpha1.LvmVolumeGroup{}, handler.TypedFuncs[*v1alpha1.LvmVolumeGroup]{ + CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*v1alpha1.LvmVolumeGroup], q workqueue.RateLimitingInterface) { + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] got a Create event for the LVMVolumeGroup %s", e.Object.GetName())) + request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}} + q.Add(request) + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] added the LVMVolumeGroup %s to the Reconcilers queue", e.Object.GetName())) + }, + UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*v1alpha1.LvmVolumeGroup], q workqueue.RateLimitingInterface) { + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] got an Update event for the LVMVolumeGroup %s", e.ObjectNew.GetName())) + request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.ObjectNew.GetNamespace(), Name: e.ObjectNew.GetName()}} + q.Add(request) + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] added the LVMVolumeGroup %s to the Reconcilers queue", e.ObjectNew.GetName())) + }, + })) + if err != nil { + log.Error(err, "[RunLVMLogicalVolumeExtenderWatcherController] unable to watch the events") + return err + } + + return nil +} + +func shouldLLVExtenderReconcileEvent(log logger.Logger, newLVG *v1alpha1.LvmVolumeGroup, nodeName string) bool { + // for new LVMVolumeGroups + if reflect.DeepEqual(newLVG.Status, v1alpha1.LvmVolumeGroupStatus{}) { + log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] the LVMVolumeGroup %s should not be reconciled as its Status is not initialized yet", newLVG.Name)) + return false + } + + if !belongsToNode(newLVG, nodeName) { + log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] the LVMVolumeGroup %s should not be reconciled as it does not belong to the node %s", newLVG.Name, nodeName)) + return false + } + + if newLVG.Status.Phase != internal.PhaseReady { + log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeExtenderWatcherController] the LVMVolumeGroup %s should not be reconciled as its Status.Phase is not Ready", newLVG.Name)) + return false + } + + return true +} + +func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, metrics monitoring.Metrics, log logger.Logger, sdsCache *cache.Cache, lvg *v1alpha1.LvmVolumeGroup) bool { + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] tries to get LLV resources with percent size for the LVMVolumeGroup %s", lvg.Name)) + llvs, err := getAllLLVsWithPercentSize(ctx, cl, lvg.Name) + if err != nil { + log.Error(err, "[ReconcileLVMLogicalVolumeExtension] unable to get LLV resources") + return true + } + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] successfully got LLV resources for the LVMVolumeGroup %s", lvg.Name)) + + if len(llvs) == 0 { 
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] no LVMLogicalVolumes with percent size were found for the LVMVolumeGroup %s", lvg.Name)) + return false + } + + shouldRetry := false + for _, llv := range llvs { + log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] starts to reconcile the LVMLogicalVolume %s", llv.Name)) + llvRequestedSize, err := getLLVRequestedSize(&llv, lvg) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to get requested size of the LVMLogicalVolume %s", llv.Name)) + shouldRetry = true + continue + } + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] successfully got the requested size of the LVMLogicalVolume %s, size: %s", llv.Name, llvRequestedSize.String())) + + lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) + if lv == nil { + err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode) + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name)) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error()) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + } + shouldRetry = true + continue + } + + if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) { + log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name)) + continue + } + + if llvRequestedSize.Value() < lv.LVSize.Value() { + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than the actual one on the node: %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String())) + continue + } + + freeSpace := getFreeLVGSpaceForLLV(lvg, &llv) + if llvRequestedSize.Value()+internal.ResizeDelta.Value() > freeSpace.Value() { + err = errors.New("not enough space") + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend the LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name)) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend LV, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + shouldRetry = true + } + continue + } + + log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should be extended from %s to %s size", llv.Name, llv.Status.ActualSize.String(), llvRequestedSize.String())) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseResizing, "") + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + } + + cmd, err := utils.ExtendLV(llvRequestedSize.Value(), lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend LV %s of the LVMLogicalVolume %s, cmd: %s", llv.Spec.ActualLVNameOnTheNode, llv.Name, cmd)) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend LV, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + } + shouldRetry = true + continue + } + 
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s has been successfully extended", llv.Name)) + + var ( + maxAttempts = 5 + currentAttempts = 0 + ) + for currentAttempts < maxAttempts { + lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) + if lv != nil && utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) { + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name)) + break + } + + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", llv.Spec.ActualLVNameOnTheNode, llv.Name)) + currentAttempts++ + time.Sleep(1 * time.Second) + } + + if currentAttempts == maxAttempts { + err = fmt.Errorf("LV %s is not updated in the cache", llv.Spec.ActualLVNameOnTheNode) + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name)) + shouldRetry = true + + if err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error()); err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + } + continue + } + + updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize) + if err != nil { + log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) + shouldRetry = true + continue + } + + if updated { + log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s was successfully updated", llv.Name)) + } else { + log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] no need to update the LVMLogicalVolume %s", llv.Name)) + } + } + + return shouldRetry +} + +func getAllLLVsWithPercentSize(ctx context.Context, cl client.Client, lvgName string) ([]v1alpha1.LVMLogicalVolume, error) { + llvList := &v1alpha1.LVMLogicalVolumeList{} + err := cl.List(ctx, llvList) + if err != nil { + return nil, err + } + + result := make([]v1alpha1.LVMLogicalVolume, 0, len(llvList.Items)) + for _, llv := range llvList.Items { + if llv.Spec.LvmVolumeGroupName == lvgName && isPercentSize(llv.Spec.Size) { + result = append(result, llv) + } + } + + return result, nil +} diff --git a/images/agent/pkg/controller/lvm_logical_volume_watcher.go b/images/agent/pkg/controller/lvm_logical_volume_watcher.go index 288cd390..dcb55e1d 100644 --- a/images/agent/pkg/controller/lvm_logical_volume_watcher.go +++ b/images/agent/pkg/controller/lvm_logical_volume_watcher.go @@ -35,10 +35,10 @@ const ( lvmLogicalVolumeWatcherCtrlName = "lvm-logical-volume-watcher-controller" - StatusPhaseCreated = "Created" - StatusPhasePending = "Pending" - StatusPhaseResizing = "Resizing" - StatusPhaseFailed = "Failed" + LLVStatusPhaseCreated = "Created" + LLVStatusPhasePending = "Pending" + LLVStatusPhaseResizing = "Resizing" + LLVStatusPhaseFailed = "Failed" ) type ( @@ -53,18 +53,11 @@ func RunLVMLogicalVolumeWatcherController( sdsCache *cache.Cache, ) (controller.Controller, error) { cl := mgr.GetClient() - kubeCache := mgr.GetCache() c, err := controller.New(lvmLogicalVolumeWatcherCtrlName, mgr, controller.Options{ Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] Reconciler starts reconciliation of the LVMLogicalVolume: %s", request.Name)) - // this case 
prevents the unexpected behavior when the controller runs up with existing LVMLogicalVolumes - if vgs, _ := sdsCache.GetVGs(); len(vgs) == 0 { - log.Warning(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] unable to reconcile the request as no VG was found in the cache. Retry in %s", cfg.VolumeGroupScanIntervalSec.String())) - return reconcile.Result{RequeueAfter: cfg.VolumeGroupScanIntervalSec}, nil - } - log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] tries to get the LVMLogicalVolume %s", request.Name)) llv := &v1alpha1.LVMLogicalVolume{} err := cl.Get(ctx, request.NamespacedName, llv) @@ -80,7 +73,7 @@ func RunLVMLogicalVolumeWatcherController( if err != nil { if k8serr.IsNotFound(err) { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolume] LVMVolumeGroup %s not found for LVMLogicalVolume %s. Retry in %s", llv.Spec.LvmVolumeGroupName, llv.Name, cfg.VolumeGroupScanIntervalSec.String())) - err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseFailed, fmt.Sprintf("LVMVolumeGroup %s not found", llv.Spec.LvmVolumeGroupName)) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseFailed, fmt.Sprintf("LVMVolumeGroup %s not found", llv.Spec.LvmVolumeGroupName)) if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolume] unable to update the LVMLogicalVolume %s", llv.Name)) return reconcile.Result{}, err @@ -91,7 +84,7 @@ func RunLVMLogicalVolumeWatcherController( }, nil } - err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseFailed, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error())) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseFailed, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error())) if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolume] unable to update the LVMLogicalVolume %s", llv.Name)) } @@ -104,6 +97,12 @@ func RunLVMLogicalVolumeWatcherController( } log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMVolumeGroup %s of the LVMLogicalVolume %s belongs to the current node: %s. Reconciliation continues", lvg.Name, llv.Name, cfg.NodeName)) + // this case prevents the unexpected behavior when the controller runs up with existing LVMLogicalVolumes + if vgs, _ := sdsCache.GetVGs(); len(vgs) == 0 { + log.Warning(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] unable to reconcile the request as no VG was found in the cache. 
Retry in %s", cfg.VolumeGroupScanIntervalSec.String())) + return reconcile.Result{RequeueAfter: cfg.VolumeGroupScanIntervalSec}, nil + } + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] tries to add the finalizer %s to the LVMLogicalVolume %s", internal.SdsNodeConfiguratorFinalizer, llv.Name)) added, err := addLLVFinalizerIfNotExist(ctx, cl, log, metrics, llv) if err != nil { @@ -120,7 +119,7 @@ func RunLVMLogicalVolumeWatcherController( valid, reason := validateLVMLogicalVolume(sdsCache, llv, lvg) if !valid { log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMLogicalVolume %s is not valid, reason: %s", llv.Name, reason)) - err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseFailed, reason) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseFailed, reason) if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolume] unable to update the LVMLogicalVolume %s", llv.Name)) return reconcile.Result{}, err @@ -133,7 +132,7 @@ func RunLVMLogicalVolumeWatcherController( shouldRequeue, err := ReconcileLVMLogicalVolume(ctx, cl, log, metrics, sdsCache, llv, lvg) if err != nil { log.Error(err, fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] an error occurred while reconciling the LVMLogicalVolume: %s", request.Name)) - updErr := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseFailed, err.Error()) + updErr := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseFailed, err.Error()) if updErr != nil { log.Error(updErr, fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] unable to update the LVMLogicalVolume %s", llv.Name)) return reconcile.Result{}, updErr @@ -155,53 +154,33 @@ func RunLVMLogicalVolumeWatcherController( return nil, err } - err = c.Watch(source.Kind(kubeCache, &v1alpha1.LVMLogicalVolume{}), handler.Funcs{ - CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + err = c.Watch(source.Kind(mgr.GetCache(), &v1alpha1.LVMLogicalVolume{}, handler.TypedFuncs[*v1alpha1.LVMLogicalVolume]{ + CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*v1alpha1.LVMLogicalVolume], q workqueue.RateLimitingInterface) { log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] got a create event for the LVMLogicalVolume: %s", e.Object.GetName())) request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}} - q.Add(request) log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] added the request of the LVMLogicalVolume %s to Reconciler", e.Object.GetName())) }, - UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*v1alpha1.LVMLogicalVolume], q workqueue.RateLimitingInterface) { log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] got an update event for the LVMLogicalVolume: %s", e.ObjectNew.GetName())) - - log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] tries to cast the request's old object %s to LVMLogicalVolume", e.ObjectOld.GetName())) - oldLLV, ok := e.ObjectOld.(*v1alpha1.LVMLogicalVolume) - if !ok { - err = errors.New("unable to cast event object to a given type") - log.Error(err, "[RunLVMLogicalVolumeWatcherController] an error occurs while handling an update event") - return - } - log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully casted the request's old object %s to 
LVMLogicalVolume", e.ObjectOld.GetName())) - - log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] tries to cast the request's new object %s to LVMLogicalVolume", e.ObjectOld.GetName())) - newLLV, ok := e.ObjectNew.(*v1alpha1.LVMLogicalVolume) - if !ok { - err = errors.New("unable to cast event object to a given type") - log.Error(err, "[RunLVMLogicalVolumeWatcherController] an error occurs while handling update event") - return - } - log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully casted the request's new object %s to LVMLogicalVolume", e.ObjectOld.GetName())) - // TODO: Figure out how to log it in our logger. if cfg.Loglevel == "4" { fmt.Println("==============START DIFF==================") - fmt.Println(cmp.Diff(oldLLV, newLLV)) + fmt.Println(cmp.Diff(e.ObjectOld, e.ObjectNew)) fmt.Println("==============END DIFF==================") } - if reflect.DeepEqual(oldLLV.Spec, newLLV.Spec) && newLLV.DeletionTimestamp == nil { - log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] no target changes were made for the LVMLogicalVolume %s. No need to reconcile the request", newLLV.Name)) + if reflect.DeepEqual(e.ObjectOld.Spec, e.ObjectNew.Spec) && e.ObjectNew.DeletionTimestamp == nil { + log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] no target changes were made for the LVMLogicalVolume %s. No need to reconcile the request", e.ObjectNew.Name)) return } - request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: newLLV.Namespace, Name: newLLV.Name}} + request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.ObjectNew.Namespace, Name: e.ObjectNew.Name}} q.Add(request) log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] added the request of the LVMLogicalVolume %s to Reconciler", e.ObjectNew.GetName())) }, - }) + })) if err != nil { log.Error(err, "[RunLVMLogicalVolumeWatcherController] the controller is unable to watch") @@ -226,9 +205,9 @@ func ReconcileLVMLogicalVolume(ctx context.Context, cl client.Client, log logger return reconcileLLVDeleteFunc(ctx, cl, log, metrics, sdsCache, llv, lvg) default: log.Info(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s has compeleted configuration and should not be reconciled", llv.Name)) - if llv.Status.Phase != StatusPhaseCreated { - log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. Setting the phase to %s", llv.Name, llv.Status.Phase, StatusPhaseCreated)) - err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseCreated, "") + if llv.Status.Phase != LLVStatusPhaseCreated { + log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. 
Setting the phase to %s", llv.Name, llv.Status.Phase, LLVStatusPhaseCreated)) + err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseCreated, "") if err != nil { return true, err } @@ -251,7 +230,7 @@ func reconcileLLVCreateFunc( // this check prevents infinite resource updating after retries if llv.Status == nil { - err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhasePending, "") + err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhasePending, "") if err != nil { log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) return true, err @@ -331,7 +310,7 @@ func reconcileLLVUpdateFunc( // status might be nil if a user creates the resource with LV name which matches existing LV on the node if llv.Status == nil { - err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhasePending, "") + err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhasePending, "") if err != nil { log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) return true, err @@ -387,7 +366,7 @@ func reconcileLLVUpdateFunc( log.Info(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s should be resized", llv.Name)) // this check prevents infinite resource updates after retry if llv.Status.Phase != Failed { - err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseResizing, "") + err := updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseResizing, "") if err != nil { log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) return true, err diff --git a/images/agent/pkg/controller/lvm_logical_volume_watcher_func.go b/images/agent/pkg/controller/lvm_logical_volume_watcher_func.go index db7479cc..78a474e9 100644 --- a/images/agent/pkg/controller/lvm_logical_volume_watcher_func.go +++ b/images/agent/pkg/controller/lvm_logical_volume_watcher_func.go @@ -133,11 +133,11 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv } } - if llv.Status.Phase != StatusPhaseCreated || + if llv.Status.Phase != LLVStatusPhaseCreated || llv.Status.ActualSize.Value() != actualSize.Value() || llv.Status.Reason != "" || llv.Status.Contiguous != contiguous { - llv.Status.Phase = StatusPhaseCreated + llv.Status.Phase = LLVStatusPhaseCreated llv.Status.Reason = "" llv.Status.ActualSize = actualSize llv.Status.Contiguous = contiguous @@ -153,7 +153,7 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv } func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error { - lv := FindLV(sdsCache, vgName, llv.Spec.ActualLVNameOnTheNode) + lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) if lv == nil { log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName)) return nil @@ -176,7 +176,7 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l } func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quantity { - lv := FindLV(sdsCache, vgName, lvName) + lv := sdsCache.FindLV(vgName, lvName) if lv == nil { return resource.Quantity{} } @@ -207,7 +207,7 @@ func shouldReconcileByCreateFunc(sdsCache *cache.Cache, vgName string, llv *v1al return false } - lv := FindLV(sdsCache, vgName, 
llv.Spec.ActualLVNameOnTheNode) + lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) if lv != nil { return false } @@ -292,30 +292,21 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol if !exist { reason.WriteString("Selected thin pool does not exist in selected LVMVolumeGroup. ") } - - // if a specified Thin LV name matches the existing Thick one - lv := FindLV(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if lv != nil { - if !checkIfLVBelongsToLLV(llv, lv) { - reason.WriteString(fmt.Sprintf("Specified LV %s is already created and does not belong to selected thin pool %s. ", lv.LVName, llv.Spec.Thin.PoolName)) - } - } case Thick: if llv.Spec.Thin != nil { reason.WriteString("Thin pool specified for Thick LV. ") } + } - // if a specified Thick LV name matches the existing Thin one - lv := FindLV(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if lv != nil && len(lv.LVAttr) == 0 { - reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName)) - break - } + // if the specified LV name matches an existing LV on the node + lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) + if lv != nil && len(lv.LVAttr) == 0 { + reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated because its attributes are empty. ", lv.LVName)) + } - if lv != nil { - if !checkIfLVBelongsToLLV(llv, lv) { - reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName)) + if lv != nil { + if !checkIfLVBelongsToLLV(llv, lv) { + reason.WriteString(fmt.Sprintf("Specified LV %s is already created and does not match the one on the node.", lv.LVName)) } } @@ -355,23 +346,12 @@ func updateLVMLogicalVolume(ctx context.Context, metrics monitoring.Metrics, cl return cl.Update(ctx, llv) } -func FindLV(sdsCache *cache.Cache, vgName, lvName string) *internal.LVData { - lvs, _ := sdsCache.GetLVs() - for _, lv := range lvs { - if lv.VGName == vgName && lv.LVName == lvName { - return &lv - } - } - - return nil -} - func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) bool { if llv.DeletionTimestamp != nil { return false } - lv := FindLV(sdsCache, vgName, llv.Spec.ActualLVNameOnTheNode) + lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) if lv == nil { return false } diff --git a/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go b/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go index 329fb54a..ba81dd61 100644 --- a/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go +++ b/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go @@ -280,7 +280,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Size: *specSize, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, ActualSize: *statusSize, }, } @@ -300,7 +300,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { llv := &v1alpha1.LVMLogicalVolume{ ObjectMeta: v1.ObjectMeta{DeletionTimestamp: &v1.Time{}}, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, }, } @@ -317,7 +317,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Size: *specSize, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, ActualSize: *statusSize, }, } 
@@ -342,7 +342,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { t.Run("if_phase_created_returns_false", func(t *testing.T) { llv := &v1alpha1.LVMLogicalVolume{ Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, }, } @@ -356,7 +356,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { t.Run("if_phase_resizing_returns_false", func(t *testing.T) { llv := &v1alpha1.LVMLogicalVolume{ Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseResizing, + Phase: LLVStatusPhaseResizing, }, } @@ -396,7 +396,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { t.Run("if_phase_pending_returns_false", func(t *testing.T) { llv := &v1alpha1.LVMLogicalVolume{ Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhasePending, + Phase: LLVStatusPhasePending, }, } @@ -410,7 +410,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { t.Run("if_phase_resizing_returns_false", func(t *testing.T) { llv := &v1alpha1.LVMLogicalVolume{ Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseResizing, + Phase: LLVStatusPhaseResizing, }, } @@ -429,7 +429,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Size: *specSize, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, ActualSize: *statusSize, }, } @@ -449,7 +449,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Size: *specSize, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, ActualSize: *statusSize, }, } @@ -469,7 +469,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Size: *specSize, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, ActualSize: *statusSize, }, } @@ -509,7 +509,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Name: "test", }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhaseCreated, + Phase: LLVStatusPhaseCreated, Reason: "", }, } @@ -527,7 +527,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { } }() - err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, StatusPhaseFailed, reason) + err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, llv, LLVStatusPhaseFailed, reason) if assert.NoError(t, err) { newLLV := &v1alpha1.LVMLogicalVolume{} err = cl.Get(ctx, client.ObjectKey{ @@ -535,7 +535,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Namespace: "", }, newLLV) - assert.Equal(t, newLLV.Status.Phase, StatusPhaseFailed) + assert.Equal(t, newLLV.Status.Phase, LLVStatusPhaseFailed) assert.Equal(t, newLLV.Status.Reason, reason) } }) @@ -629,7 +629,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { Name: lvgName, }, Status: &v1alpha1.LVMLogicalVolumeStatus{ - Phase: StatusPhasePending, + Phase: LLVStatusPhasePending, Reason: "", ActualSize: *resource.NewQuantity(oldSize, resource.BinarySI), }, @@ -658,11 +658,11 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { } if assert.NotNil(t, oldLLV) { - assert.Equal(t, StatusPhasePending, oldLLV.Status.Phase) + assert.Equal(t, LLVStatusPhasePending, oldLLV.Status.Phase) assert.Equal(t, oldSize, oldLLV.Status.ActualSize.Value()) } - oldLLV.Status.Phase = StatusPhaseCreated + oldLLV.Status.Phase = LLVStatusPhaseCreated oldLLV.Status.ActualSize = *resource.NewQuantity(newSize, resource.BinarySI) err = updateLVMLogicalVolume(ctx, metrics, cl, oldLLV) @@ -676,7 +676,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { return } - assert.Equal(t, StatusPhaseCreated, newLLV.Status.Phase) + assert.Equal(t, LLVStatusPhaseCreated, 
newLLV.Status.Phase) assert.Equal(t, newSize, newLLV.Status.ActualSize.Value()) } }) diff --git a/images/agent/pkg/controller/lvm_volume_group_discover.go b/images/agent/pkg/controller/lvm_volume_group_discover.go index a9e59701..502176b4 100644 --- a/images/agent/pkg/controller/lvm_volume_group_discover.go +++ b/images/agent/pkg/controller/lvm_volume_group_discover.go @@ -138,15 +138,9 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl kclient.Client, met shouldRequeue := false for _, candidate := range candidates { if lvg, exist := filteredLVGs[candidate.ActualVGNameOnTheNode]; exist { - if !shouldReconcileLVG(&lvg) { - log.Warning(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is not ready to be reconciled due its conditions state, retry...", lvg.Name)) - shouldRequeue = true - continue - } - log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is already exist. Tries to update it", lvg.Name)) - log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] candidate: %v", candidate)) - log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] lvg: %v", lvg)) + log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] candidate: %+v", candidate)) + log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] lvg: %+v", lvg)) if !hasLVMVolumeGroupDiff(log, lvg, candidate) { log.Debug(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] no data to update for LvmVolumeGroup, name: "%s"`, lvg.Name)) @@ -206,20 +200,6 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl kclient.Client, met return false } -func shouldReconcileLVG(lvg *v1alpha1.LvmVolumeGroup) bool { - if lvg.Status.Conditions == nil { - return false - } - - for _, c := range lvg.Status.Conditions { - if c.Type == internal.TypeVGConfigurationApplied && c.Reason == internal.ReasonCreating { - return false - } - } - - return true -} - func filterLVGsByNode( ctx context.Context, cl kclient.Client, @@ -438,10 +418,9 @@ func ReconcileUnhealthyLVMVolumeGroups( } func GetLVMVolumeGroupCandidates(log logger.Logger, sdsCache *cache.Cache, bds map[string]v1alpha1.BlockDevice, currentNode string) ([]internal.LVMVolumeGroupCandidate, error) { - var candidates []internal.LVMVolumeGroupCandidate - vgs, vgErrs := sdsCache.GetVGs() vgWithTag := filterVGByTag(vgs, internal.LVMTags) + candidates := make([]internal.LVMVolumeGroupCandidate, 0, len(vgWithTag)) // If there is no VG with our tag, then there is no any candidate. if len(vgWithTag) == 0 { @@ -892,13 +871,12 @@ func UpdateLVMVolumeGroupByCandidate( ) error { // Check if VG has some problems if candidate.Health == NonOperational { - err := errors.New(candidate.Message) - log.Error(err, fmt.Sprintf("[UpdateLVMVolumeGroupByCandidate] candidate for LVMVolumeGroup %s has NonOperational health, message %s. Update the VGReady condition to False", lvg.Name, candidate.Message)) + log.Warning(fmt.Sprintf("[UpdateLVMVolumeGroupByCandidate] candidate for LVMVolumeGroup %s has NonOperational health, message %s. 
Update the VGReady condition to False", lvg.Name, candidate.Message)) updErr := updateLVGConditionIfNeeded(ctx, cl, log, lvg, metav1.ConditionFalse, internal.TypeVGReady, internal.ReasonScanFailed, candidate.Message) if updErr != nil { - log.Error(err, fmt.Sprintf("[UpdateLVMVolumeGroupByCandidate] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, lvg.Name)) + log.Error(updErr, fmt.Sprintf("[UpdateLVMVolumeGroupByCandidate] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, lvg.Name)) } - return err + return updErr } // The resource.Status.Nodes can not be just re-written, it needs to be updated directly by a node. @@ -955,10 +933,8 @@ func convertLVMVGNodes(nodes map[string][]internal.LVMVGDevice) []v1alpha1.LvmVo lvmvgNodes := make([]v1alpha1.LvmVolumeGroupNode, 0, len(nodes)) for nodeName, nodeDevices := range nodes { - convertedDevices := convertLVMVGDevices(nodeDevices) - lvmvgNodes = append(lvmvgNodes, v1alpha1.LvmVolumeGroupNode{ - Devices: convertedDevices, + Devices: convertLVMVGDevices(nodeDevices), Name: nodeName, }) } diff --git a/images/agent/pkg/controller/lvm_volume_group_watcher.go b/images/agent/pkg/controller/lvm_volume_group_watcher.go index 5a5d0184..e80ea32b 100644 --- a/images/agent/pkg/controller/lvm_volume_group_watcher.go +++ b/images/agent/pkg/controller/lvm_volume_group_watcher.go @@ -192,8 +192,8 @@ func RunLVMVolumeGroupWatcherController( return nil, err } - err = c.Watch(source.Kind(mgrCache, &v1alpha1.LvmVolumeGroup{}), handler.Funcs{ - CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + err = c.Watch(source.Kind(mgrCache, &v1alpha1.LvmVolumeGroup{}, handler.TypedFuncs[*v1alpha1.LvmVolumeGroup]{ + CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*v1alpha1.LvmVolumeGroup], q workqueue.RateLimitingInterface) { log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] createFunc got a create event for the LVMVolumeGroup, name: %s", e.Object.GetName())) request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}} @@ -201,36 +201,19 @@ func RunLVMVolumeGroupWatcherController( log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] createFunc added a request for the LVMVolumeGroup %s to the Reconcilers queue", e.Object.GetName())) }, - UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*v1alpha1.LvmVolumeGroup], q workqueue.RateLimitingInterface) { log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] UpdateFunc got a update event for the LVMVolumeGroup %s", e.ObjectNew.GetName())) - - oldLVG, ok := e.ObjectOld.(*v1alpha1.LvmVolumeGroup) - if !ok { - err = errors.New("unable to cast event object to a given type") - log.Error(err, "[RunLVMVolumeGroupWatcherController] an error occurred while handling a create event") - return - } - log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully casted an old state of the LVMVolumeGroup %s", oldLVG.Name)) - - newLVG, ok := e.ObjectNew.(*v1alpha1.LvmVolumeGroup) - if !ok { - err = errors.New("unable to cast event object to a given type") - log.Error(err, "[RunLVMVolumeGroupWatcherController] an error occurred while handling a create event") - return - } - log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully casted a new state of the LVMVolumeGroup %s", newLVG.Name)) - - if 
!shouldReconcileUpdateEvent(log, oldLVG, newLVG) { - log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] update event for the LVMVolumeGroup %s should not be reconciled as not target changed were made", newLVG.Name)) + if !shouldLVGWatcherReconcileUpdateEvent(log, e.ObjectOld, e.ObjectNew) { + log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] update event for the LVMVolumeGroup %s should not be reconciled as no target changes were made", e.ObjectNew.Name)) return } request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.ObjectNew.GetNamespace(), Name: e.ObjectNew.GetName()}} q.Add(request) - log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] updateFunc added a request for the LVMVolumeGroup %s to the Reconcilers queue", newLVG.Name)) + log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] updateFunc added a request for the LVMVolumeGroup %s to the Reconcilers queue", e.ObjectNew.Name)) }, - }) + })) if err != nil { log.Error(err, "[RunLVMVolumeGroupWatcherController] error Watch controller RunLVMVolumeGroupWatcherController") @@ -354,7 +337,7 @@ func reconcileLVGUpdateFunc(ctx context.Context, cl client.Client, log logger.Lo log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] tries to validate the LVMVolumeGroup %s", lvg.Name)) pvs, _ := sdsCache.GetPVs() - valid, reason := validateLVGForUpdateFunc(log, lvg, blockDevices, pvs) + valid, reason := validateLVGForUpdateFunc(log, sdsCache, lvg, blockDevices, pvs) if !valid { log.Warning(fmt.Sprintf("[reconcileLVGUpdateFunc] the LVMVolumeGroup %s is not valid", lvg.Name)) err := updateLVGConditionIfNeeded(ctx, cl, log, lvg, v1.ConditionFalse, internal.TypeVGConfigurationApplied, internal.ReasonValidationFailed, reason) @@ -397,30 +380,30 @@ func reconcileLVGUpdateFunc(ctx context.Context, cl client.Client, log logger.Lo log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] no need to update VG %s tag of the LVMVolumeGroup %s", vg.VGName, lvg.Name)) } - log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] starts to extend VG %s of the LVMVolumeGroup %s", vg.VGName, lvg.Name)) - err = ExtendVGIfNeeded(ctx, cl, log, metrics, lvg, vg, pvs, blockDevices) + log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] starts to resize PV of the LVMVolumeGroup %s", lvg.Name)) + err = ResizePVIfNeeded(ctx, cl, log, metrics, lvg) if err != nil { - log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to extend VG of the LVMVolumeGroup %s", lvg.Name)) - err = updateLVGConditionIfNeeded(ctx, cl, log, lvg, v1.ConditionFalse, internal.TypeVGConfigurationApplied, "VGExtendFailed", fmt.Sprintf("unable to extend VG, err: %s", err.Error())) + log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to resize PV of the LVMVolumeGroup %s", lvg.Name)) + err = updateLVGConditionIfNeeded(ctx, cl, log, lvg, v1.ConditionFalse, internal.TypeVGConfigurationApplied, "PVResizeFailed", fmt.Sprintf("unable to resize PV, err: %s", err.Error())) if err != nil { log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, lvg.Name)) } - return true, err } - log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] successfully ended the extend operation for VG of the LVMVolumeGroup %s", lvg.Name)) + log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] successfully ended the resize operation for PV of the LVMVolumeGroup %s", lvg.Name)) - log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] starts to resize PV of the LVMVolumeGroup %s", lvg.Name)) - err = ResizePVIfNeeded(ctx, cl, 
log, metrics, lvg) + log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] starts to extend VG %s of the LVMVolumeGroup %s", vg.VGName, lvg.Name)) + err = ExtendVGIfNeeded(ctx, cl, log, metrics, lvg, vg, pvs, blockDevices) if err != nil { - log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to resize PV of the LVMVolumeGroup %s", lvg.Name)) - err = updateLVGConditionIfNeeded(ctx, cl, log, lvg, v1.ConditionFalse, internal.TypeVGConfigurationApplied, "PVResizeFailed", fmt.Sprintf("unable to resize PV, err: %s", err.Error())) + log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to extend VG of the LVMVolumeGroup %s", lvg.Name)) + err = updateLVGConditionIfNeeded(ctx, cl, log, lvg, v1.ConditionFalse, internal.TypeVGConfigurationApplied, "VGExtendFailed", fmt.Sprintf("unable to extend VG, err: %s", err.Error())) if err != nil { log.Error(err, fmt.Sprintf("[reconcileLVGUpdateFunc] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, lvg.Name)) } + return true, err } - log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] successfully ended the resize operation for PV of the LVMVolumeGroup %s", lvg.Name)) + log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] successfully ended the extend operation for VG of the LVMVolumeGroup %s", lvg.Name)) if lvg.Spec.ThinPools != nil { log.Debug(fmt.Sprintf("[reconcileLVGUpdateFunc] starts to reconcile thin-pools of the LVMVolumeGroup %s", lvg.Name)) diff --git a/images/agent/pkg/controller/lvm_volume_group_watcher_func.go b/images/agent/pkg/controller/lvm_volume_group_watcher_func.go index cd3eca9a..a87e907c 100644 --- a/images/agent/pkg/controller/lvm_volume_group_watcher_func.go +++ b/images/agent/pkg/controller/lvm_volume_group_watcher_func.go @@ -73,17 +73,26 @@ func checkIfVGExist(vgName string, vgs []internal.VGData) bool { return false } -func shouldReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool { +func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool { if newLVG.DeletionTimestamp != nil { - log.Debug(fmt.Sprintf("[shouldReconcileUpdateEvent] update event should be reconciled as LVMVolumeGroup %s has deletionTimestamp", newLVG.Name)) + log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s has deletionTimestamp", newLVG.Name)) return true } if !reflect.DeepEqual(oldLVG.Spec, newLVG.Spec) { - log.Debug(fmt.Sprintf("[shouldReconcileUpdateEvent] update event should be reconciled as LVMVolumeGroup %s configuration has been changed", newLVG.Name)) + log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s configuration has been changed", newLVG.Name)) return true } + for _, c := range newLVG.Status.Conditions { + if c.Type == internal.TypeVGConfigurationApplied { + if c.Reason == internal.ReasonUpdating || c.Reason == internal.ReasonCreating { + log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should not be reconciled as the LVMVolumeGroup %s reconciliation is still in progress", newLVG.Name)) + return false + } + } + } + for _, n := range newLVG.Status.Nodes { for _, d := range n.Devices { if !utils.AreSizesEqualWithinDelta(d.PVSize, d.DevSize, internal.ResizeDelta) { @@ -357,7 +366,7 @@ func validateLVGForCreateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, b return true, "" } -func validateLVGForUpdateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, 
@@ -357,7 +366,7 @@ func validateLVGForCreateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, b
 	return true, ""
 }

-func validateLVGForUpdateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, blockDevices map[string]v1alpha1.BlockDevice, pvs []internal.PVData) (bool, string) {
+func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1alpha1.LvmVolumeGroup, blockDevices map[string]v1alpha1.BlockDevice, pvs []internal.PVData) (bool, string) {
 	reason := strings.Builder{}
 	log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] check if every new BlockDevice of the LVMVolumeGroup %s is comsumable", lvg.Name))
@@ -406,9 +415,17 @@ func validateLVGForUpdateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, b
 	if lvg.Spec.ThinPools != nil {
 		log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s has thin-pools. Validate them", lvg.Name))
-		usedThinPools := make(map[string]v1alpha1.LvmVolumeGroupThinPoolStatus, len(lvg.Status.ThinPools))
-		for _, tp := range lvg.Status.ThinPools {
-			usedThinPools[tp.Name] = tp
+		actualThinPools := make(map[string]internal.LVData, len(lvg.Spec.ThinPools))
+		for _, tp := range lvg.Spec.ThinPools {
+			lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name)
+			if lv != nil {
+				if !isThinPool(*lv) {
+					reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool. ", lv.LVName))
+					continue
+				}
+
+				actualThinPools[lv.LVName] = *lv
+			}
 		}

 		// check if added thin-pools has valid requested size
@@ -416,8 +433,16 @@ func validateLVGForUpdateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, b
 			addingThinPoolSize int64
 			hasFullThinPool    = false
 		)
+
+		vg := sdsCache.FindVG(lvg.Spec.ActualVGNameOnTheNode)
+		if vg == nil {
+			reason.WriteString(fmt.Sprintf("Missing VG %s in the cache", lvg.Spec.ActualVGNameOnTheNode))
+			return false, reason.String()
+		}
+
 		for _, specTp := range lvg.Spec.ThinPools {
-			tpRequestedSize, err := getRequestedSizeFromString(specTp.Size, lvg.Status.VGSize)
+			// there might be a case when the thin pool is already created but is not yet shown in the status
+			tpRequestedSize, err := getRequestedSizeFromString(specTp.Size, vg.VGSize)
 			if err != nil {
 				reason.WriteString(err.Error())
 				continue
@@ -438,20 +463,20 @@ func validateLVGForUpdateFunc(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, b
 				reason.WriteString(fmt.Sprintf("Thin-pool %s requests size of full VG space, but there are any other thin-pools. ", specTp.Name))
 			}
 		case false:
-			if statusTp, used := usedThinPools[specTp.Name]; !used {
-				log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] thin-pool %s of the LVMVolumeGroup %s is not used yet, adds its requested size", specTp.Name, lvg.Name))
+			if actualThinPool, created := actualThinPools[specTp.Name]; !created {
+				log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] thin-pool %s of the LVMVolumeGroup %s is not yet created, adds its requested size", specTp.Name, lvg.Name))
 				addingThinPoolSize += tpRequestedSize.Value()
 			} else {
 				log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] thin-pool %s of the LVMVolumeGroup %s is already created, check its requested size", specTp.Name, lvg.Name))
-				if tpRequestedSize.Value()+internal.ResizeDelta.Value() < statusTp.ActualSize.Value() {
-					log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s is less than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), statusTp.ActualSize.String()))
-					reason.WriteString(fmt.Sprintf("Requested Spec.ThinPool %s size %s is less than actual one %s. ", specTp.Name, tpRequestedSize.String(), statusTp.ActualSize.String()))
+				if tpRequestedSize.Value()+internal.ResizeDelta.Value() < actualThinPool.LVSize.Value() {
+					log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s is less than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String()))
+					reason.WriteString(fmt.Sprintf("Requested Spec.ThinPool %s size %s is less than actual one %s. ", specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String()))
 					continue
 				}

-				thinPoolSizeDiff := tpRequestedSize.Value() - statusTp.ActualSize.Value()
+				thinPoolSizeDiff := tpRequestedSize.Value() - actualThinPool.LVSize.Value()
 				if thinPoolSizeDiff > internal.ResizeDelta.Value() {
-					log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s more than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), statusTp.ActualSize.String()))
+					log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s is more than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String()))
					addingThinPoolSize += thinPoolSizeDiff
 				}
 			}
@@ -495,11 +520,8 @@ func shouldReconcileLVGByCreateFunc(lvg *v1alpha1.LvmVolumeGroup, ch *cache.Cach
 		return false
 	}

-	vgs, _ := ch.GetVGs()
-	for _, vg := range vgs {
-		if vg.VGName == lvg.Spec.ActualVGNameOnTheNode {
-			return false
-		}
+	if vg := ch.FindVG(lvg.Spec.ActualVGNameOnTheNode); vg != nil {
+		return false
 	}

 	return true
@@ -510,11 +532,8 @@ func shouldReconcileLVGByUpdateFunc(lvg *v1alpha1.LvmVolumeGroup, ch *cache.Cach
 		return false
 	}

-	vgs, _ := ch.GetVGs()
-	for _, vg := range vgs {
-		if vg.VGName == lvg.Spec.ActualVGNameOnTheNode {
-			return true
-		}
+	if vg := ch.FindVG(lvg.Spec.ActualVGNameOnTheNode); vg != nil {
+		return true
 	}

 	return false
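validateLVGForUpdateFunc now sizes thin pools against the VG actually present in the node cache rather than against Status.VGSize, so a pool that exists on the node but has not reached the status yet is still validated correctly. The sizes themselves go through getRequestedSizeFromString, which per the CRD pattern accepts either an absolute quantity or a percentage; that helper is not shown here, but a sketch under that assumption (resolveRequestedSize is a hypothetical name):

package controller

import (
	"strconv"
	"strings"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Sketch of a resolver like getRequestedSizeFromString (not shown in this
// patch): "10Gi" parses as an absolute quantity, "50%" resolves against the
// total space it is given (the VG size here; for thin LVs, the thin pool size
// multiplied by allocationLimit).
func resolveRequestedSize(size string, total resource.Quantity) (resource.Quantity, error) {
	if strings.HasSuffix(size, "%") {
		percent, err := strconv.ParseInt(strings.TrimSuffix(size, "%"), 10, 64)
		if err != nil {
			return resource.Quantity{}, err
		}
		return *resource.NewQuantity(total.Value()*percent/100, resource.BinarySI), nil
	}
	return resource.ParseQuantity(size)
}

Resolving percentages against the live VG size is what makes the auto-extension work: when the VG grows, the same "50%" spec resolves to a larger target and the extender controller picks up the difference.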
", specTp.Name, tpRequestedSize.String(), statusTp.ActualSize.String())) + if tpRequestedSize.Value()+internal.ResizeDelta.Value() < actualThinPool.LVSize.Value() { + log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s is less than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String())) + reason.WriteString(fmt.Sprintf("Requested Spec.ThinPool %s size %s is less than actual one %s. ", specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String())) continue } - thinPoolSizeDiff := tpRequestedSize.Value() - statusTp.ActualSize.Value() + thinPoolSizeDiff := tpRequestedSize.Value() - actualThinPool.LVSize.Value() if thinPoolSizeDiff > internal.ResizeDelta.Value() { - log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s more than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), statusTp.ActualSize.String())) + log.Debug(fmt.Sprintf("[validateLVGForUpdateFunc] the LVMVolumeGroup %s Spec.ThinPool %s size %s more than Status one: %s", lvg.Name, specTp.Name, tpRequestedSize.String(), actualThinPool.LVSize.String())) addingThinPoolSize += thinPoolSizeDiff } } @@ -495,11 +520,8 @@ func shouldReconcileLVGByCreateFunc(lvg *v1alpha1.LvmVolumeGroup, ch *cache.Cach return false } - vgs, _ := ch.GetVGs() - for _, vg := range vgs { - if vg.VGName == lvg.Spec.ActualVGNameOnTheNode { - return false - } + if vg := ch.FindVG(lvg.Spec.ActualVGNameOnTheNode); vg != nil { + return false } return true @@ -510,11 +532,8 @@ func shouldReconcileLVGByUpdateFunc(lvg *v1alpha1.LvmVolumeGroup, ch *cache.Cach return false } - vgs, _ := ch.GetVGs() - for _, vg := range vgs { - if vg.VGName == lvg.Spec.ActualVGNameOnTheNode { - return true - } + if vg := ch.FindVG(lvg.Spec.ActualVGNameOnTheNode); vg != nil { + return true } return false diff --git a/images/agent/pkg/scanner/scanner.go b/images/agent/pkg/scanner/scanner.go index ccaa54a7..f5b2f60f 100644 --- a/images/agent/pkg/scanner/scanner.go +++ b/images/agent/pkg/scanner/scanner.go @@ -158,22 +158,23 @@ func runControllersReconcile(ctx context.Context, log logger.Logger, bdCtrl, lvg } func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cfg config.Options) error { - devices, devErr, err := scanDevices(ctx, log, cfg) + // the scan operations order is very important as it guarantees the consistent and reliable data from the node + lvs, lvsErr, err := scanLVs(ctx, log, cfg) if err != nil { return err } - pvs, pvsErr, err := scanPVs(ctx, log, cfg) + vgs, vgsErr, err := scanVGs(ctx, log, cfg) if err != nil { return err } - vgs, vgsErr, err := scanVGs(ctx, log, cfg) + pvs, pvsErr, err := scanPVs(ctx, log, cfg) if err != nil { return err } - lvs, lvsErr, err := scanLVs(ctx, log, cfg) + devices, devErr, err := scanDevices(ctx, log, cfg) if err != nil { return err } diff --git a/images/agent/pkg/utils/commands.go b/images/agent/pkg/utils/commands.go index 9ab96d90..e39806f4 100644 --- a/images/agent/pkg/utils/commands.go +++ b/images/agent/pkg/utils/commands.go @@ -332,7 +332,9 @@ func ExtendLV(size int64, vgName, lvName string) (string, error) { var stderr bytes.Buffer cmd.Stderr = &stderr - if err := cmd.Run(); err != nil { + err := cmd.Run() + filteredStdErr := filterStdErr(cmd.String(), stderr) + if err != nil && filteredStdErr.Len() > 0 { return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) }