Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add labels to BlockDevices #76

Merged
merged 16 commits into from
Aug 28, 2024
110 changes: 76 additions & 34 deletions images/agent/src/pkg/controller/block_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"crypto/sha1"
"fmt"
"os"
"reflect"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -42,7 +44,8 @@ import (
)

const (
BlockDeviceCtrlName = "block-device-controller"
BlockDeviceCtrlName = "block-device-controller"
BlockDeviceLabelPrefix = "status.blockdevice.storage.deckhouse.io"
)

func RunBlockDeviceController(
Expand Down Expand Up @@ -101,29 +104,31 @@ func BlockDeviceReconcile(ctx context.Context, cl kclient.Client, log logger.Log

// create new API devices
for _, candidate := range candidates {
if blockDevice, exist := apiBlockDevices[candidate.Name]; exist {
if !hasBlockDeviceDiff(blockDevice.Status, candidate) {
blockDevice, exist := apiBlockDevices[candidate.Name]
if exist {
if !hasBlockDeviceDiff(blockDevice, candidate) {
log.Debug(fmt.Sprintf(`[RunBlockDeviceController] no data to update for block device, name: "%s"`, candidate.Name))
continue
}

if err := UpdateAPIBlockDevice(ctx, cl, metrics, blockDevice, candidate); err != nil {
if err = UpdateAPIBlockDevice(ctx, cl, metrics, blockDevice, candidate); err != nil {
log.Error(err, "[RunBlockDeviceController] unable to update blockDevice, name: %s", blockDevice.Name)
continue
}

log.Info(fmt.Sprintf(`[RunBlockDeviceController] updated APIBlockDevice, name: %s`, blockDevice.Name))
} else {
device, err := CreateAPIBlockDevice(ctx, cl, metrics, candidate)
if err != nil {
log.Error(err, fmt.Sprintf("[RunBlockDeviceController] unable to create block device blockDevice, name: %s", candidate.Name))
continue
}
log.Info(fmt.Sprintf("[RunBlockDeviceController] created new APIBlockDevice: %s", candidate.Name))
continue
}

// add new api device to the map, so it won't be deleted as fantom
apiBlockDevices[candidate.Name] = *device
device, err := CreateAPIBlockDevice(ctx, cl, metrics, candidate)
if err != nil {
log.Error(err, fmt.Sprintf("[RunBlockDeviceController] unable to create block device blockDevice, name: %s", candidate.Name))
continue
}
log.Info(fmt.Sprintf("[RunBlockDeviceController] created new APIBlockDevice: %s", candidate.Name))

// add new api device to the map, so it won't be deleted as fantom
apiBlockDevices[candidate.Name] = *device
}

// delete api device if device no longer exists, but we still have its api resource
Expand All @@ -136,24 +141,25 @@ func BlockDeviceReconcile(ctx context.Context, cl kclient.Client, log logger.Log
return false
}

func hasBlockDeviceDiff(res v1alpha1.BlockDeviceStatus, candidate internal.BlockDeviceCandidate) bool {
return candidate.NodeName != res.NodeName ||
candidate.Consumable != res.Consumable ||
candidate.PVUuid != res.PVUuid ||
candidate.VGUuid != res.VGUuid ||
candidate.PartUUID != res.PartUUID ||
candidate.LvmVolumeGroupName != res.LvmVolumeGroupName ||
candidate.ActualVGNameOnTheNode != res.ActualVGNameOnTheNode ||
candidate.Wwn != res.Wwn ||
candidate.Serial != res.Serial ||
candidate.Path != res.Path ||
candidate.Size.Value() != res.Size.Value() ||
candidate.Rota != res.Rota ||
candidate.Model != res.Model ||
candidate.HotPlug != res.HotPlug ||
candidate.Type != res.Type ||
candidate.FSType != res.FsType ||
candidate.MachineID != res.MachineID
func hasBlockDeviceDiff(blockDevice v1alpha1.BlockDevice, candidate internal.BlockDeviceCandidate) bool {
return candidate.NodeName != blockDevice.Status.NodeName ||
candidate.Consumable != blockDevice.Status.Consumable ||
candidate.PVUuid != blockDevice.Status.PVUuid ||
candidate.VGUuid != blockDevice.Status.VGUuid ||
candidate.PartUUID != blockDevice.Status.PartUUID ||
candidate.LvmVolumeGroupName != blockDevice.Status.LvmVolumeGroupName ||
candidate.ActualVGNameOnTheNode != blockDevice.Status.ActualVGNameOnTheNode ||
candidate.Wwn != blockDevice.Status.Wwn ||
candidate.Serial != blockDevice.Status.Serial ||
candidate.Path != blockDevice.Status.Path ||
candidate.Size.Value() != blockDevice.Status.Size.Value() ||
candidate.Rota != blockDevice.Status.Rota ||
candidate.Model != blockDevice.Status.Model ||
candidate.HotPlug != blockDevice.Status.HotPlug ||
candidate.Type != blockDevice.Status.Type ||
candidate.FSType != blockDevice.Status.FsType ||
candidate.MachineID != blockDevice.Status.MachineID ||
!reflect.DeepEqual(ConfigureBlockDeviceLabels(blockDevice), blockDevice.Labels)
}

func GetAPIBlockDevices(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics) (map[string]v1alpha1.BlockDevice, error) {
Expand Down Expand Up @@ -517,6 +523,8 @@ func UpdateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito
MachineID: candidate.MachineID,
}

blockDevice.Labels = ConfigureBlockDeviceLabels(blockDevice)

start := time.Now()
err := kc.Update(ctx, &blockDevice)
metrics.APIMethodsDuration(BlockDeviceCtrlName, "update").Observe(metrics.GetEstimatedTimeInSeconds(start))
Expand All @@ -529,8 +537,40 @@ func UpdateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito
return nil
}

func ConfigureBlockDeviceLabels(blockDevice v1alpha1.BlockDevice) map[string]string {
var labels map[string]string
if blockDevice.Labels == nil {
labels = make(map[string]string, 16)
} else {
labels = make(map[string]string, len(blockDevice.Labels))
}

for key, value := range blockDevice.Labels {
labels[key] = value
}

labels["kubernetes.io/metadata.name"] = blockDevice.ObjectMeta.Name
labels["kubernetes.io/hostname"] = blockDevice.Status.NodeName
labels[BlockDeviceLabelPrefix+"/type"] = blockDevice.Status.Type
labels[BlockDeviceLabelPrefix+"/fstype"] = blockDevice.Status.FsType
labels[BlockDeviceLabelPrefix+"/pvuuid"] = blockDevice.Status.PVUuid
labels[BlockDeviceLabelPrefix+"/vguuid"] = blockDevice.Status.VGUuid
labels[BlockDeviceLabelPrefix+"/partuuid"] = blockDevice.Status.PartUUID
labels[BlockDeviceLabelPrefix+"/lvmvolumegroupname"] = blockDevice.Status.LvmVolumeGroupName
labels[BlockDeviceLabelPrefix+"/actualvgnameonthenode"] = blockDevice.Status.ActualVGNameOnTheNode
labels[BlockDeviceLabelPrefix+"/wwn"] = blockDevice.Status.Wwn
labels[BlockDeviceLabelPrefix+"/serial"] = blockDevice.Status.Serial
labels[BlockDeviceLabelPrefix+"/size"] = blockDevice.Status.Size.String()
labels[BlockDeviceLabelPrefix+"/model"] = blockDevice.Status.Model
labels[BlockDeviceLabelPrefix+"/rota"] = strconv.FormatBool(blockDevice.Status.Rota)
labels[BlockDeviceLabelPrefix+"/hotplug"] = strconv.FormatBool(blockDevice.Status.HotPlug)
labels[BlockDeviceLabelPrefix+"/machineid"] = blockDevice.Status.MachineID

return labels
}

func CreateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics, candidate internal.BlockDeviceCandidate) (*v1alpha1.BlockDevice, error) {
device := &v1alpha1.BlockDevice{
blockDevice := &v1alpha1.BlockDevice{
ObjectMeta: metav1.ObjectMeta{
Name: candidate.Name,
},
Expand All @@ -554,15 +594,17 @@ func CreateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito
},
}

blockDevice.Labels = ConfigureBlockDeviceLabels(*blockDevice)
start := time.Now()
err := kc.Create(ctx, device)

err := kc.Create(ctx, blockDevice)
metrics.APIMethodsDuration(BlockDeviceCtrlName, "create").Observe(metrics.GetEstimatedTimeInSeconds(start))
metrics.APIMethodsExecutionCount(BlockDeviceCtrlName, "create").Inc()
if err != nil {
metrics.APIMethodsErrors(BlockDeviceCtrlName, "create").Inc()
return nil, err
}
return device, nil
return blockDevice, nil
}

func DeleteAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics, device *v1alpha1.BlockDevice) error {
Expand Down
55 changes: 54 additions & 1 deletion images/agent/src/pkg/controller/block_device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"fmt"
"strconv"
"testing"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
Expand Down Expand Up @@ -318,6 +319,56 @@ func TestBlockDeviceCtrl(t *testing.T) {
}
})

t.Run("ConfigureBlockDeviceLabels", func(t *testing.T) {
blockDevice := v1alpha1.BlockDevice{
Status: v1alpha1.BlockDeviceStatus{
Type: "testTYPE",
FsType: "testFS",
NodeName: "test_node",
Consumable: false,
PVUuid: "testPV",
VGUuid: "testVGUID",
LvmVolumeGroupName: "testLVGName",
ActualVGNameOnTheNode: "testNameOnNode",
Wwn: "testWWN",
Serial: "testSERIAL",
Path: "testPATH",
Size: resource.MustParse("0"),
Model: "testMODEL",
Rota: false,
HotPlug: false,
MachineID: "testMACHINE",
},
}
blockDevice.Labels = map[string]string{
"some-custom-label1": "v",
"some-custom-label2": "v",
}

expectedLabels := map[string]string{
"kubernetes.io/metadata.name": blockDevice.ObjectMeta.Name,
"kubernetes.io/hostname": blockDevice.Status.NodeName,
BlockDeviceLabelPrefix + "/type": blockDevice.Status.Type,
BlockDeviceLabelPrefix + "/fstype": blockDevice.Status.FsType,
BlockDeviceLabelPrefix + "/pvuuid": blockDevice.Status.PVUuid,
BlockDeviceLabelPrefix + "/vguuid": blockDevice.Status.VGUuid,
BlockDeviceLabelPrefix + "/partuuid": blockDevice.Status.PartUUID,
BlockDeviceLabelPrefix + "/lvmvolumegroupname": blockDevice.Status.LvmVolumeGroupName,
BlockDeviceLabelPrefix + "/actualvgnameonthenode": blockDevice.Status.ActualVGNameOnTheNode,
BlockDeviceLabelPrefix + "/wwn": blockDevice.Status.Wwn,
BlockDeviceLabelPrefix + "/serial": blockDevice.Status.Serial,
BlockDeviceLabelPrefix + "/size": blockDevice.Status.Size.String(),
BlockDeviceLabelPrefix + "/model": blockDevice.Status.Model,
BlockDeviceLabelPrefix + "/rota": strconv.FormatBool(blockDevice.Status.Rota),
BlockDeviceLabelPrefix + "/hotplug": strconv.FormatBool(blockDevice.Status.HotPlug),
BlockDeviceLabelPrefix + "/machineid": blockDevice.Status.MachineID,
"some-custom-label1": "v",
"some-custom-label2": "v",
}

assert.Equal(t, expectedLabels, ConfigureBlockDeviceLabels(blockDevice))
})

t.Run("hasBlockDeviceDiff", func(t *testing.T) {
candidates := []internal.BlockDeviceCandidate{
// same state
Expand Down Expand Up @@ -385,10 +436,12 @@ func TestBlockDeviceCtrl(t *testing.T) {
MachineID: "testMACHINE",
},
}
labels := ConfigureBlockDeviceLabels(blockDevice)
blockDevice.Labels = labels
expected := []bool{false, true}

for i, candidate := range candidates {
actual := hasBlockDeviceDiff(blockDevice.Status, candidate)
actual := hasBlockDeviceDiff(blockDevice, candidate)
assert.Equal(t, expected[i], actual)
}
})
Expand Down
14 changes: 14 additions & 0 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

const (
LVMVolumeGroupWatcherCtrlName = "lvm-volume-group-watcher-controller"
LVGMetadateNameLabelKey = "kubernetes.io/metadata.name"
)

func RunLVMVolumeGroupWatcherController(
Expand Down Expand Up @@ -123,6 +124,19 @@ func RunLVMVolumeGroupWatcherController(
}
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] the LVMVolumeGroup %s belongs to the node %s. Starts to reconcile", lvg.Name, cfg.NodeName))

log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, cfg.NodeName))
added, err = addLVGLabelIfNeeded(ctx, cl, log, lvg, LVGMetadateNameLabelKey, lvg.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
return reconcile.Result{}, err
}

if added {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully added label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
} else {
log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name))
}

// this case handles the situation when a user decides to remove LVMVolumeGroup resource without created VG
vgs, _ := sdsCache.GetVGs()
if !checkIfVGExist(lvg.Spec.ActualVGNameOnTheNode, vgs) && lvg.DeletionTimestamp != nil {
Expand Down
43 changes: 43 additions & 0 deletions images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,37 @@ func checkIfVGExist(vgName string, vgs []internal.VGData) bool {
return false
}

func shouldUpdateLVGLabels(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, labelKey, labelValue string) bool {
if lvg.Labels == nil {
log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has no labels.", lvg.Name))
return true
}

val, exist := lvg.Labels[labelKey]
if !exist {
log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has no label %s.", lvg.Name, labelKey))
return true
}

if val != labelValue {
log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has label %s but the value is incorrect - %s (should be %s)", lvg.Name, labelKey, val, labelValue))
return true
}

return false
}

func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool {
if newLVG.DeletionTimestamp != nil {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s has deletionTimestamp", newLVG.Name))
return true
}

if shouldUpdateLVGLabels(log, newLVG, LVGMetadateNameLabelKey, newLVG.Name) {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup's %s labels have been changed", newLVG.Name))
return true
}

if !reflect.DeepEqual(oldLVG.Spec, newLVG.Spec) {
log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s configuration has been changed", newLVG.Name))
return true
Expand Down Expand Up @@ -947,3 +972,21 @@ func ExtendThinPool(log logger.Logger, metrics monitoring.Metrics, lvg *v1alpha1

return nil
}

func addLVGLabelIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, labelKey, labelValue string) (bool, error) {
if !shouldUpdateLVGLabels(log, lvg, labelKey, labelValue) {
return false, nil
}

if lvg.Labels == nil {
lvg.Labels = make(map[string]string)
}

lvg.Labels[labelKey] = labelValue
err := cl.Update(ctx, lvg)
if err != nil {
return false, err
}

return true, nil
}
Loading
Loading