Skip to content

Commit

Permalink
first draft
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <[email protected]>
  • Loading branch information
ViktorKram committed Sep 18, 2024
1 parent 084c1ae commit 2e4a5e4
Show file tree
Hide file tree
Showing 9 changed files with 539 additions and 5 deletions.
2 changes: 2 additions & 0 deletions images/agent/src/internal/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package internal
import "k8s.io/apimachinery/pkg/api/resource"

const (
LVGUpdateTriggerLabel = "update-trigger"

resizeDelta = "32Mi"
PartType = "part"
MultiPathType = "mpath"
Expand Down
3 changes: 2 additions & 1 deletion images/agent/src/pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package cache

import (
"bytes"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"

"agent/internal"
)

Expand Down
6 changes: 4 additions & 2 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -63,7 +63,7 @@ func RunLVMVolumeGroupWatcherController(
lvg := &v1alpha1.LVMVolumeGroup{}
err := cl.Get(ctx, request.NamespacedName, lvg)
if err != nil {
if errors2.IsNotFound(err) {
if errors.IsNotFound(err) {
log.Warning(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] seems like the LVMVolumeGroup was deleted as unable to get it, err: %s. Stop to reconcile", err.Error()))
return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -108,6 +108,8 @@ func RunLVMVolumeGroupWatcherController(
return reconcile.Result{}, nil
}

delete(lvg.Labels, internal.LVGUpdateTriggerLabel)

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to get block device resources for the LVMVolumeGroup %s by the selector %v", lvg.Name, lvg.Spec.BlockDeviceSelector.MatchLabels))
blockDevices, err := GetAPIBlockDevicesBySelector(ctx, cl, metrics, lvg.Spec.BlockDeviceSelector)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1a
return true
}

if _, exist := newLVG.Labels[internal.LVGUpdateTriggerLabel]; exist {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] updte event should be reconciled as the LVMVolumeGroup %s has the label %s", newLVG.Name, internal.LVGUpdateTriggerLabel))
return true
}

if shouldUpdateLVGLabels(log, newLVG, LVGMetadateNameLabelKey, newLVG.Name) {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup's %s labels have been changed", newLVG.Name))
return true
Expand Down Expand Up @@ -258,6 +263,22 @@ func validateSpecBlockDevices(lvg *v1alpha1.LVMVolumeGroup, blockDevices map[str
return false, "none of specified BlockDevices were found"
}

if len(lvg.Status.Nodes) > 0 {
lostBdNames := make([]string, 0, len(lvg.Status.Nodes[0].Devices))
for _, n := range lvg.Status.Nodes {
for _, d := range n.Devices {
if _, found := blockDevices[d.BlockDevice]; !found {
lostBdNames = append(lostBdNames, d.BlockDevice)
}
}
}

// that means some of the used BlockDevices no longer match the blockDeviceSelector
if len(lostBdNames) > 0 {
return false, fmt.Sprintf("these BlockDevices no longer match the blockDeviceSelector: %s", strings.Join(lostBdNames, ","))
}
}

for _, me := range lvg.Spec.BlockDeviceSelector.MatchExpressions {
if me.Key == internal.MetadataNameLabelKey {
if len(me.Values) != len(blockDevices) {
Expand Down
38 changes: 36 additions & 2 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,38 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}
})

t.Run("validation_fails_due_to_bd_left_the_selector", func(t *testing.T) {
lvg := &v1alpha1.LVMVolumeGroup{
Status: v1alpha1.LVMVolumeGroupStatus{
Nodes: []v1alpha1.LVMVolumeGroupNode{
{
Devices: []v1alpha1.LVMVolumeGroupDevice{
{
BlockDevice: "first",
},
{
BlockDevice: "second",
},
},
Name: "some-node",
},
},
},
}

bds := map[string]v1alpha1.BlockDevice{
"second": {
ObjectMeta: v1.ObjectMeta{
Name: "second",
},
},
}

valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "these BlockDevices no longer match the blockDeviceSelector: first", reason)
})

t.Run("validation_fails_due_to_bd_has_dif_node", func(t *testing.T) {
const (
nodeName = "nodeName"
Expand Down Expand Up @@ -762,8 +794,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, bds)
valid, reason := validateSpecBlockDevices(lvg, bds)
assert.False(t, valid)
assert.Equal(t, "block devices second have different node names from LVMVolumeGroup Local.NodeName", reason)
})

t.Run("validation_fails_due_to_no_block_devices_were_found", func(t *testing.T) {
Expand All @@ -787,8 +820,9 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
},
}

valid, _ := validateSpecBlockDevices(lvg, nil)
valid, reason := validateSpecBlockDevices(lvg, nil)
assert.False(t, valid)
assert.Equal(t, "none of specified BlockDevices were found", reason)
})

t.Run("validation_fails_due_to_some_blockdevice_were_not_found", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions images/sds-health-watcher-controller/src/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func main() {
os.Exit(1)
}

err = controller.RunBlockDeviceLabelsWatcher(mgr, *log)
if err != nil {
log.Error(err, "[main] unable to run BlockDeviceWatcher controller")
os.Exit(1)
}

if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
log.Error(err, "[main] unable to mgr.AddHealthzCheck")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package controller

import (
"context"
"fmt"
"reflect"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"sds-health-watcher-controller/pkg/logger"
)

const (
BlockDeviceWatcherCtrlName = "block-device-labels-watcher-controller"

// LVGUpdateTriggerLabel if you change this value, you must change its value in agent/src/internal/const.go as well
LVGUpdateTriggerLabel = "update-trigger"
)

func RunBlockDeviceLabelsWatcher(
mgr manager.Manager,
log logger.Logger,
) error {
cl := mgr.GetClient()

c, err := controller.New(BlockDeviceWatcherCtrlName, mgr, controller.Options{
Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log.Info(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] starts to reconcile the BlockDevice %s", request.Name))

bd := &v1alpha1.BlockDevice{}
err := cl.Get(ctx, request.NamespacedName, bd)
if err != nil {
if errors.IsNotFound(err) {
log.Warning(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] seems like the BlockDevice %s was removed as it was not found. Stop the reconcile", request.Name))
return reconcile.Result{}, nil
}

log.Error(err, fmt.Sprintf("[RunBlockDeviceLabelsWatcher] unable to get the BlockDevice %s", request.Name))
return reconcile.Result{}, err
}

err = ReconcileBlockDeviceLabels(ctx, cl, log, bd)
if err != nil {
log.Error(err, fmt.Sprintf("[RunBlockDeviceLabelsWatcher] unable to reconcile the BlockDevice %s", bd.Name))
return reconcile.Result{}, err
}
log.Info(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was successfully reconciled", bd.Name))
return reconcile.Result{}, nil
}),
})
if err != nil {
log.Error(err, "[RunBlockDeviceLabelsWatcher] unable to create the controller")
return err
}

err = c.Watch(source.Kind(mgr.GetCache(), &v1alpha1.BlockDevice{}, handler.TypedFuncs[*v1alpha1.BlockDevice, reconcile.Request]{
CreateFunc: func(_ context.Context, e event.TypedCreateEvent[*v1alpha1.BlockDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] got a Create event for the BlockDevice %s", e.Object.Name))
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}})
log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was added to the Reconciler's queue", e.Object.Name))
},
UpdateFunc: func(_ context.Context, e event.TypedUpdateEvent[*v1alpha1.BlockDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] got an Update event for the BlockDevice %s", e.ObjectNew.Name))

if reflect.DeepEqual(e.ObjectOld.Labels, e.ObjectNew.Labels) {
log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] no need to reconcile the BlockDevice %s as its labels are the same", e.ObjectNew.Name))
return
}

q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.ObjectNew.GetNamespace(), Name: e.ObjectNew.GetName()}})
log.Debug(fmt.Sprintf("[RunBlockDeviceLabelsWatcher] the BlockDevice %s was added to the Reconciler's queue", e.ObjectNew.Name))
},
}))
if err != nil {
log.Error(err, "[RunBlockDeviceLabelsWatcher] unable to controller.Watch")
return err
}

return nil
}

func ReconcileBlockDeviceLabels(ctx context.Context, cl client.Client, log logger.Logger, blockDevice *v1alpha1.BlockDevice) error {
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] starts the reconciliation for the BlockDevice %s", blockDevice.Name))
log.Debug("[ReconcileBlockDeviceLabels] tries to list LVMVolumeGroups")
lvgList := &v1alpha1.LVMVolumeGroupList{}
err := cl.List(ctx, lvgList)
if err != nil {
return err
}
log.Debug("[ReconcileBlockDeviceLabels] successfully listed LVMVolumeGroups")

for _, lvg := range lvgList.Items {
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] tries to configure a selector from blockDeviceSelector of the LVMVolumeGroup %s", lvg.Name))
selector, err := metav1.LabelSelectorAsSelector(lvg.Spec.BlockDeviceSelector)
if err != nil {
return err
}
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] successfully configured a selector from blockDeviceSelector of the LVMVolumeGroup %s", lvg.Name))

usedBdNames := make(map[string]struct{}, len(lvg.Status.Nodes[0].Devices))
for _, n := range lvg.Status.Nodes {
for _, d := range n.Devices {
usedBdNames[d.BlockDevice] = struct{}{}
}
}
if lvg.Labels == nil {
lvg.Labels = make(map[string]string)
}
if selector.Matches(labels.Set(blockDevice.Labels)) {
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] BlockDevice %s matches a blockDeviceSelector of the LVMVolumeGroup %s", blockDevice.Name, lvg.Name))
if _, used := usedBdNames[blockDevice.Name]; !used {
log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s matches the LVMVolumeGroup %s blockDeviceSelector, but is not used yet. Add the label %s to provide LVMVolumeGroup resource configuration update", blockDevice.Name, lvg.Name, LVGUpdateTriggerLabel))
lvg.Labels[LVGUpdateTriggerLabel] = "true"

err = cl.Update(ctx, &lvg)
if err != nil {
return err
}
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] successfully added the label %s to provide LVMVolumeGroup %s resource configuration update", LVGUpdateTriggerLabel, lvg.Name))
continue
}

log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s matches the LVMVolumeGroup %s blockDeviceSelector and already used by the resource", blockDevice.Name, lvg.Name))
continue
}

log.Debug(fmt.Sprintf("[ReconcileBlockDeviceLabels] BlockDevice %s does not match a blockDeviceSelector of the LVMVolumeGroup %s", blockDevice.Name, lvg.Name))
if _, used := usedBdNames[blockDevice.Name]; used {
log.Warning(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s does not match the LVMVolumeGroup %s blockDeviceSelector, but is used by the resource. Add the label %s to provide LVMVolumeGroup resource configuration update", blockDevice.Name, lvg.Name, LVGUpdateTriggerLabel))
lvg.Labels[LVGUpdateTriggerLabel] = "true"

err = cl.Update(ctx, &lvg)
if err != nil {
return err
}
log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] successfully added the label %s to provide LVMVolumeGroup %s resource configuration update", LVGUpdateTriggerLabel, lvg.Name))
continue
}

log.Info(fmt.Sprintf("[ReconcileBlockDeviceLabels] the BlockDevice %s does not match the LVMVolumeGroup %s blockDeviceSelector and is not used by the resource", blockDevice.Name, lvg.Name))
}

return nil
}
Loading

0 comments on commit 2e4a5e4

Please sign in to comment.