Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add a BlockDevice labels watcher controller #94

Merged
merged 28 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions images/agent/src/internal/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package internal
import "k8s.io/apimachinery/pkg/api/resource"

const (
// LVGUpdateTriggerLabel if you change this value, you must change its value in sds-health-watcher-controller/src/pkg/block_device_labels_watcher.go as well
LVGUpdateTriggerLabel = "update-trigger"

resizeDelta = "32Mi"
PartType = "part"
MultiPathType = "mpath"
Expand Down
10 changes: 5 additions & 5 deletions images/agent/src/pkg/controller/lvm_volume_group_discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl client.Client, metr

log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] updated LVMVolumeGroup, name: "%s"`, lvg.Name))
} else {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is not yet created. Create it", lvg.Name))
lvm, err := CreateLVMVolumeGroupByCandidate(ctx, log, metrics, cl, candidate, cfg.NodeName)
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is not yet created. Create it", candidate.LVMVGName))
createdLvg, err := CreateLVMVolumeGroupByCandidate(ctx, log, metrics, cl, candidate, cfg.NodeName)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to CreateLVMVolumeGroupByCandidate %s. Requeue the request in %s", candidate.LVMVGName, cfg.VolumeGroupScanIntervalSec.String()))
shouldRequeue = true
Expand All @@ -173,19 +173,19 @@ func LVMVolumeGroupDiscoverReconcile(ctx context.Context, cl client.Client, metr

err = updateLVGConditionIfNeeded(ctx, cl, log, &lvg, metav1.ConditionTrue, internal.TypeVGConfigurationApplied, "Success", "all configuration has been applied")
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, lvg.Name))
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGConfigurationApplied, createdLvg.Name))
shouldRequeue = true
continue
}

err = updateLVGConditionIfNeeded(ctx, cl, log, &lvg, metav1.ConditionTrue, internal.TypeVGReady, internal.ReasonUpdated, "ready to create LV")
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, lvg.Name))
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] unable to add a condition %s to the LVMVolumeGroup %s", internal.TypeVGReady, createdLvg.Name))
shouldRequeue = true
continue
}

log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] created new APILVMVolumeGroup, name: "%s"`, lvm.Name))
log.Info(fmt.Sprintf(`[RunLVMVolumeGroupDiscoverController] created new APILVMVolumeGroup, name: "%s"`, createdLvg.Name))
}
}

Expand Down
27 changes: 12 additions & 15 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -63,7 +63,7 @@ func RunLVMVolumeGroupWatcherController(
lvg := &v1alpha1.LVMVolumeGroup{}
err := cl.Get(ctx, request.NamespacedName, lvg)
if err != nil {
if errors2.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Warning(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] seems like the LVMVolumeGroup was deleted as unable to get it, err: %s. Stop to reconcile", err.Error()))
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -108,6 +108,16 @@ func RunLVMVolumeGroupWatcherController(
return reconcile.Result{}, nil
}

if _, exist := lvg.Labels[internal.LVGUpdateTriggerLabel]; exist {
delete(lvg.Labels, internal.LVGUpdateTriggerLabel)
err = cl.Update(ctx, lvg)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to update the LVMVolumeGroup %s", lvg.Name))
return reconcile.Result{}, err
}
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully removed the label %s from the LVMVolumeGroup %s", internal.LVGUpdateTriggerLabel, lvg.Name))
}

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to get block device resources for the LVMVolumeGroup %s by the selector %v", lvg.Name, lvg.Spec.BlockDeviceSelector.MatchLabels))
blockDevices, err := GetAPIBlockDevices(ctx, cl, metrics, lvg.Spec.BlockDeviceSelector)
if err != nil {
Expand Down Expand Up @@ -147,19 +157,6 @@ func RunLVMVolumeGroupWatcherController(
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
}

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, cfg.NodeName))
added, err = addLVGLabelIfNeeded(ctx, cl, log, lvg, LVGMetadateNameLabelKey, lvg.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
return reconcile.Result{}, err
}

if added {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully added label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
} else {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
}

// We do this after BlockDevices validation and node belonging check to prevent multiple updates by all agents pods
bds, _ := sdsCache.GetDevices()
if len(bds) == 0 {
Expand Down
23 changes: 22 additions & 1 deletion images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1a
return true
}

if _, exist := newLVG.Labels[internal.LVGUpdateTriggerLabel]; exist {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s has the label %s", newLVG.Name, internal.LVGUpdateTriggerLabel))
return true
}

if shouldUpdateLVGLabels(log, newLVG, LVGMetadateNameLabelKey, newLVG.Name) {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup's %s labels have been changed", newLVG.Name))
return true
Expand Down Expand Up @@ -241,7 +246,7 @@ func syncThinPoolsAllocationLimit(ctx context.Context, cl client.Client, log log
if updated {
fmt.Printf("%+v", lvg.Status.ThinPools)
log.Debug(fmt.Sprintf("[syncThinPoolsAllocationLimit] tries to update the LVMVolumeGroup %s", lvg.Name))
err := cl.Status().Update(ctx, lvg)
err = cl.Status().Update(ctx, lvg)
if err != nil {
return err
}
Expand All @@ -258,6 +263,22 @@ func validateSpecBlockDevices(lvg *v1alpha1.LVMVolumeGroup, blockDevices map[str
return false, "none of specified BlockDevices were found"
}

if len(lvg.Status.Nodes) > 0 {
lostBdNames := make([]string, 0, len(lvg.Status.Nodes[0].Devices))
for _, n := range lvg.Status.Nodes {
for _, d := range n.Devices {
if _, found := blockDevices[d.BlockDevice]; !found {
lostBdNames = append(lostBdNames, d.BlockDevice)
}
}
}

// that means some of the used BlockDevices no longer match the blockDeviceSelector
if len(lostBdNames) > 0 {
return false, fmt.Sprintf("these BlockDevices no longer match the blockDeviceSelector: %s", strings.Join(lostBdNames, ","))
}
}

for _, me := range lvg.Spec.BlockDeviceSelector.MatchExpressions {
if me.Key == internal.MetadataNameLabelKey {
if len(me.Values) != len(blockDevices) {
Expand Down
38 changes: 36 additions & 2 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,38 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}
})

t.Run("validation_fails_due_to_bd_left_the_selector", func(t *testing.T) {
lvg := &v1alpha1.LVMVolumeGroup{
Status: v1alpha1.LVMVolumeGroupStatus{
Nodes: []v1alpha1.LVMVolumeGroupNode{
{
Devices: []v1alpha1.LVMVolumeGroupDevice{
{
BlockDevice: "first",
},
{
BlockDevice: "second",
},
},
Name: "some-node",
},
},
},
}

bds := map[string]v1alpha1.BlockDevice{
"second": {
ObjectMeta: v1.ObjectMeta{
Name: "second",
},
},
}

valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "these BlockDevices no longer match the blockDeviceSelector: first", reason)
})

t.Run("validation_fails_due_to_bd_has_dif_node", func(t *testing.T) {
const (
nodeName = "nodeName"
Expand Down Expand Up @@ -762,8 +794,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, bds)
valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "block devices second have different node names from LVMVolumeGroup Local.NodeName", reason)
})

t.Run("validation_fails_due_to_no_block_devices_were_found", func(t *testing.T) {
Expand All @@ -787,8 +820,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, nil)
valid, reason := validateSpecBlockDevices(lvg, nil)
assert.False(t, valid)
assert.Equal(t, "none of specified BlockDevices were found", reason)
})

t.Run("validation_fails_due_to_some_blockdevice_were_not_found", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions images/sds-health-watcher-controller/src/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func main() {
os.Exit(1)
}

err = controller.RunBlockDeviceLabelsWatcher(mgr, *log, *cfgParams)
if err != nil {
log.Error(err, "[main] unable to run BlockDeviceWatcher controller")
os.Exit(1)
}

if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.Error(err, "[main] unable to mgr.AddHealthzCheck")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package controller

import (
"context"
"fmt"
"reflect"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"sds-health-watcher-controller/config"
"sds-health-watcher-controller/internal"
"sds-health-watcher-controller/pkg/logger"
)

const (
// BlockDeviceLabelsWatcherCtrlName is the name passed to controller.New when the
// BlockDevice labels watcher controller is created.
BlockDeviceLabelsWatcherCtrlName = "block-device-labels-watcher-controller"

// LVGUpdateTriggerLabel is the label key that ReconcileBlockDeviceLabels sets (with the value "true")
// on an LVMVolumeGroup to request an update of its configuration.
// If you change this value, you must change its value in agent/src/internal/const.go as well.
LVGUpdateTriggerLabel = "update-trigger"
)

// RunBlockDeviceLabelsWatcher creates the BlockDevice labels watcher controller and
// subscribes it to BlockDevice events of the manager's cache.
//
// A BlockDevice is put into the reconcile queue on every Create event and on those
// Update events where the labels of the old and the new object differ. The reconciler
// fetches the BlockDevice and delegates to ReconcileBlockDeviceLabels; when the latter
// asks for a retry, the request is requeued after cfg.ScanIntervalSec.
//
// It returns an error if the controller cannot be created or the watch cannot be set up.
func RunBlockDeviceLabelsWatcher(
	mgr manager.Manager,
	log logger.Logger,
	cfg config.Options,
) error {
	k8sClient := mgr.GetClient()

	// reconcileFn handles a single queued BlockDevice request.
	reconcileFn := func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
		log.Info(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] starts to reconcile the BlockDevice %s", request.Name))

		blockDevice := &v1alpha1.BlockDevice{}
		if getErr := k8sClient.Get(ctx, request.NamespacedName, blockDevice); getErr != nil {
			// A missing resource is not an error: there is nothing left to reconcile.
			if errors.IsNotFound(getErr) {
				log.Warning(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] seems like the BlockDevice %s was removed as it was not found. Stop the reconcile", request.Name))
				return reconcile.Result{}, nil
			}

			log.Error(getErr, fmt.Sprintf("[RunBlockDeviceLabelsWatcher] unable to get the BlockDevice %s", request.Name))
			return reconcile.Result{}, getErr
		}

		requeue, reconcileErr := ReconcileBlockDeviceLabels(ctx, k8sClient, log, blockDevice)
		switch {
		case reconcileErr != nil:
			log.Error(reconcileErr, fmt.Sprintf("[RunBlockDeviceLabelsWatcher] unable to reconcile the BlockDevice %s", blockDevice.Name))
			return reconcile.Result{}, reconcileErr
		case requeue:
			log.Warning(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the request for the BlockDevice %s should be requeued in %s", blockDevice.Name, cfg.ScanIntervalSec.String()))
			return reconcile.Result{RequeueAfter: cfg.ScanIntervalSec}, nil
		}

		log.Info(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was successfully reconciled", blockDevice.Name))
		return reconcile.Result{}, nil
	}

	ctrl, err := controller.New(BlockDeviceLabelsWatcherCtrlName, mgr, controller.Options{
		Reconciler: reconcile.Func(reconcileFn),
	})
	if err != nil {
		log.Error(err, "[RunBlockDeviceLabelsWatcher] unable to create the controller")
		return err
	}

	// requestFor builds the reconcile request that identifies the given BlockDevice.
	requestFor := func(bd *v1alpha1.BlockDevice) reconcile.Request {
		key := types.NamespacedName{Namespace: bd.GetNamespace(), Name: bd.GetName()}
		return reconcile.Request{NamespacedName: key}
	}

	bdEventHandlers := handler.TypedFuncs[*v1alpha1.BlockDevice, reconcile.Request]{
		CreateFunc: func(_ context.Context, e event.TypedCreateEvent[*v1alpha1.BlockDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			created := e.Object
			log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] got a Create event for the BlockDevice %s", created.Name))
			q.Add(requestFor(created))
			log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was added to the Reconciler's queue", created.Name))
		},
		UpdateFunc: func(_ context.Context, e event.TypedUpdateEvent[*v1alpha1.BlockDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
			previous, current := e.ObjectOld, e.ObjectNew
			log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] got an Update event for the BlockDevice %s", current.Name))

			// Only label changes are interesting for this controller.
			if labelsChanged := !reflect.DeepEqual(previous.Labels, current.Labels); labelsChanged {
				q.Add(requestFor(current))
				log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was added to the Reconciler's queue", current.Name))
				return
			}

			log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] no need to reconcile the BlockDevice %s as its labels are the same", current.Name))
		},
	}

	if err = ctrl.Watch(source.Kind(mgr.GetCache(), &v1alpha1.BlockDevice{}, bdEventHandlers)); err != nil {
		log.Error(err, "[RunBlockDeviceLabelsWatcher] unable to controller.Watch")
		return err
	}

	return nil
}

func ReconcileBlockDeviceLabels(ctx context.Context, cl client.Client, log logger.Logger, blockDevice *v1alpha1.BlockDevice) (bool, error) {
astef marked this conversation as resolved.
Show resolved Hide resolved
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] starts the reconciliation for the BlockDevice %s", blockDevice.Name))
shouldRetry := false

log.Debug("[ReconcileBlockDeviceLabels] tries to list LVMVolumeGroups")
lvgList := &v1alpha1.LVMVolumeGroupList{}
err := cl.List(ctx, lvgList)
if err != nil {
return false, err
}
log.Debug("[ReconcileBlockDeviceLabels] successfully listed LVMVolumeGroups")

for _, lvg := range lvgList.Items {
if len(lvg.Status.Nodes) == 0 {
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] LVMVolumeGroup %s nodes are not configured yet, retry later...", lvg.Name))
shouldRetry = true
continue
}

log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] tries to configure a selector from blockDeviceSelector of the LVMVolumeGroup %s", lvg.Name))
selector, err := metav1.LabelSelectorAsSelector(lvg.Spec.BlockDeviceSelector)
if err != nil {
return false, err
}
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] successfully configured a selector from blockDeviceSelector of the LVMVolumeGroup %s", lvg.Name))

usedBdNames := make(map[string]struct{}, len(lvg.Status.Nodes[0].Devices))
for _, n := range lvg.Status.Nodes {
for _, d := range n.Devices {
usedBdNames[d.BlockDevice] = struct{}{}
}
}
if lvg.Labels == nil {
lvg.Labels = make(map[string]string)
}

shouldTrigger := false
switch selector.Matches(labels.Set(blockDevice.Labels)) {
astef marked this conversation as resolved.
Show resolved Hide resolved
case true:
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] BlockDevice %s matches a blockDeviceSelector of the LVMVolumeGroup %s", blockDevice.Name, lvg.Name))
if _, used := usedBdNames[blockDevice.Name]; !used {
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s matches the LVMVolumeGroup %s blockDeviceSelector, but is not used yet. Add the label %s to provide LVMVolumeGroup resource configuration update", blockDevice.Name, lvg.Name, LVGUpdateTriggerLabel))
shouldTrigger = true
break
}

// for the case when BlockDevice stopped match the LVG blockDeviceSelector and then start again
for _, c := range lvg.Status.Conditions {
if c.Type == internal.TypeVGConfigurationApplied && c.Status == metav1.ConditionFalse {
log.Warning(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s matches the LVMVolumeGroup %s blockDeviceSelector, but the LVMVolumeGroup has condition %s in status False. Add the label %s to provide LVMVolumeGroup resource configuration update", blockDevice.Name, lvg.Name, c.Type, LVGUpdateTriggerLabel))
shouldTrigger = true
break
}
}

log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s matches the LVMVolumeGroup %s blockDeviceSelector and already used by the resource", blockDevice.Name, lvg.Name))
case false:
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] BlockDevice %s does not match a blockDeviceSelector of the LVMVolumeGroup %s", blockDevice.Name, lvg.Name))
if _, used := usedBdNames[blockDevice.Name]; used {
log.Warning(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s does not match the LVMVolumeGroup %s blockDeviceSelector, but is used by the resource. Add the label %s to provide LVMVolumeGroup resource configuration update", blockDevice.Name, lvg.Name, LVGUpdateTriggerLabel))
shouldTrigger = true
break
}

log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s does not match the LVMVolumeGroup %s blockDeviceSelector and is not used by the resource", blockDevice.Name, lvg.Name))
}
astef marked this conversation as resolved.
Show resolved Hide resolved

if shouldTrigger {
lvg.Labels[LVGUpdateTriggerLabel] = "true"
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] the LVMVolumeGroup %s should be triggered to update its configuration", lvg.Name))
err = cl.Update(ctx, &lvg)
if err != nil {
return false, err
}
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] successfully added the label %s to provide LVMVolumeGroup %s resource configuration update", LVGUpdateTriggerLabel, lvg.Name))
}
}

return shouldRetry, nil
}
Loading
Loading