Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add parallel reconciliation to lvm_logical_volume_watcher. Add VG size validation to lvm_volume_group_watcher #40

Merged
merged 21 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crds/lvmlogicalvolume.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
x-kubernetes-validations:
- rule: self == oldSelf
message: Value is immutable.
minLength: 1
pattern: '^[a-z0-9]([a-z0-9-.]{0,251}[a-z0-9])?$'
type:
type: string
x-kubernetes-validations:
Expand Down
2 changes: 2 additions & 0 deletions images/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Options struct {
MetricsPort string
BlockDeviceScanInterval time.Duration
VolumeGroupScanInterval time.Duration
LLVRequeInterval time.Duration
}

func NewConfig() (*Options, error) {
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewConfig() (*Options, error) {

opts.BlockDeviceScanInterval = 5
opts.VolumeGroupScanInterval = 5
opts.LLVRequeInterval = 5

return &opts, nil
}
Expand Down
201 changes: 106 additions & 95 deletions images/agent/pkg/controller/lvm_logical_volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"

"sds-node-configurator/api/v1alpha1"
"sds-node-configurator/config"
Expand All @@ -15,7 +16,9 @@ import (
"time"

"github.com/google/go-cmp/cmp"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/strings/slices"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -58,48 +61,37 @@ func RunLVMLogicalVolumeWatcherController(

c, err := controller.New(lvmLogicalVolumeWatcherCtrlName, mgr, controller.Options{
Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] Reconciler starts reconciliation of LLV: %s", request.Name))
shouldRequeue, err := ReconcileLVMLogicalVolume(ctx, cl, log, metrics, cfg, request)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] an error occurred while reconciling LLV: %s", request.Name))
}
if shouldRequeue {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] requeue reconciliation of LLV: %s after %s", request.Name, cfg.LLVRequeInterval))
return reconcile.Result{RequeueAfter: cfg.LLVRequeInterval}, nil
}
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] ends reconciliation of LLV: %s without requeue", request.Name))
return reconcile.Result{Requeue: false}, nil
}),
MaxConcurrentReconciles: 10,
})

if err != nil {
log.Error(err, "[RunLVMLogicalVolumeWatcherController] unable to create controller")
return nil, err
}

err = c.Watch(source.Kind(cache, &v1alpha1.LVMLogicalVolume{}), handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc starts reconciliation of LLV: %s", e.Object.GetName()))

llv, ok := e.Object.(*v1alpha1.LVMLogicalVolume)
if !ok {
err = errors.New("unable to cast event object to a given type")
log.Error(err, "[CreateFunc] an error occurred while handling create event")
return
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroupName)
if err != nil {
log.Error(err, "[CreateFunc] unable to get a LVMVolumeGroup")
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if err != nil {
log.Error(err, "[CreateFunc] unable to update a LVMLogicalVolume Phase")
}
return
}
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc starts reconciliation of LLV: %s", e.Object.GetName()))
request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
return
}
log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName))

runEventReconcile(ctx, cl, log, metrics, llv, lvg)

log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc ends reconciliation of LLV: %s", llv.Name))
q.Add(request)
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc ends reconciliation of LLV: %s", e.Object.GetName()))
},

UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc starts reconciliation of LLV: %s", e.ObjectNew.GetName()))
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc starts reconciliation of LLV: %s", e.ObjectNew.GetName()))

oldLLV, ok := e.ObjectOld.(*v1alpha1.LVMLogicalVolume)
if !ok {
Expand All @@ -125,30 +117,15 @@ func RunLVMLogicalVolumeWatcherController(
}

if reflect.DeepEqual(oldLLV.Spec, newLLV.Spec) && newLLV.DeletionTimestamp == nil {
log.Info(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has not been changed", newLLV.Name))
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
return
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", newLLV.Spec.LvmVolumeGroupName)
if err != nil {
log.Error(err, fmt.Sprintf("[UpdateFunc] unable to get the LVMVolumeGroup, name: %s", newLLV.Spec.LvmVolumeGroupName))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, newLLV, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if err != nil {
log.Error(err, "[UpdateFunc] unable to updateLVMLogicalVolumePhase")
}
return
}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has not been changed", newLLV.Name))
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
return
}
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName))

runEventReconcile(ctx, cl, log, metrics, newLLV, lvg)
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has been changed. Add to the queue", newLLV.Name))

log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: newLLV.Namespace, Name: newLLV.Name}}
q.Add(request)
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
},
})
if err != nil {
Expand All @@ -159,40 +136,6 @@ func RunLVMLogicalVolumeWatcherController(
return c, err
}

func runEventReconcile(ctx context.Context, cl client.Client, log logger.Logger, metrics monitoring.Metrics, llv *v1alpha1.LVMLogicalVolume, lvg *v1alpha1.LvmVolumeGroup) {
log.Trace("[runEventReconcile] starts reconciliation. Identify reconcile func. vgName: "+lvg.Spec.ActualVGNameOnTheNode+", llv:", llv.Name, llv)
recType, err := identifyReconcileFunc(log, lvg.Spec.ActualVGNameOnTheNode, llv)
if err != nil {
log.Error(err, "[runEventReconcile] an error occurs while identify reconcile func")
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("An error occurred while identifying the reconcile func, err: %s", err.Error()))
if err != nil {
log.Error(err, "[runEventReconcile] unable to update a LVMLogicalVolume Phase")
}
return
}

switch recType {
case CreateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] CreateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVCreateFunc(ctx, cl, log, metrics, llv, lvg)
case UpdateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg)
case DeleteReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv, lvg)
default:
log.Debug(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled", llv.Name))
if llv.Status.Phase != createdStatusPhase {
log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. Setting the phase to %s", llv.Name, llv.Status.Phase, createdStatusPhase))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, createdStatusPhase, "")
if err != nil {
log.Error(err, fmt.Sprintf("[runEventReconcile] unable to update the LVMLogicalVolume %s", llv.Name))
}
}
}
}

func identifyReconcileFunc(log logger.Logger, vgName string, llv *v1alpha1.LVMLogicalVolume) (reconcileType, error) {
should, err := shouldReconcileByCreateFunc(log, vgName, llv)
if err != nil {
Expand Down Expand Up @@ -649,8 +592,8 @@ func shouldReconcileByCreateFunc(log logger.Logger, vgName string, llv *v1alpha1
}

lv, err := FindLV(log, vgName, llv.Spec.ActualLVNameOnTheNode)
if err != nil {
return false, err
if err == nil && lv != nil && lv.LVName == llv.Spec.ActualLVNameOnTheNode {
return false, nil
}

if lv != nil {
Expand Down Expand Up @@ -761,20 +704,88 @@ func updateLVMLogicalVolume(ctx context.Context, metrics monitoring.Metrics, cl
}

func FindLV(log logger.Logger, vgName, lvName string) (*internal.LVData, error) {
lvs, cmd, _, err := utils.GetAllLVs()
log.Debug(fmt.Sprintf("[FindLV] Try to find LV: %s in VG: %s", lvName, vgName))
lv, cmd, _, err := utils.GetLV(vgName, lvName)

log.Debug(fmt.Sprintf("[FindLV] runs cmd: %s", cmd))
if err != nil {
log.Error(err, "[shouldReconcileByCreateFunc] unable to GetAllLVs")
return &internal.LVData{}, err
if strings.Contains(err.Error(), "Failed to find logical volume") {
log.Debug("[FindLV] LV not found")
return nil, nil
}
log.Error(err, "[shouldReconcileByCreateFunc] unable to GetLV")
return nil, err
}
return &lv, nil

log.Debug(fmt.Sprintf("[FindLV] Try to find LV: %s in VG: %s", lvName, vgName))
for _, lv := range lvs {
log.Trace(fmt.Sprintf("[FindLV] processing LV: %s, VG: %s", lv.LVName, lv.VGName))
if lv.VGName == vgName && lv.LVName == lvName {
return &lv, nil
}

func ReconcileLVMLogicalVolume(ctx context.Context, cl client.Client, log logger.Logger, metrics monitoring.Metrics, cfg config.Options, request reconcile.Request) (bool, error) {
llv := &v1alpha1.LVMLogicalVolume{}
err := cl.Get(ctx, request.NamespacedName, llv)
if err != nil {
if k8serr.IsNotFound(err) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] LVMLogicalVolume %s not found. Object has probably been deleted.", request.NamespacedName))
return false, nil
}
return true, fmt.Errorf("[ReconcileLVMLogicalVolume] unable to get LVMLogicalVolume: %w", err)
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroupName)
if err != nil {
if k8serr.IsNotFound(err) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] LVMVolumeGroup %s not found for LVMLogicalVolume %s", llv.Spec.LvmVolumeGroupName, llv.Name))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup %s, err: %s", llv.Spec.LvmVolumeGroupName, err.Error()))
if err != nil {
return true, fmt.Errorf("[ReconcileLVMLogicalVolume] unable to update the LVMLogicalVolume %s status.phase to %s: %w", llv.Name, failedStatusPhase, err)
}
return true, nil
}
reconcileErr := fmt.Errorf("[ReconcileLVMLogicalVolume] unable to get LVMVolumeGroup %s: %w", llv.Spec.LvmVolumeGroupName, err)
updateErr := updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if updateErr != nil {
return true, fmt.Errorf("%s. Also failed to update LVMLogicalVolume %s status.phase to %s: %w", reconcileErr, llv.Name, failedStatusPhase, updateErr)
}
return false, reconcileErr
}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
return false, nil
}

log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMVolumeGroup %s belongs to the current node: %s. Reconciliation continues", lvg.Name, cfg.NodeName))
log.Debug("[ReconcileLVMLogicalVolume] Identify reconcile func. vgName: "+lvg.Spec.ActualVGNameOnTheNode+", llv:", llv.Name, llv)

recType, err := identifyReconcileFunc(log, lvg.Spec.ActualVGNameOnTheNode, llv)
if err != nil {
reconcileErr := fmt.Errorf("[ReconcileLVMLogicalVolume] unable to identify the reconcile func: %w", err)
updateErr := updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("An error occurred while identifying the reconcile func, err: %s", err.Error()))
if updateErr != nil {
return true, fmt.Errorf("%s. Also failed to update LVMLogicalVolume %s status.phase to %s: %w", reconcileErr, llv.Name, failedStatusPhase, updateErr)
}
return false, reconcileErr
}

return &internal.LVData{}, nil
switch recType {
case CreateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] CreateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVCreateFunc(ctx, cl, log, metrics, llv, lvg)
case UpdateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg)
case DeleteReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv, lvg)
default:
log.Debug(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled", llv.Name))
if llv.Status.Phase != createdStatusPhase {
log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. Setting the phase to %s", llv.Name, llv.Status.Phase, createdStatusPhase))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, createdStatusPhase, "")
if err != nil {
return true, fmt.Errorf("[runEventReconcile] unable to update the LVMLogicalVolume %s status.phase to %s: %w", llv.Name, createdStatusPhase, err)
}
}
}
return false, nil
}
Loading