diff --git a/crds/lvmlogicalvolume.yaml b/crds/lvmlogicalvolume.yaml index 6d07087e..9d969a53 100644 --- a/crds/lvmlogicalvolume.yaml +++ b/crds/lvmlogicalvolume.yaml @@ -33,13 +33,19 @@ spec: description: | properties: type: + x-kubernetes-validations: + - rule: self == oldSelf type: string enum: [Thick, Thin] size: type: string lvmVolumeGroup: + x-kubernetes-validations: + - rule: self == oldSelf type: string thin: + x-kubernetes-validations: + - rule: self == oldSelf type: object properties: poolName: @@ -56,6 +62,10 @@ spec: actualSize: type: string additionalPrinterColumns: + - jsonPath: .status.phase + name: Phase + type: string + description: The current resource status. - jsonPath: .spec.lvmVolumeGroup name: LVMVolumeGroup type: string diff --git a/images/agent/cmd/bc/main.go b/images/agent/cmd/bc/main.go index 739e992b..c302b5f7 100644 --- a/images/agent/cmd/bc/main.go +++ b/images/agent/cmd/bc/main.go @@ -110,28 +110,33 @@ func main() { } log.Info("[main] ReTag ends") - if _, err := controller.RunBlockDeviceController(ctx, mgr, *cfgParams, *log, metrics); err != nil { + if _, err = controller.RunBlockDeviceController(ctx, mgr, *cfgParams, *log, metrics); err != nil { log.Error(err, "[main] unable to controller.RunBlockDeviceController") os.Exit(1) } - if _, err := controller.RunLVMVolumeGroupWatcherController(mgr, *cfgParams, *log, metrics); err != nil { - log.Error(err, "[main] error Run RunLVMVolumeGroupController") + if _, err = controller.RunLVMVolumeGroupWatcherController(mgr, *cfgParams, *log, metrics); err != nil { + log.Error(err, "[main] unable to controller.RunLVMVolumeGroupWatcherController") os.Exit(1) } - if _, err := controller.RunLVMVolumeGroupDiscoverController(ctx, mgr, *cfgParams, *log, metrics); err != nil { + if _, err = controller.RunLVMVolumeGroupDiscoverController(ctx, mgr, *cfgParams, *log, metrics); err != nil { log.Error(err, "[main] unable to controller.RunLVMVolumeGroupDiscoverController") os.Exit(1) } - if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + if _, err = controller.RunLVMLogicalVolumeWatcherController(mgr, *cfgParams, *log, metrics); err != nil { + log.Error(err, "[main] unable to controller.RunLVMLogicalVolumeWatcherController") + os.Exit(1) + } + + if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { log.Error(err, "[main] unable to mgr.AddHealthzCheck") os.Exit(1) } log.Info("[main] successfully AddHealthzCheck") - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { log.Error(err, "[main] unable to mgr.AddReadyzCheck") os.Exit(1) } diff --git a/images/agent/config/config.go b/images/agent/config/config.go index f2c804eb..db4383a9 100644 --- a/images/agent/config/config.go +++ b/images/agent/config/config.go @@ -17,8 +17,10 @@ limitations under the License. package config import ( + "bytes" "fmt" "os" + "os/exec" "sds-node-configurator/pkg/logger" "time" ) @@ -75,12 +77,19 @@ func NewConfig() (*Options, error) { func getMachineId() (string, error) { id := os.Getenv(MachineID) if id == "" { - byteId, err := os.ReadFile("/host-root/etc/machine-id") + args := []string{"-m", "-u", "-i", "-n", "-p", "-t", "1", "cat", "./etc/machine-id"} + + var stdout bytes.Buffer + cmd := exec.Command("/usr/bin/nsenter", args...) 
+ cmd.Stdout = &stdout + err := cmd.Run() if err != nil { return "", err } - id = string(byteId) + id = stdout.String() + fmt.Println("MACHINE ID " + id) + } return id, nil diff --git a/images/agent/internal/const.go b/images/agent/internal/const.go index 4cdf0748..687d5ec1 100644 --- a/images/agent/internal/const.go +++ b/images/agent/internal/const.go @@ -17,14 +17,14 @@ limitations under the License. package internal const ( - TypePart = "part" - DRBDName = "/dev/drbd" - LoopDeviceType = "loop" - LVMDeviceType = "lvm" - LVMFSType = "LVM2_member" - SdsNodeConfigurator = "storage.deckhouse.io/sds-node-configurator" - LVMVGHealthOperational = "Operational" - LVMVGHealthNonOperational = "NonOperational" + TypePart = "part" + DRBDName = "/dev/drbd" + LoopDeviceType = "loop" + LVMDeviceType = "lvm" + LVMFSType = "LVM2_member" + SdsNodeConfiguratorFinalizer = "storage.deckhouse.io/sds-node-configurator" + LVMVGHealthOperational = "Operational" + LVMVGHealthNonOperational = "NonOperational" ) var ( @@ -32,6 +32,6 @@ var ( InvalidDeviceTypes = [...]string{LoopDeviceType, LVMDeviceType} BlockDeviceValidSize = "1G" ResizeDelta = "32Mi" - Finalizers = []string{SdsNodeConfigurator} + Finalizers = []string{SdsNodeConfiguratorFinalizer} LVMTags = []string{"storage.deckhouse.io/enabled=true", "linstor-"} ) diff --git a/images/agent/pkg/controller/lvm_logical_volume_watcher.go b/images/agent/pkg/controller/lvm_logical_volume_watcher.go index e0195854..2853bb5e 100644 --- a/images/agent/pkg/controller/lvm_logical_volume_watcher.go +++ b/images/agent/pkg/controller/lvm_logical_volume_watcher.go @@ -2,11 +2,21 @@ package controller import ( "context" + "errors" + "fmt" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/strings/slices" + "math" "sds-node-configurator/api/v1alpha1" "sds-node-configurator/config" + "sds-node-configurator/internal" "sds-node-configurator/pkg/logger" "sds-node-configurator/pkg/monitoring" + "sds-node-configurator/pkg/utils" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -14,38 +24,697 @@ import ( ) const ( - LVMLogicalVolumeWatcherCtrlName = "lvm-logical-volume-watcher-controller" + Thick deviceType = "Thick" + Thin deviceType = "Thin" + + CreateReconcile reconcileType = "Create" + UpdateReconcile reconcileType = "Update" + DeleteReconcile reconcileType = "Delete" + + lvmLogicalVolumeWatcherCtrlName = "lvm-logical-volume-watcher-controller" + + createdStatusPhase = "Created" + pendingStatusPhase = "Pending" + resizingStatusPhase = "Resizing" + failedStatusPhase = "Failed" ) -func RunLVMLogicalVolumeController( +type ( + deviceType string + reconcileType string +) + +func RunLVMLogicalVolumeWatcherController( mgr manager.Manager, cfg config.Options, log logger.Logger, metrics monitoring.Metrics, ) (controller.Controller, error) { - //cl := mgr.GetClient() + cl := mgr.GetClient() cache := mgr.GetCache() - c, err := controller.New(LVMLogicalVolumeWatcherCtrlName, mgr, controller.Options{ + c, err := controller.New(lvmLogicalVolumeWatcherCtrlName, mgr, controller.Options{ Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { return reconcile.Result{}, nil }), }) if err != nil { - log.Error(err, "[RunLVMLogicalVolumeController] unable to create controller") + 
log.Error(err, "[RunLVMLogicalVolumeWatcherController] unable to create controller") return nil, err } err = c.Watch(source.Kind(cache, &v1alpha1.LvmLogicalVolume{}), handler.Funcs{ - CreateFunc: nil, - UpdateFunc: nil, - DeleteFunc: nil, - GenericFunc: nil, + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + log.Info("[RunLVMLogicalVolumeWatcherController] CreateFunc starts reconciliation") + + llv, ok := e.Object.(*v1alpha1.LvmLogicalVolume) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[CreateFunc] an error occurred while handling create event") + return + } + + lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroup) + if err != nil { + log.Error(err, "[CreateFunc] unable to get a LVMVolumeGroup") + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error())) + if err != nil { + log.Error(err, "[CreateFunc] unable to update a LVMLogicalVolume Phase") + } + return + } + + if !belongsToNode(lvg, cfg.NodeName) { + log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName)) + return + } + log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName)) + + recType, err := identifyReconcileFunc(log, llv) + if err != nil { + log.Error(err, "[CreateFunc] an error occurs while identify reconcile func") + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to identify reconcile func, err: %s", err.Error())) + if err != nil { + log.Error(err, "[CreateFunc] unable to update a LVMLogicalVolume Phase") + } + return + } + switch recType { + case CreateReconcile: + log.Debug(fmt.Sprintf("[CreateFunc] CreateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name)) + reconcileLLVCreateFunc(ctx, cl, log, metrics, llv, lvg) + case UpdateReconcile: + log.Debug(fmt.Sprintf("[CreateFunc] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name)) + reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg) + case DeleteReconcile: + log.Debug(fmt.Sprintf("[CreateFunc] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name)) + reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv) + default: + log.Debug(fmt.Sprintf("[CreateFunc] the LVMLogicalVolume %s should not be reconciled", llv.Name)) + } + + log.Info("[RunLVMLogicalVolumeWatcherController] CreateFunc ends reconciliation") + }, + + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + log.Info("[RunLVMLogicalVolumeWatcherController] UpdateFunc starts reconciliation") + + llv, ok := e.ObjectNew.(*v1alpha1.LvmLogicalVolume) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[UpdateFunc] an error occurs while handling update event") + return + } + + lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroup) + if err != nil { + log.Error(err, fmt.Sprintf("[UpdateFunc] unable to get the LVMVolumeGroup, name: %s", llv.Spec.LvmVolumeGroup)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error())) + if err != nil { + log.Error(err, "[UpdateFunc] unable to updateLVMLogicalVolumePhase") + } + return + } + + if 
!belongsToNode(lvg, cfg.NodeName) { + log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName)) + return + } + log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName)) + + recType, err := identifyReconcileFunc(log, llv) + if err != nil { + log.Error(err, "[UpdateFunc] an error occurs while identify reconcile func") + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to identify reconcile func, err: %s", err.Error())) + if err != nil { + log.Error(err, "[UpdateFunc] unable to update a LVMLogicalVolume Phase") + } + return + } + switch recType { + case UpdateReconcile: + log.Debug(fmt.Sprintf("[UpdateFunc] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name)) + reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg) + case DeleteReconcile: + log.Debug(fmt.Sprintf("[UpdateFunc] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name)) + reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv) + default: + log.Debug(fmt.Sprintf("[UpdateFunc] should not reconcile the LVMLogicalVolume %s", llv.Name)) + } + + log.Info("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation") + }, }) if err != nil { - log.Error(err, "[RunLVMLogicalVolumeController] the controller is unable to watch") + log.Error(err, "[RunLVMLogicalVolumeWatcherController] the controller is unable to watch") return nil, err } return c, err } + +func identifyReconcileFunc(log logger.Logger, llv *v1alpha1.LvmLogicalVolume) (reconcileType, error) { + should, err := shouldReconcileByCreateFunc(log, llv) + if err != nil { + return "", err + } + if should { + return CreateReconcile, nil + } + + should, err = shouldReconcileByUpdateFunc(llv) + if err != nil { + return "", err + } + if should { + return UpdateReconcile, nil + } + + should = shouldReconcileByDeleteFunc(llv) + if should { + return DeleteReconcile, nil + } + + return "", nil +} + +func reconcileLLVDeleteFunc( + ctx context.Context, + cl client.Client, + log logger.Logger, + metrics monitoring.Metrics, + llv *v1alpha1.LvmLogicalVolume, +) { + log.Info("[reconcileLLVDeleteFunc] starts reconciliation") + + err := deleteLVIfExists(log, llv) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVDeleteFunc] unable to delete the LV %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to delete the LV, name %s, err: %s", llv.Name, err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVDeleteFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Info(fmt.Sprintf("[reconcileLLVDeleteFunc] successfully deleted the LV %s", llv.Name)) + + err = removeLLVFinalizersIfExist(ctx, cl, metrics, log, llv) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVDeleteFunc] unable to remove finalizers from the LVMVolumeGroup %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to remove finalizer %s, err: %s", internal.SdsNodeConfiguratorFinalizer, err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVDeleteFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + } + + log.Info("[reconcileLLVDeleteFunc] ends reconciliation") +} + +func shouldReconcileByDeleteFunc(llv *v1alpha1.LvmLogicalVolume) bool { + if llv.DeletionTimestamp == nil { 
+		return false
+	}
+
+	return true
+}
+
+func removeLLVFinalizersIfExist(
+	ctx context.Context,
+	cl client.Client,
+	metrics monitoring.Metrics,
+	log logger.Logger,
+	llv *v1alpha1.LvmLogicalVolume,
+) error {
+	var removed bool
+	for i, f := range llv.Finalizers {
+		if f == internal.SdsNodeConfiguratorFinalizer {
+			llv.Finalizers = append(llv.Finalizers[:i], llv.Finalizers[i+1:]...)
+			removed = true
+			log.Debug(fmt.Sprintf("[removeLLVFinalizersIfExist] removed finalizer %s from the LVMLogicalVolume %s", internal.SdsNodeConfiguratorFinalizer, llv.Name))
+			break
+		}
+	}
+
+	if removed {
+		err := updateLVMLogicalVolume(ctx, metrics, cl, llv)
+		if err != nil {
+			log.Error(err, fmt.Sprintf("[removeLLVFinalizersIfExist] unable to update the LVMLogicalVolume %s", llv.Name))
+			return err
+		}
+	}
+
+	return nil
+}
+
+func deleteLVIfExists(log logger.Logger, llv *v1alpha1.LvmLogicalVolume) error {
+	lvs, cmd, _, err := utils.GetAllLVs()
+	log.Debug(fmt.Sprintf("[deleteLVIfExists] runs cmd: %s", cmd))
+	if err != nil {
+		return err
+	}
+
+	var (
+		lv *internal.LVData
+	)
+	for _, l := range lvs {
+		if l.LVName == llv.Name {
+			lv = &l
+			break
+		}
+	}
+
+	if lv == nil {
+		log.Debug(fmt.Sprintf("[deleteLVIfExists] did not find LV %s", llv.Name))
+		return errors.New("lv does not exist")
+	}
+
+	cmd, err = utils.RemoveLV(lv.VGName, lv.LVName)
+	log.Debug(fmt.Sprintf("[deleteLVIfExists] runs cmd: %s", cmd))
+	if err != nil {
+		log.Error(err, "[deleteLVIfExists] unable to RemoveLV")
+		return err
+	}
+
+	return nil
+}
+
+func getExtendingSize(llv *v1alpha1.LvmLogicalVolume) (resource.Quantity, error) {
+	return subtractQuantity(llv.Spec.Size, llv.Status.ActualSize), nil
+}
+
+func reconcileLLVUpdateFunc(
+	ctx context.Context,
+	cl client.Client,
+	log logger.Logger,
+	metrics monitoring.Metrics,
+	llv *v1alpha1.LvmLogicalVolume,
+	lvg *v1alpha1.LvmVolumeGroup,
+) {
+	log.Info("[reconcileLLVUpdateFunc] starts reconciliation")
+
+	err := updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, resizingStatusPhase, "")
+	if err != nil {
+		log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name))
+	}
+	log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] updated the LVMLogicalVolume %s status.phase to %s", llv.Name, resizingStatusPhase))
+	log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s spec.thin.poolname: \"%s\"", llv.Name, llv.Spec.Thin.PoolName))
+
+	extendingSize, err := getExtendingSize(llv)
+	if err != nil {
+		log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] an error occurred while getting the extending size for the LVMLogicalVolume %s", llv.Name))
+		err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, err.Error())
+		if err != nil {
+			log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name))
+		}
+		return
+	}
+	log.Trace(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s has extending size %d", llv.Name, extendingSize.Value()))
+
+	switch getLVMLogicalVolumeType(llv) {
+	case Thick:
+		freeSpace, err := getFreeVGSpace(lvg)
+		if err != nil {
+			log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to count free space in VG, name: %s", lvg.Spec.ActualVGNameOnTheNode))
+			err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to count free VG space, VG name %s, err: %s", lvg.Spec.ActualVGNameOnTheNode, err.Error()))
+			if err != nil {
+				log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s",
llv.Name)) + } + return + } + log.Trace(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s requested size: %d, free size: %d", llv.Name, llv.Spec.Size.Value(), freeSpace.Value())) + if freeSpace.Value() < extendingSize.Value() { + err = errors.New("not enough space") + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s requested size is more than the VG %s free space", llv.Name, lvg.Spec.ActualVGNameOnTheNode)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Not enough space on VG, requested: %d, free: %d", llv.Spec.Size.Value(), freeSpace.Value())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] LV %s will be extended in VG %s with Quantity value: %d", llv.Name, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.Size.Value())) + cmd, err := utils.ExtendLV(llv.Spec.Size.Value(), lvg.Spec.ActualVGNameOnTheNode, llv.Name) + log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to extend LV, name: %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to extend Thick LV, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + case Thin: + freeSpace, err := getFreeLVSpace(log, llv.Spec.Thin.PoolName) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to count free space in Thin-pool, name: %s", llv.Spec.Thin.PoolName)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to count free Thin-pool space, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Trace(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s extending size: %d, free size: %d", llv.Name, extendingSize.Value(), freeSpace.Value())) + if freeSpace.Value() < extendingSize.Value() { + err = errors.New("not enough space") + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume %s requested size is more than the Thin-pool %s free space", llv.Name, llv.Spec.Thin.PoolName)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, "Not enough space in a Thin-pool") + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] LV %s will be extended in Thin-pool %s with Quantity value: %d", llv.Name, llv.Spec.Thin.PoolName, llv.Spec.Size.Value())) + cmd, err := utils.ExtendLV(llv.Spec.Size.Value(), lvg.Spec.ActualVGNameOnTheNode, llv.Name) + log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to ExtendLV, name: %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to extend a Thin-pool, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + } + + log.Info(fmt.Sprintf("[reconcileLLVUpdateFunc] successfully extended 
Logical Volume for LVMLogicalVolume, name: %s", llv.Name)) + actualSize, err := getLVActualSize(log, llv.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to get actual size for LV %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get LV actual size, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + log.Trace(fmt.Sprintf("[reconcileLLVUpdateFunc] the LVMLogicalVolume, name %s actual size %d", llv.Name, actualSize.Value())) + + llv.Status.Phase = createdStatusPhase + llv.Status.ActualSize = actualSize + err = updateLVMLogicalVolume(ctx, metrics, cl, llv) + if err != nil { + log.Error(err, "[reconcileLLVUpdateFunc] unable to updateLVMLogicalVolume") + return + } + + log.Info("[reconcileLLVUpdateFunc] ends reconciliation") +} + +func shouldReconcileByUpdateFunc(llv *v1alpha1.LvmLogicalVolume) (bool, error) { + if llv.DeletionTimestamp != nil { + return false, nil + } + + if llv.Status.Phase == pendingStatusPhase || + llv.Status.Phase == resizingStatusPhase { + return false, nil + } + + delta, err := resource.ParseQuantity(internal.ResizeDelta) + if err != nil { + return false, err + } + + if llv.Spec.Size.Value()+delta.Value() < llv.Status.ActualSize.Value() { + return false, fmt.Errorf("requested size %d is less than actual %d", llv.Spec.Size.Value(), llv.Status.ActualSize.Value()) + } + + if math.Abs(float64(llv.Spec.Size.Value()-llv.Status.ActualSize.Value())) < float64(delta.Value()) { + return false, nil + } + + return true, nil +} + +func reconcileLLVCreateFunc( + ctx context.Context, + cl client.Client, + log logger.Logger, + metrics monitoring.Metrics, + llv *v1alpha1.LvmLogicalVolume, + lvg *v1alpha1.LvmVolumeGroup, +) { + log.Info("[reconcileLLVCreateFunc] starts reconciliation") + + err := updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, pendingStatusPhase, "") + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + return + } + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] updated the LVMLogicaVolume %s status.phase to %s", llv.Name, pendingStatusPhase)) + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] the LVMLogicalVolume %s spec.thin.poolname: \"%s\"", llv.Name, llv.Spec.Thin.PoolName)) + + added, err := addLLVFinalizerIfNotExist(ctx, cl, metrics, llv) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + return + } + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] a finalizer to the LVMLogicalVolume %s was added: %t", llv.Name, added)) + + switch getLVMLogicalVolumeType(llv) { + case Thick: + freeSpace, err := getFreeVGSpace(lvg) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to count free space in VG, name: %s", lvg.Spec.ActualVGNameOnTheNode)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get free VG space, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to updateLVMLogicalVolumePhase for LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Trace(fmt.Sprintf("[reconcileLLVCreateFunc] the LVMLogicalVolume %s requested size: %d, free size: %d", llv.Name, llv.Spec.Size.Value(), freeSpace.Value())) + if freeSpace.Value() < llv.Spec.Size.Value() { + err = 
errors.New("not enough space") + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] the LVMLogicalVolume %s requested size is more than the VG %s free space", llv.Name, lvg.Spec.ActualVGNameOnTheNode)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, "Not enough space in VG") + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume %s", llv.Name)) + } + return + } + + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] LV %s will be create in VG %s with Quantity value: %d", llv.Name, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.Size.Value())) + cmd, err := utils.CreateThickLogicalVolume(lvg.Spec.ActualVGNameOnTheNode, llv.Name, llv.Spec.Size.Value()) + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, "[reconcileLLVCreateFunc] unable to create a thick LogicalVolume") + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to create Thick LV, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + } + return + } + case Thin: + freeSpace, err := getFreeLVSpace(log, llv.Spec.Thin.PoolName) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to count free space in LV, name: %s", llv.Spec.Thin.PoolName)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get free LV space, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + } + return + } + + log.Trace(fmt.Sprintf("[reconcileLLVCreateFunc] the LVMLogicalVolume %s requested size: %d, free size: %d", llv.Name, llv.Spec.Size.Value(), freeSpace.Value())) + if freeSpace.Value() < llv.Spec.Size.Value() { + err = errors.New("not enough space") + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] the LVMLogicalVolume %s requested size is more than the Thin-pool %s free space", llv.Name, llv.Spec.Thin.PoolName)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, "Not enough space in Thin-pool") + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + } + return + } + + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] LV %s will be create in Thin-pool %s with size %s", llv.Name, llv.Spec.Thin.PoolName, llv.Spec.Size.String())) + cmd, err := utils.CreateThinLogicalVolume(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.Thin.PoolName, llv.Name, llv.Spec.Size.Value()) + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, "[reconcileLLVCreateFunc] unable to CreateThickLogicalVolume") + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to create Thin LV, err: %s", err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + } + return + } + } + + log.Info(fmt.Sprintf("[reconcileLLVCreateFunc] successfully created Logical Volume for LVMLogicalVolume, name: %s", llv.Name)) + + actualSize, err := getLVActualSize(log, llv.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to get actual size for LV %s", llv.Name)) + err = updateLVMLogicalVolumePhase(ctx, cl, metrics, llv, failedStatusPhase, 
fmt.Sprintf("Unable to get actual LV size, LV name: %s, err: %s", llv.Name, err.Error())) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + } + return + } + log.Trace(fmt.Sprintf("[reconcileLLVCreateFunc] the LV, name: %s has actual size: %d", llv.Name, actualSize.Value())) + + llv.Status.Phase = createdStatusPhase + llv.Status.ActualSize = actualSize + err = updateLVMLogicalVolume(ctx, metrics, cl, llv) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcileLLVCreateFunc] unable to update the LVMLogicalVolume, name: %s", llv.Name)) + return + } + + log.Info("[reconcileLLVCreateFunc] ends reconciliation") +} + +func getLVActualSize(log logger.Logger, lvName string) (resource.Quantity, error) { + lvs, cmd, _, err := utils.GetAllLVs() + log.Debug(fmt.Sprintf("[getActualSize] runs cmd: %s", cmd)) + if err != nil { + return resource.Quantity{}, err + } + + for _, lv := range lvs { + if lv.LVName == lvName { + return *resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI), nil + } + } + + return resource.Quantity{}, nil +} + +func addLLVFinalizerIfNotExist(ctx context.Context, cl client.Client, metrics monitoring.Metrics, llv *v1alpha1.LvmLogicalVolume) (bool, error) { + if slices.Contains(llv.Finalizers, internal.SdsNodeConfiguratorFinalizer) { + return false, nil + } + + llv.Finalizers = append(llv.Finalizers, internal.SdsNodeConfiguratorFinalizer) + err := cl.Update(ctx, llv) + if err != nil { + return false, err + } + + return true, nil +} + +func shouldReconcileByCreateFunc(log logger.Logger, llv *v1alpha1.LvmLogicalVolume) (bool, error) { + if llv.Status.Phase == createdStatusPhase || + llv.Status.Phase == resizingStatusPhase { + return false, nil + } + + lvs, cmd, _, err := utils.GetAllLVs() + log.Debug(fmt.Sprintf("[shouldReconcileByCreateFunc] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, "[shouldReconcileByCreateFunc] unable to GetAllLVs") + return false, err + } + + for _, lv := range lvs { + if lv.LVName == llv.Name { + return false, nil + } + } + + return true, nil +} + +func getFreeLVSpace(log logger.Logger, thinPoolName string) (resource.Quantity, error) { + lvs, cmd, _, err := utils.GetAllLVs() + log.Debug(fmt.Sprintf("[getFreeLVSpace] runs cmd: %s", cmd)) + if err != nil { + log.Error(err, "[getFreeVGSpace] unable to GetAllLVs") + return resource.Quantity{}, err + } + + for _, lv := range lvs { + if lv.LVName == thinPoolName { + used, err := getLVUsedSize(lv) + vlsSize := getVirtualLVSize(lv.LVName, lvs) + + if err != nil { + log.Error(err, "[getFreeLVSpace] unable to getLVUsedSize") + return resource.Quantity{}, err + } + + free := subtractQuantity(lv.LVSize, *used) + free = subtractQuantity(free, vlsSize) + + return free, nil + } + } + + return resource.Quantity{}, nil +} + +func getVirtualLVSize(thinPool string, lvs []internal.LVData) resource.Quantity { + sum := int64(0) + + for _, lv := range lvs { + if lv.PoolLv == thinPool { + sum += lv.LVSize.Value() + } + } + + return *resource.NewQuantity(sum, resource.BinarySI) +} + +func subtractQuantity(min, sub resource.Quantity) resource.Quantity { + val := min + val.Sub(sub) + return val +} + +func getFreeVGSpace(lvg *v1alpha1.LvmVolumeGroup) (resource.Quantity, error) { + total, err := resource.ParseQuantity(lvg.Status.VGSize) + if err != nil { + return resource.Quantity{}, err + } + + allocated, err := resource.ParseQuantity(lvg.Status.AllocatedSize) + if err != nil { + return resource.Quantity{}, err + } + + return 
subtractQuantity(total, allocated), nil +} + +func getLVMLogicalVolumeType(llv *v1alpha1.LvmLogicalVolume) deviceType { + if llv.Spec.Thin.PoolName == "" { + return Thick + } + + return Thin +} + +func belongsToNode(lvg *v1alpha1.LvmVolumeGroup, nodeName string) bool { + var belongs bool + for _, node := range lvg.Status.Nodes { + if node.Name == nodeName { + belongs = true + } + } + + return belongs +} + +func updateLVMLogicalVolumePhase(ctx context.Context, cl client.Client, metrics monitoring.Metrics, llv *v1alpha1.LvmLogicalVolume, phase, reason string) error { + llv.Status.Phase = phase + llv.Status.Reason = reason + + err := updateLVMLogicalVolume(ctx, metrics, cl, llv) + if err != nil { + return err + } + + return nil +} + +func updateLVMLogicalVolume(ctx context.Context, metrics monitoring.Metrics, cl client.Client, llv *v1alpha1.LvmLogicalVolume) error { + err := cl.Update(ctx, llv) + if err != nil { + return err + } + + return nil +} diff --git a/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go b/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go new file mode 100644 index 00000000..cdcc0792 --- /dev/null +++ b/images/agent/pkg/controller/lvm_logical_volume_watcher_test.go @@ -0,0 +1,18 @@ +package controller + +import ( + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" + "testing" +) + +func TestLVMLogicaVolumeWatcher(t *testing.T) { + t.Run("subtractQuantity_returns_correct_value", func(t *testing.T) { + min := resource.NewQuantity(1000, resource.BinarySI) + sub := resource.NewQuantity(300, resource.BinarySI) + expected := resource.NewQuantity(700, resource.BinarySI) + + actual := subtractQuantity(*min, *sub) + assert.Equal(t, expected, &actual) + }) +} diff --git a/images/agent/pkg/controller/lvm_volume_group_discover.go b/images/agent/pkg/controller/lvm_volume_group_discover.go index ceee1dea..34ba046f 100644 --- a/images/agent/pkg/controller/lvm_volume_group_discover.go +++ b/images/agent/pkg/controller/lvm_volume_group_discover.go @@ -669,20 +669,20 @@ func getStatusThinPools(log logger.Logger, thinPools map[string][]internal.LVDat tps := make([]internal.LVMVGStatusThinPool, 0, len(filtered)) for _, lv := range filtered { - usedSize, err := getUsedSizeMiB(lv) //todo rename + usedSize, err := getLVUsedSize(lv) if err != nil { - log.Error(err, "[getStatusThinPools] unable to getUsedSizeMiB") + log.Error(err, "[getStatusThinPools] unable to getLVUsedSize") } tps = append(tps, internal.LVMVGStatusThinPool{ Name: lv.LVName, ActualSize: lv.LVSize, - UsedSize: usedSize, + UsedSize: usedSize.String(), }) } return tps } -func getUsedSizeMiB(lv internal.LVData) (string, error) { +func getLVUsedSize(lv internal.LVData) (*resource.Quantity, error) { var ( err error dataPercent float64 @@ -693,12 +693,13 @@ func getUsedSizeMiB(lv internal.LVData) (string, error) { } else { dataPercent, err = strconv.ParseFloat(lv.DataPercent, 64) if err != nil { - return "", err + return nil, err } } tmp := float64(lv.LVSize.Value()) * dataPercent - return utils.BytesToQuantity(int64(tmp)), nil + + return resource.NewQuantity(int64(tmp), resource.BinarySI), nil } func isThinPool(lv internal.LVData) bool { diff --git a/images/agent/pkg/controller/lvm_volume_group_discover_test.go b/images/agent/pkg/controller/lvm_volume_group_discover_test.go index 25a78344..58ce78c8 100644 --- a/images/agent/pkg/controller/lvm_volume_group_discover_test.go +++ b/images/agent/pkg/controller/lvm_volume_group_discover_test.go @@ -112,10 +112,10 @@ func 
TestLVMVolumeGroupDiscover(t *testing.T) { DataPercent: "50", } expected := "97656250Ki" - actual, err := getUsedSizeMiB(lv) + actual, err := getLVUsedSize(lv) if assert.NoError(t, err) { - assert.Equal(t, expected, actual) + assert.Equal(t, expected, actual.String()) } }) diff --git a/images/agent/pkg/controller/lvm_volume_group_watcher.go b/images/agent/pkg/controller/lvm_volume_group_watcher.go index 5ec29ec0..d45061cc 100644 --- a/images/agent/pkg/controller/lvm_volume_group_watcher.go +++ b/images/agent/pkg/controller/lvm_volume_group_watcher.go @@ -90,6 +90,7 @@ func RunLVMVolumeGroupWatcherController( log.Warning(fmt.Sprintf(`Added request, namespace: "%s" name: "%s", to requeue`, request.Namespace, request.Name)) } } + updateFunc := func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] update LVMVolumeGroupn, name: %s", e.ObjectNew.GetName())) @@ -404,12 +405,12 @@ func ReconcileLVMVG( log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateEventLVMVolumeGroup, resource name: %s", group.Name)) } start := time.Now() - command, err := utils.CreateLV(pool, group.Spec.ActualVGNameOnTheNode) + command, err := utils.CreateThinPool(pool, group.Spec.ActualVGNameOnTheNode) metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "lvcreate").Observe(metrics.GetEstimatedTimeInSeconds(start)) metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "lvcreate").Inc() log.Debug(command) if err != nil { - log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateLV, thin pool: %s", pool.Name)) + log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateThinPool, thin pool: %s", pool.Name)) metrics.UtilsCommandsErrorsCount(LVMVolumeGroupWatcherCtrlName, "lvcreate").Inc() if err = updateLVMVolumeGroupHealthStatus(ctx, cl, metrics, group.Name, group.Namespace, err.Error(), NonOperational); err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMVG] unable to update LVMVolumeGroupStatus, resource name: %s", group.Name)) @@ -453,9 +454,9 @@ func ReconcileLVMVG( if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateEventLVMVolumeGroup, resource name: %s", group.Name)) } - newLVSizeStr := strconv.FormatInt(pool.Size.Value()/1024, 10) + //newLVSizeStr := strconv.FormatInt(pool.Size.Value()/1024, 10) start := time.Now() - cmd, err := utils.ExtendLV(newLVSizeStr+"K", group.Spec.ActualVGNameOnTheNode, pool.Name) + cmd, err := utils.ExtendLV(pool.Size.Value(), group.Spec.ActualVGNameOnTheNode, pool.Name) metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "lvextend").Observe(metrics.GetEstimatedTimeInSeconds(start)) metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "lvextend").Inc() log.Debug(cmd) @@ -496,13 +497,13 @@ func ReconcileLVMVG( if len(group.Spec.ThinPools) != 0 { for _, v := range group.Spec.ThinPools { start := time.Now() - command, err := utils.CreateLV(v, group.Spec.ActualVGNameOnTheNode) + command, err := utils.CreateThinPool(v, group.Spec.ActualVGNameOnTheNode) metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "lvcreate").Observe(metrics.GetEstimatedTimeInSeconds(start)) metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "lvcreate").Inc() log.Debug(command) if err != nil { metrics.UtilsCommandsErrorsCount(LVMVolumeGroupWatcherCtrlName, "lvcreate").Inc() - log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateLV, thin pool: %s", v.Name)) + log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateThinPool, thin pool: %s", 
v.Name)) if err = updateLVMVolumeGroupHealthStatus(ctx, cl, metrics, group.Name, group.Namespace, err.Error(), NonOperational); err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMVG] unable to update LVMVolumeGroupStatus, resource name: %s", group.Name)) } diff --git a/images/agent/pkg/controller/lvm_volume_group_watcher_func.go b/images/agent/pkg/controller/lvm_volume_group_watcher_func.go index bd8b38f1..92597f0d 100644 --- a/images/agent/pkg/controller/lvm_volume_group_watcher_func.go +++ b/images/agent/pkg/controller/lvm_volume_group_watcher_func.go @@ -135,13 +135,10 @@ func ValidateLVMGroup(ctx context.Context, cl client.Client, metrics monitoring. status.Health = NonOperational status.Phase = Failed status.Message = "selected block devices are from different nodes for local LVMVolumeGroup" - return false, &status, nil + return false, &status, errors.New("wrong block devices selected") } if membership == 0 { - status.Health = NonOperational - status.Phase = Failed - status.Message = "selected block devices not affiliated to current Watcher's node" return false, &status, nil } } diff --git a/images/agent/pkg/utils/commands.go b/images/agent/pkg/utils/commands.go index 47d0f991..839ba852 100644 --- a/images/agent/pkg/utils/commands.go +++ b/images/agent/pkg/utils/commands.go @@ -23,16 +23,26 @@ import ( "os/exec" "sds-node-configurator/api/v1alpha1" "sds-node-configurator/internal" + "strings" +) + +const ( + nsenter = "/usr/bin/nsenter" ) func GetBlockDevices() ([]internal.Device, string, error) { var outs bytes.Buffer - cmd := exec.Command("lsblk", "-J", "-lpfb", "-no", "name,MOUNTPOINT,PARTUUID,HOTPLUG,MODEL,SERIAL,SIZE,FSTYPE,TYPE,WWN,KNAME,PKNAME,ROTA") + args := []string{"lsblk", "-J", "-lpfb", "-no", "name,MOUNTPOINT,PARTUUID,HOTPLUG,MODEL,SERIAL,SIZE,FSTYPE,TYPE,WWN,KNAME,PKNAME,ROTA"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) cmd.Stdout = &outs + var stderr bytes.Buffer + cmd.Stderr = &stderr + err := cmd.Run() if err != nil { - return nil, cmd.String(), fmt.Errorf("unable to GetBlockDevices, err: %w", err) + return nil, cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } devices, err := unmarshalDevices(outs.Bytes()) @@ -45,12 +55,14 @@ func GetBlockDevices() ([]internal.Device, string, error) { func GetAllVGs() (data []internal.VGData, command string, stdErr bytes.Buffer, err error) { var outs bytes.Buffer - cmd := exec.Command("vgs", "-o", "+uuid,tags,shared", "--units", "B", "--nosuffix", "--reportformat", "json") + args := []string{"vgs", "-o", "+uuid,tags,shared", "--units", "B", "--nosuffix", "--reportformat", "json"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) 
cmd.Stdout = &outs cmd.Stderr = &stdErr - if err := cmd.Run(); err != nil { - return nil, cmd.String(), stdErr, fmt.Errorf("unable to GetAllVGs, err: %w", err) + if err = cmd.Run(); err != nil { + return nil, cmd.String(), stdErr, fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stdErr.String()) } data, err = unmarshalVGs(outs.Bytes()) @@ -63,12 +75,14 @@ func GetAllVGs() (data []internal.VGData, command string, stdErr bytes.Buffer, e func GetAllLVs() (data []internal.LVData, command string, stdErr bytes.Buffer, err error) { var outs bytes.Buffer - cmd := exec.Command("lvs", "-o", "+vg_uuid,tags", "--units", "B", "--nosuffix", "--reportformat", "json") + args := []string{"lvs", "-o", "+vg_uuid,tags", "--units", "B", "--nosuffix", "--reportformat", "json"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) cmd.Stdout = &outs cmd.Stderr = &stdErr - if err := cmd.Run(); err != nil { - return nil, cmd.String(), stdErr, fmt.Errorf("unable to GetAllLVs, err: %w", err) + if err = cmd.Run(); err != nil { + return nil, cmd.String(), stdErr, fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stdErr.String()) } lvs, err := unmarshalLVs(outs.Bytes()) @@ -81,12 +95,14 @@ func GetAllLVs() (data []internal.LVData, command string, stdErr bytes.Buffer, e func GetAllPVs() (data []internal.PVData, command string, stdErr bytes.Buffer, err error) { var outs bytes.Buffer - cmd := exec.Command("pvs", "-o", "+pv_used,pv_uuid,vg_tags,vg_uuid", "--units", "B", "--nosuffix", "--reportformat", "json") + args := []string{"pvs", "-o", "+pv_used,pv_uuid,vg_tags,vg_uuid", "--units", "B", "--nosuffix", "--reportformat", "json"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) cmd.Stdout = &outs cmd.Stderr = &stdErr - if err := cmd.Run(); err != nil { - return nil, cmd.String(), stdErr, fmt.Errorf("unable to GetAllPVs, err: %w", err) + if err = cmd.Run(); err != nil { + return nil, cmd.String(), stdErr, fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stdErr.String()) } data, err = unmarshalPVs(outs.Bytes()) @@ -97,220 +113,227 @@ func GetAllPVs() (data []internal.PVData, command string, stdErr bytes.Buffer, e return data, cmd.String(), stdErr, nil } -func GetSinglePV(pVname string) (*internal.PVData, string, error) { - var outs bytes.Buffer - cmd := exec.Command("pvs", pVname, "-o", "+pv_used,pv_uuid,vg_tags,vg_uuid", "--reportformat", "json") - cmd.Stdout = &outs - - if err := cmd.Run(); err != nil { - return nil, cmd.String(), fmt.Errorf("unable to GetSinglePV, err: %w", err) - } - - pvs, err := unmarshalPVs(outs.Bytes()) - if err != nil { - return nil, cmd.String(), fmt.Errorf("unable to GetSinglePV, err: %w", err) - } - - singlePv := pvs[0] - - if len(pvs) != 1 || - singlePv.PVName != pVname { - return nil, cmd.String(), fmt.Errorf(`unable to GetSinglePV by name: "%s"`, pVname) - } - - return &singlePv, cmd.String(), nil -} - func CreatePV(path string) (string, error) { - cmd := exec.Command("pvcreate", path) + args := []string{"pvcreate", path} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) 
var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to CreatePV, err: %w , stderror = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderror = %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } func CreateVGLocal(vgName, lvmName string, pvNames []string) (string, error) { - tmpStr := fmt.Sprintf("storage.deckhouse.io/lvmVolumeGroupName=%s", lvmName) - var arg []string - arg = append(arg, vgName) - arg = append(arg, pvNames...) - arg = append(arg, "--addtag", "storage.deckhouse.io/enabled=true", "--addtag", tmpStr) + args := []string{"vgcreate", vgName, strings.Join(pvNames, " "), "--addtag", "storage.deckhouse.io/enabled=true", "--addtag", tmpStr} - cmd := exec.Command("vgcreate", arg...) + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to CreateVGLocal, err: %w , stderror = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderror: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } func CreateVGShared(vgName, lvmName string, pvNames []string) (string, error) { - var arg []string - arg = append(arg, "--shared") - arg = append(arg, vgName) - arg = append(arg, pvNames...) - arg = append(arg, "--addtag", - "storage.deckhouse.io/enabled=true", - "--addtag", fmt.Sprintf("storage.deckhouse.io/lvmVolumeGroupName=%s", lvmName)) + args := []string{"vgcreate", "--shared", vgName, strings.Join(pvNames, " "), "--addtag", "storage.deckhouse.io/enabled=true", "--addtag", fmt.Sprintf("storage.deckhouse.io/lvmVolumeGroupName=%s", lvmName)} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) - cmd := exec.Command("vgcreate", arg...) + var stderr bytes.Buffer + cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to CreateVGShared, err: %w", err) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func CreateLV(thinPool v1alpha1.SpecThinPool, VGName string) (string, error) { - - cmd := exec.Command( - "lvcreate", "-L", thinPool.Size.String(), "-T", fmt.Sprintf("%s/%s", VGName, thinPool.Name)) +func CreateThinPool(thinPool v1alpha1.SpecThinPool, VGName string) (string, error) { + args := []string{"lvcreate", "-L", thinPool.Size.String(), "-T", fmt.Sprintf("%s/%s", VGName, thinPool.Name)} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to CreateLV, err: %w tderr = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func ExtendVG(vgName string, paths []string) (string, error) { - var arg []string - arg = append(arg, vgName) - arg = append(arg, paths...) - cmd := exec.Command("vgextend", arg...) +func CreateThinLogicalVolume(vgName, tpName, lvName string, size int64) (string, error) { + args := []string{"lvcreate", "-T", fmt.Sprintf("%s/%s", vgName, tpName), "-n", lvName, "-V", fmt.Sprintf("%dk", size/1024), "-W", "y", "-y"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) 
var stderr bytes.Buffer cmd.Stderr = &stderr + var stdout bytes.Buffer + cmd.Stdout = &stdout - if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to ExtendVG, err: %w stderr = %s", err, stderr.String()) + err := cmd.Run() + if err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func ExtendLV(size, vgName, lvName string) (string, error) { - cmd := exec.Command("lvextend", "-L", size, fmt.Sprintf("/dev/%s/%s", vgName, lvName)) +func CreateThickLogicalVolume(vgName, lvName string, size int64) (string, error) { + args := []string{"lvcreate", "-n", fmt.Sprintf("%s/%s", vgName, lvName), "-L", fmt.Sprintf("%dk", size/1024), "-W", "y", "-y"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to ExtendLV, err: %w stderr = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func ResizePV(pvName string) (string, error) { - cmd := exec.Command("pvresize", pvName) +func ExtendVG(vgName string, paths []string) (string, error) { + args := []string{"vgextend", vgName, strings.Join(paths, " ")} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) + + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to ResizePV, err: %w", err) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func RemovePVFromVG(pvName, vgName string) (string, error) { - pvs, cmdStr, _, err := GetAllPVs() - if err != nil { - return cmdStr, fmt.Errorf("unable to RemovePVFromVG, err: %w", err) - } - - if len(pvs) == 1 { - pv := pvs[0] - - if pv.PVName != pvName || - pv.VGName != vgName { - return cmdStr, - fmt.Errorf( - `unable to RemovePVFromVG, err: unexpected pv gotten, no pv with pvName: "%s", vgName: "%s"`, - pvName, vgName) - } +func ExtendLV(size int64, vgName, lvName string) (string, error) { + args := []string{"lvextend", "-L", fmt.Sprintf("%dk", size/1024), fmt.Sprintf("/dev/%s/%s", vgName, lvName)} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) - if pv.PVUsed != "0 " { - return cmdStr, fmt.Errorf("unable to RemovePVFromVG, err: single PVData has data") - } + var stderr bytes.Buffer + cmd.Stderr = &stderr - cmdStr, err = RemoveVG(pv.VGName) - return cmdStr, err + if err := cmd.Run(); err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } - if cmdStr, err = MovePV(pvName); err != nil { - return cmdStr, err - } + return cmd.String(), nil +} - clr, cmdStr, err := CheckPVHasNoData(pvName) - if err != nil { - return cmdStr, err - } +func ResizePV(pvName string) (string, error) { + args := []string{"pvresize", pvName} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) 
- if !clr { - return cmdStr, fmt.Errorf(`unable to RemovePVFromVG, err: can't move LV segments from PVData, pv name: "%s"`, pvName) - } + var stderr bytes.Buffer + cmd.Stderr = &stderr - cmd := exec.Command("vgreduce", vgName, pvName) - if err = cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf(`unable to RemovePVFromVG with vgName: "%s", pvName: "%s", err: %w`, - vgName, pvName, err) + if err := cmd.Run(); err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } return cmd.String(), nil } -func CheckPVHasNoData(pvName string) (bool, string, error) { - pv, cmdStr, err := GetSinglePV(pvName) - if err != nil { - return true, cmdStr, err +func RemoveVG(vgName string) (string, error) { + args := []string{"vgremove", vgName} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } - return pv.PVUsed == "0 ", cmdStr, nil + return cmd.String(), nil } -func MovePV(pvName string) (string, error) { - cmd := exec.Command("pvmove", pvName) +func RemovePV(pvNames []string) (string, error) { + args := []string{"pvremove", strings.Join(pvNames, " ")} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) + + var stderr bytes.Buffer + cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to MovePV, err: %w", err) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr, %s", cmd.String(), err, stderr.String()) } - return cmd.String(), nil } -func RemoveVG(vgName string) (string, error) { - cmd := exec.Command("vgremove", vgName) +func RemoveLV(vgName, lvName string) (string, error) { + args := []string{"lvremove", fmt.Sprintf("/dev/%s/%s", vgName, lvName), "-y"} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) var stderr bytes.Buffer cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to RemoveVG, err: %w stderr = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stderr: %s", cmd.String(), err, stderr.String()) } + return cmd.String(), nil +} + +func VGChangeAddTag(vGName, tag string) (string, error) { + var outs, stdErr bytes.Buffer + args := []string{"vgchange", vGName, "--addtag", tag} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) + cmd.Stdout = &outs + cmd.Stderr = &stdErr + if err := cmd.Run(); err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stdErr: %s", cmd.String(), err, stdErr.String()) + } return cmd.String(), nil } -func RemovePV(pvNames []string) (string, error) { - cmd := exec.Command("pvremove", pvNames...) +func VGChangeDelTag(vGName, tag string) (string, error) { + var outs, stdErr bytes.Buffer + args := []string{"vgchange", vGName, "--deltag", tag} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) 
+ cmd.Stdout = &outs + cmd.Stderr = &stdErr - var stderr bytes.Buffer - cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stdErr: %s", cmd.String(), err, stdErr.String()) + } + return cmd.String(), nil +} + +func LVChangeDelTag(lv internal.LVData, tag string) (string, error) { + tmpStr := fmt.Sprintf("/dev/%s/%s", lv.VGName, lv.LVName) + var outs, stdErr bytes.Buffer + args := []string{"lvchange", tmpStr, "--deltag", tag} + extendedArgs := extendArgs(args) + cmd := exec.Command(nsenter, extendedArgs...) + cmd.Stdout = &outs + cmd.Stderr = &stdErr if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to RemovePV, err: %w stderr = %s", err, stderr.String()) + return cmd.String(), fmt.Errorf("unable to run cmd: %s, err: %w, stdErr: %s", cmd.String(), err, stdErr.String()) } return cmd.String(), nil } @@ -378,39 +401,7 @@ func unmarshalLVs(out []byte) ([]internal.LVData, error) { return lvs, nil } -func VGChangeAddTag(vGName, tag string) (string, error) { - var outs, stdErr bytes.Buffer - cmd := exec.Command("vgchange", vGName, "--addtag", tag) - cmd.Stdout = &outs - cmd.Stderr = &stdErr - - if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to VGChangeAddTag, err: %w , stdErr: %s", err, stdErr.String()) - } - return cmd.String(), nil -} - -func VGChangeDelTag(vGName, tag string) (string, error) { - var outs, stdErr bytes.Buffer - cmd := exec.Command("vgchange", vGName, "--deltag", tag) - cmd.Stdout = &outs - cmd.Stderr = &stdErr - - if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to VGChangeDelTag, err: %w , stdErr: %s", err, stdErr.String()) - } - return cmd.String(), nil -} - -func LVChangeDelTag(lv internal.LVData, tag string) (string, error) { - tmpStr := fmt.Sprintf("/dev/%s/%s", lv.VGName, lv.LVName) - var outs, stdErr bytes.Buffer - cmd := exec.Command("lvchange", tmpStr, "--deltag", tag) - cmd.Stdout = &outs - cmd.Stderr = &stdErr - - if err := cmd.Run(); err != nil { - return cmd.String(), fmt.Errorf("unable to LVChangeDelTag, err: %w , stdErr: %s", err, stdErr.String()) - } - return cmd.String(), nil +func extendArgs(args []string) []string { + nsenterArgs := []string{"-t", "1", "-m", "-u", "-i", "-n", "-p"} + return append(nsenterArgs, args...) } diff --git a/templates/agent/daemonset.yaml b/templates/agent/daemonset.yaml index db681713..67b8dd02 100644 --- a/templates/agent/daemonset.yaml +++ b/templates/agent/daemonset.yaml @@ -23,6 +23,7 @@ spec: imagePullSecrets: - name: {{ .Chart.Name }}-module-registry serviceAccountName: sds-node-configurator + hostPID: true containers: - name: sds-node-configurator-agent image: {{ include "helm_lib_module_image" (list . 
"agent") }} @@ -49,54 +50,4 @@ spec: {{- else if eq .Values.sdsNodeConfigurator.logLevel "TRACE" }} value: "4" {{- end }} - volumeMounts: - - mountPath: /dev/ - name: host-device-dir - - mountPath: /lib/modules - name: host-modules-dir - mountPropagation: Bidirectional - - mountPath: /sys/ - name: host-sys-dir - - mountPath: /run/udev/ - name: host-run-udev-dir - - mountPath: /run/lvm/ - name: host-run-lvm-dir - - mountPath: /run/dmeventd-client - name: host-run-dmeventd-client - - mountPath: /run/dmeventd-server - name: host-run-dmeventd-server - - mountPath: /host-root/etc/machine-id - name: host-machine-id - readOnly: true - volumes: - - hostPath: - path: /dev/ - type: "" - name: host-device-dir - - hostPath: - path: /sys/ - type: Directory - name: host-sys-dir - - hostPath: - path: /run/udev/ - type: Directory - name: host-run-udev-dir - - hostPath: - path: /run/lvm - type: Directory - name: host-run-lvm-dir - - hostPath: - path: /run/dmeventd-client - name: host-run-dmeventd-client - - hostPath: - path: /run/dmeventd-server - name: host-run-dmeventd-server - - hostPath: - path: /etc/machine-id - type: File - name: host-machine-id - - hostPath: - path: /lib/modules/ - type: DirectoryOrCreate - name: host-modules-dir {{- end }}