Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add parallel reconciliation to lvm_logical_volume_watcher. Add VG size validation to lvm_volume_group_watcher #40

Merged
merged 21 commits into from
Apr 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor func
Signed-off-by: Aleksandr Zimin <alexandr.zimin@flant.com>
AleksZimin committed Apr 8, 2024
commit 6a3f54e4620cde31db8e20c11d50a96a5972124d
2 changes: 2 additions & 0 deletions images/agent/config/config.go
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ type Options struct {
MetricsPort string
BlockDeviceScanInterval time.Duration
VolumeGroupScanInterval time.Duration
LLVRequeInterval time.Duration
}

func NewConfig() (*Options, error) {
@@ -71,6 +72,7 @@ func NewConfig() (*Options, error) {

opts.BlockDeviceScanInterval = 5
opts.VolumeGroupScanInterval = 5
opts.LLVRequeInterval = 5

return &opts, nil
}
179 changes: 95 additions & 84 deletions images/agent/pkg/controller/lvm_logical_volume_watcher.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,9 @@ import (
"time"

"github.com/google/go-cmp/cmp"
k8serr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/strings/slices"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -59,49 +61,37 @@ func RunLVMLogicalVolumeWatcherController(

c, err := controller.New(lvmLogicalVolumeWatcherCtrlName, mgr, controller.Options{
Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] Reconciler starts reconciliation of LLV: %s", request.Name))
shouldRequeue, err := ReconcileLVMLogicalVolume(ctx, cl, log, metrics, cfg, request)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] an error occurred while reconciling LLV: %s", request.Name))
}
if shouldRequeue {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] requeue reconciliation of LLV: %s after %s", request.Name, cfg.LLVRequeInterval))
return reconcile.Result{RequeueAfter: cfg.LLVRequeInterval}, nil
}
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] ends reconciliation of LLV: %s without requeue", request.Name))
return reconcile.Result{Requeue: false}, nil
}),
MaxConcurrentReconciles: 10,
})

if err != nil {
log.Error(err, "[RunLVMLogicalVolumeWatcherController] unable to create controller")
return nil, err
}

err = c.Watch(source.Kind(cache, &v1alpha1.LVMLogicalVolume{}), handler.Funcs{
CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc starts reconciliation of LLV: %s", e.Object.GetName()))

llv, ok := e.Object.(*v1alpha1.LVMLogicalVolume)
if !ok {
err = errors.New("unable to cast event object to a given type")
log.Error(err, "[CreateFunc] an error occurred while handling create event")
return
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroupName)
if err != nil {
log.Error(err, "[CreateFunc] unable to get a LVMVolumeGroup")
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if err != nil {
log.Error(err, "[CreateFunc] unable to update a LVMLogicalVolume Phase")
}
return
}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
return
}
log.Debug(fmt.Sprintf("[CreateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName))

runEventReconcile(ctx, cl, log, metrics, llv, lvg)
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc starts reconciliation of LLV: %s", e.Object.GetName()))
request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}}

log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc ends reconciliation of LLV: %s", llv.Name))
q.Add(request)
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] CreateFunc ends reconciliation of LLV: %s", e.Object.GetName()))
},

UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc starts reconciliation of LLV: %s", e.ObjectNew.GetName()))
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc starts reconciliation of LLV: %s", e.ObjectNew.GetName()))

oldLLV, ok := e.ObjectOld.(*v1alpha1.LVMLogicalVolume)
if !ok {
@@ -127,30 +117,15 @@ func RunLVMLogicalVolumeWatcherController(
}

if reflect.DeepEqual(oldLLV.Spec, newLLV.Spec) && newLLV.DeletionTimestamp == nil {
log.Info(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has not been changed", newLLV.Name))
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
return
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", newLLV.Spec.LvmVolumeGroupName)
if err != nil {
log.Error(err, fmt.Sprintf("[UpdateFunc] unable to get the LVMVolumeGroup, name: %s", newLLV.Spec.LvmVolumeGroupName))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, newLLV, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if err != nil {
log.Error(err, "[UpdateFunc] unable to updateLVMLogicalVolumePhase")
}
return
}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has not been changed", newLLV.Name))
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
return
}
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMVolumeGroup %s belongs to the current node: %s", lvg.Name, cfg.NodeName))
log.Debug(fmt.Sprintf("[UpdateFunc] the LVMLogicalVolume %s has been changed. Add to the queue", newLLV.Name))

runEventReconcile(ctx, cl, log, metrics, newLLV, lvg)

log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: newLLV.Namespace, Name: newLLV.Name}}
q.Add(request)
log.Debug(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] UpdateFunc ends reconciliation of LLV: %s", newLLV.Name))
},
})
if err != nil {
@@ -161,40 +136,6 @@ func RunLVMLogicalVolumeWatcherController(
return c, err
}

func runEventReconcile(ctx context.Context, cl client.Client, log logger.Logger, metrics monitoring.Metrics, llv *v1alpha1.LVMLogicalVolume, lvg *v1alpha1.LvmVolumeGroup) {
log.Trace("[runEventReconcile] starts reconciliation. Identify reconcile func. vgName: "+lvg.Spec.ActualVGNameOnTheNode+", llv:", llv.Name, llv)
recType, err := identifyReconcileFunc(log, lvg.Spec.ActualVGNameOnTheNode, llv)
if err != nil {
log.Error(err, "[runEventReconcile] an error occurs while identify reconcile func")
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("An error occurred while identifying the reconcile func, err: %s", err.Error()))
if err != nil {
log.Error(err, "[runEventReconcile] unable to update a LVMLogicalVolume Phase")
}
return
}

switch recType {
case CreateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] CreateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVCreateFunc(ctx, cl, log, metrics, llv, lvg)
case UpdateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg)
case DeleteReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv, lvg)
default:
log.Debug(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled", llv.Name))
if llv.Status.Phase != createdStatusPhase {
log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. Setting the phase to %s", llv.Name, llv.Status.Phase, createdStatusPhase))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, createdStatusPhase, "")
if err != nil {
log.Error(err, fmt.Sprintf("[runEventReconcile] unable to update the LVMLogicalVolume %s", llv.Name))
}
}
}
}

func identifyReconcileFunc(log logger.Logger, vgName string, llv *v1alpha1.LVMLogicalVolume) (reconcileType, error) {
should, err := shouldReconcileByCreateFunc(log, vgName, llv)
if err != nil {
@@ -778,3 +719,73 @@ func FindLV(log logger.Logger, vgName, lvName string) (*internal.LVData, error)
return &lv, nil

}

func ReconcileLVMLogicalVolume(ctx context.Context, cl client.Client, log logger.Logger, metrics monitoring.Metrics, cfg config.Options, request reconcile.Request) (bool, error) {
llv := &v1alpha1.LVMLogicalVolume{}
err := cl.Get(ctx, request.NamespacedName, llv)
if err != nil {
if k8serr.IsNotFound(err) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] LVMLogicalVolume %s not found. Object has probably been deleted.", request.NamespacedName))
return false, nil
}
return true, fmt.Errorf("[ReconcileLVMLogicalVolume] unable to get LVMLogicalVolume: %w", err)
}

lvg, err := getLVMVolumeGroup(ctx, cl, metrics, "", llv.Spec.LvmVolumeGroupName)
if err != nil {
if k8serr.IsNotFound(err) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] LVMVolumeGroup %s not found for LVMLogicalVolume %s", llv.Spec.LvmVolumeGroupName, llv.Name))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup %s, err: %s", llv.Spec.LvmVolumeGroupName, err.Error()))
if err != nil {
return true, fmt.Errorf("[ReconcileLVMLogicalVolume] unable to update the LVMLogicalVolume %s status.phase to %s: %w", llv.Name, failedStatusPhase, err)
}
return true, nil
}
firstErr := fmt.Errorf("[ReconcileLVMLogicalVolume] unable to get LVMVolumeGroup %s: %w", llv.Spec.LvmVolumeGroupName, err)
secondErr := updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("Unable to get selected LVMVolumeGroup, err: %s", err.Error()))
if secondErr != nil {
return true, fmt.Errorf("%s. Also failed to update LVMLogicalVolume %s status.phase to %s: %w", firstErr, llv.Name, failedStatusPhase, secondErr)
}
return false, firstErr
}

if !belongsToNode(lvg, cfg.NodeName) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMVolumeGroup %s does not belongs to the current node: %s. Reconciliation stopped", lvg.Name, cfg.NodeName))
return false, nil
}

log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolume] the LVMVolumeGroup %s belongs to the current node: %s. Reconciliation continues", lvg.Name, cfg.NodeName))
log.Debug("[ReconcileLVMLogicalVolume] Identify reconcile func. vgName: "+lvg.Spec.ActualVGNameOnTheNode+", llv:", llv.Name, llv)

recType, err := identifyReconcileFunc(log, lvg.Spec.ActualVGNameOnTheNode, llv)
if err != nil {
firstErr := fmt.Errorf("[ReconcileLVMLogicalVolume] unable to identify the reconcile func: %w", err)
secondErr := updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, failedStatusPhase, fmt.Sprintf("An error occurred while identifying the reconcile func, err: %s", err.Error()))
if secondErr != nil {
return true, fmt.Errorf("%s. Also failed to update LVMLogicalVolume %s status.phase to %s: %w", firstErr, llv.Name, failedStatusPhase, secondErr)
}
return false, firstErr
}

switch recType {
case CreateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] CreateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVCreateFunc(ctx, cl, log, metrics, llv, lvg)
case UpdateReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] UpdateReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVUpdateFunc(ctx, cl, log, metrics, llv, lvg)
case DeleteReconcile:
log.Debug(fmt.Sprintf("[runEventReconcile] DeleteReconcile starts reconciliation for the LVMLogicalVolume: %s", llv.Name))
reconcileLLVDeleteFunc(ctx, cl, log, metrics, llv, lvg)
default:
log.Debug(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled", llv.Name))
if llv.Status.Phase != createdStatusPhase {
log.Warning(fmt.Sprintf("[runEventReconcile] the LVMLogicalVolume %s should not be reconciled but has an unexpected phase: %s. Setting the phase to %s", llv.Name, llv.Status.Phase, createdStatusPhase))
err = updateLVMLogicalVolumePhase(ctx, cl, log, metrics, llv, createdStatusPhase, "")
if err != nil {
return true, fmt.Errorf("[runEventReconcile] unable to update the LVMLogicalVolume %s status.phase to %s: %w", llv.Name, createdStatusPhase, err)
}
}
}
return false, nil
}