diff --git a/images/agent/src/pkg/controller/block_device.go b/images/agent/src/pkg/controller/block_device.go index 8f616d15..c0e5bc32 100644 --- a/images/agent/src/pkg/controller/block_device.go +++ b/images/agent/src/pkg/controller/block_device.go @@ -21,7 +21,9 @@ import ( "crypto/sha1" "fmt" "os" + "reflect" "regexp" + "strconv" "strings" "time" @@ -42,7 +44,8 @@ import ( ) const ( - BlockDeviceCtrlName = "block-device-controller" + BlockDeviceCtrlName = "block-device-controller" + BlockDeviceLabelPrefix = "status.blockdevice.storage.deckhouse.io" ) func RunBlockDeviceController( @@ -101,29 +104,31 @@ func BlockDeviceReconcile(ctx context.Context, cl kclient.Client, log logger.Log // create new API devices for _, candidate := range candidates { - if blockDevice, exist := apiBlockDevices[candidate.Name]; exist { - if !hasBlockDeviceDiff(blockDevice.Status, candidate) { + blockDevice, exist := apiBlockDevices[candidate.Name] + if exist { + if !hasBlockDeviceDiff(blockDevice, candidate) { log.Debug(fmt.Sprintf(`[RunBlockDeviceController] no data to update for block device, name: "%s"`, candidate.Name)) continue } - if err := UpdateAPIBlockDevice(ctx, cl, metrics, blockDevice, candidate); err != nil { + if err = UpdateAPIBlockDevice(ctx, cl, metrics, blockDevice, candidate); err != nil { log.Error(err, "[RunBlockDeviceController] unable to update blockDevice, name: %s", blockDevice.Name) continue } log.Info(fmt.Sprintf(`[RunBlockDeviceController] updated APIBlockDevice, name: %s`, blockDevice.Name)) - } else { - device, err := CreateAPIBlockDevice(ctx, cl, metrics, candidate) - if err != nil { - log.Error(err, fmt.Sprintf("[RunBlockDeviceController] unable to create block device blockDevice, name: %s", candidate.Name)) - continue - } - log.Info(fmt.Sprintf("[RunBlockDeviceController] created new APIBlockDevice: %s", candidate.Name)) + continue + } - // add new api device to the map, so it won't be deleted as fantom - apiBlockDevices[candidate.Name] = *device + device, err := CreateAPIBlockDevice(ctx, cl, metrics, candidate) + if err != nil { + log.Error(err, fmt.Sprintf("[RunBlockDeviceController] unable to create block device blockDevice, name: %s", candidate.Name)) + continue } + log.Info(fmt.Sprintf("[RunBlockDeviceController] created new APIBlockDevice: %s", candidate.Name)) + + // add new api device to the map, so it won't be deleted as fantom + apiBlockDevices[candidate.Name] = *device } // delete api device if device no longer exists, but we still have its api resource @@ -136,24 +141,25 @@ func BlockDeviceReconcile(ctx context.Context, cl kclient.Client, log logger.Log return false } -func hasBlockDeviceDiff(res v1alpha1.BlockDeviceStatus, candidate internal.BlockDeviceCandidate) bool { - return candidate.NodeName != res.NodeName || - candidate.Consumable != res.Consumable || - candidate.PVUuid != res.PVUuid || - candidate.VGUuid != res.VGUuid || - candidate.PartUUID != res.PartUUID || - candidate.LvmVolumeGroupName != res.LvmVolumeGroupName || - candidate.ActualVGNameOnTheNode != res.ActualVGNameOnTheNode || - candidate.Wwn != res.Wwn || - candidate.Serial != res.Serial || - candidate.Path != res.Path || - candidate.Size.Value() != res.Size.Value() || - candidate.Rota != res.Rota || - candidate.Model != res.Model || - candidate.HotPlug != res.HotPlug || - candidate.Type != res.Type || - candidate.FSType != res.FsType || - candidate.MachineID != res.MachineID +func hasBlockDeviceDiff(blockDevice v1alpha1.BlockDevice, candidate internal.BlockDeviceCandidate) bool { + return candidate.NodeName != blockDevice.Status.NodeName || + candidate.Consumable != blockDevice.Status.Consumable || + candidate.PVUuid != blockDevice.Status.PVUuid || + candidate.VGUuid != blockDevice.Status.VGUuid || + candidate.PartUUID != blockDevice.Status.PartUUID || + candidate.LvmVolumeGroupName != blockDevice.Status.LvmVolumeGroupName || + candidate.ActualVGNameOnTheNode != blockDevice.Status.ActualVGNameOnTheNode || + candidate.Wwn != blockDevice.Status.Wwn || + candidate.Serial != blockDevice.Status.Serial || + candidate.Path != blockDevice.Status.Path || + candidate.Size.Value() != blockDevice.Status.Size.Value() || + candidate.Rota != blockDevice.Status.Rota || + candidate.Model != blockDevice.Status.Model || + candidate.HotPlug != blockDevice.Status.HotPlug || + candidate.Type != blockDevice.Status.Type || + candidate.FSType != blockDevice.Status.FsType || + candidate.MachineID != blockDevice.Status.MachineID || + !reflect.DeepEqual(ConfigureBlockDeviceLabels(blockDevice), blockDevice.Labels) } func GetAPIBlockDevices(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics) (map[string]v1alpha1.BlockDevice, error) { @@ -517,6 +523,8 @@ func UpdateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito MachineID: candidate.MachineID, } + blockDevice.Labels = ConfigureBlockDeviceLabels(blockDevice) + start := time.Now() err := kc.Update(ctx, &blockDevice) metrics.APIMethodsDuration(BlockDeviceCtrlName, "update").Observe(metrics.GetEstimatedTimeInSeconds(start)) @@ -529,8 +537,40 @@ func UpdateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito return nil } +func ConfigureBlockDeviceLabels(blockDevice v1alpha1.BlockDevice) map[string]string { + var labels map[string]string + if blockDevice.Labels == nil { + labels = make(map[string]string, 16) + } else { + labels = make(map[string]string, len(blockDevice.Labels)) + } + + for key, value := range blockDevice.Labels { + labels[key] = value + } + + labels["kubernetes.io/metadata.name"] = blockDevice.ObjectMeta.Name + labels["kubernetes.io/hostname"] = blockDevice.Status.NodeName + labels[BlockDeviceLabelPrefix+"/type"] = blockDevice.Status.Type + labels[BlockDeviceLabelPrefix+"/fstype"] = blockDevice.Status.FsType + labels[BlockDeviceLabelPrefix+"/pvuuid"] = blockDevice.Status.PVUuid + labels[BlockDeviceLabelPrefix+"/vguuid"] = blockDevice.Status.VGUuid + labels[BlockDeviceLabelPrefix+"/partuuid"] = blockDevice.Status.PartUUID + labels[BlockDeviceLabelPrefix+"/lvmvolumegroupname"] = blockDevice.Status.LvmVolumeGroupName + labels[BlockDeviceLabelPrefix+"/actualvgnameonthenode"] = blockDevice.Status.ActualVGNameOnTheNode + labels[BlockDeviceLabelPrefix+"/wwn"] = blockDevice.Status.Wwn + labels[BlockDeviceLabelPrefix+"/serial"] = blockDevice.Status.Serial + labels[BlockDeviceLabelPrefix+"/size"] = blockDevice.Status.Size.String() + labels[BlockDeviceLabelPrefix+"/model"] = blockDevice.Status.Model + labels[BlockDeviceLabelPrefix+"/rota"] = strconv.FormatBool(blockDevice.Status.Rota) + labels[BlockDeviceLabelPrefix+"/hotplug"] = strconv.FormatBool(blockDevice.Status.HotPlug) + labels[BlockDeviceLabelPrefix+"/machineid"] = blockDevice.Status.MachineID + + return labels +} + func CreateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics, candidate internal.BlockDeviceCandidate) (*v1alpha1.BlockDevice, error) { - device := &v1alpha1.BlockDevice{ + blockDevice := &v1alpha1.BlockDevice{ ObjectMeta: metav1.ObjectMeta{ Name: candidate.Name, }, @@ -554,15 +594,17 @@ func CreateAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito }, } + blockDevice.Labels = ConfigureBlockDeviceLabels(*blockDevice) start := time.Now() - err := kc.Create(ctx, device) + + err := kc.Create(ctx, blockDevice) metrics.APIMethodsDuration(BlockDeviceCtrlName, "create").Observe(metrics.GetEstimatedTimeInSeconds(start)) metrics.APIMethodsExecutionCount(BlockDeviceCtrlName, "create").Inc() if err != nil { metrics.APIMethodsErrors(BlockDeviceCtrlName, "create").Inc() return nil, err } - return device, nil + return blockDevice, nil } func DeleteAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monitoring.Metrics, device *v1alpha1.BlockDevice) error { diff --git a/images/agent/src/pkg/controller/block_device_test.go b/images/agent/src/pkg/controller/block_device_test.go index c70a802e..7bb31398 100644 --- a/images/agent/src/pkg/controller/block_device_test.go +++ b/images/agent/src/pkg/controller/block_device_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + "strconv" "testing" "github.com/deckhouse/sds-node-configurator/api/v1alpha1" @@ -318,6 +319,56 @@ func TestBlockDeviceCtrl(t *testing.T) { } }) + t.Run("ConfigureBlockDeviceLabels", func(t *testing.T) { + blockDevice := v1alpha1.BlockDevice{ + Status: v1alpha1.BlockDeviceStatus{ + Type: "testTYPE", + FsType: "testFS", + NodeName: "test_node", + Consumable: false, + PVUuid: "testPV", + VGUuid: "testVGUID", + LvmVolumeGroupName: "testLVGName", + ActualVGNameOnTheNode: "testNameOnNode", + Wwn: "testWWN", + Serial: "testSERIAL", + Path: "testPATH", + Size: resource.MustParse("0"), + Model: "testMODEL", + Rota: false, + HotPlug: false, + MachineID: "testMACHINE", + }, + } + blockDevice.Labels = map[string]string{ + "some-custom-label1": "v", + "some-custom-label2": "v", + } + + expectedLabels := map[string]string{ + "kubernetes.io/metadata.name": blockDevice.ObjectMeta.Name, + "kubernetes.io/hostname": blockDevice.Status.NodeName, + BlockDeviceLabelPrefix + "/type": blockDevice.Status.Type, + BlockDeviceLabelPrefix + "/fstype": blockDevice.Status.FsType, + BlockDeviceLabelPrefix + "/pvuuid": blockDevice.Status.PVUuid, + BlockDeviceLabelPrefix + "/vguuid": blockDevice.Status.VGUuid, + BlockDeviceLabelPrefix + "/partuuid": blockDevice.Status.PartUUID, + BlockDeviceLabelPrefix + "/lvmvolumegroupname": blockDevice.Status.LvmVolumeGroupName, + BlockDeviceLabelPrefix + "/actualvgnameonthenode": blockDevice.Status.ActualVGNameOnTheNode, + BlockDeviceLabelPrefix + "/wwn": blockDevice.Status.Wwn, + BlockDeviceLabelPrefix + "/serial": blockDevice.Status.Serial, + BlockDeviceLabelPrefix + "/size": blockDevice.Status.Size.String(), + BlockDeviceLabelPrefix + "/model": blockDevice.Status.Model, + BlockDeviceLabelPrefix + "/rota": strconv.FormatBool(blockDevice.Status.Rota), + BlockDeviceLabelPrefix + "/hotplug": strconv.FormatBool(blockDevice.Status.HotPlug), + BlockDeviceLabelPrefix + "/machineid": blockDevice.Status.MachineID, + "some-custom-label1": "v", + "some-custom-label2": "v", + } + + assert.Equal(t, expectedLabels, ConfigureBlockDeviceLabels(blockDevice)) + }) + t.Run("hasBlockDeviceDiff", func(t *testing.T) { candidates := []internal.BlockDeviceCandidate{ // same state @@ -385,10 +436,12 @@ func TestBlockDeviceCtrl(t *testing.T) { MachineID: "testMACHINE", }, } + labels := ConfigureBlockDeviceLabels(blockDevice) + blockDevice.Labels = labels expected := []bool{false, true} for i, candidate := range candidates { - actual := hasBlockDeviceDiff(blockDevice.Status, candidate) + actual := hasBlockDeviceDiff(blockDevice, candidate) assert.Equal(t, expected[i], actual) } }) diff --git a/images/agent/src/pkg/controller/lvm_volume_group_watcher.go b/images/agent/src/pkg/controller/lvm_volume_group_watcher.go index 484fa0eb..380c2e50 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_watcher.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_watcher.go @@ -43,6 +43,7 @@ import ( const ( LVMVolumeGroupWatcherCtrlName = "lvm-volume-group-watcher-controller" + LVGMetadateNameLabelKey = "kubernetes.io/metadata.name" ) func RunLVMVolumeGroupWatcherController( @@ -123,6 +124,19 @@ func RunLVMVolumeGroupWatcherController( } log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] the LVMVolumeGroup %s belongs to the node %s. Starts to reconcile", lvg.Name, cfg.NodeName)) + log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] tries to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, cfg.NodeName)) + added, err = addLVGLabelIfNeeded(ctx, cl, log, lvg, LVGMetadateNameLabelKey, lvg.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVMVolumeGroupWatcherController] unable to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name)) + return reconcile.Result{}, err + } + + if added { + log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] successfully added label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name)) + } else { + log.Debug(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] no need to add label %s to the LVMVolumeGroup %s", LVGMetadateNameLabelKey, lvg.Name)) + } + // this case handles the situation when a user decides to remove LVMVolumeGroup resource without created VG vgs, _ := sdsCache.GetVGs() if !checkIfVGExist(lvg.Spec.ActualVGNameOnTheNode, vgs) && lvg.DeletionTimestamp != nil { diff --git a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go index a28f9ecb..0e9ea991 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go @@ -74,12 +74,37 @@ func checkIfVGExist(vgName string, vgs []internal.VGData) bool { return false } +func shouldUpdateLVGLabels(log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, labelKey, labelValue string) bool { + if lvg.Labels == nil { + log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has no labels.", lvg.Name)) + return true + } + + val, exist := lvg.Labels[labelKey] + if !exist { + log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has no label %s.", lvg.Name, labelKey)) + return true + } + + if val != labelValue { + log.Debug(fmt.Sprintf("[shouldUpdateLVGLabels] the LVMVolumeGroup %s has label %s but the value is incorrect - %s (should be %s)", lvg.Name, labelKey, val, labelValue)) + return true + } + + return false +} + func shouldLVGWatcherReconcileUpdateEvent(log logger.Logger, oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool { if newLVG.DeletionTimestamp != nil { log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s has deletionTimestamp", newLVG.Name)) return true } + if shouldUpdateLVGLabels(log, newLVG, LVGMetadateNameLabelKey, newLVG.Name) { + log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup's %s labels have been changed", newLVG.Name)) + return true + } + if !reflect.DeepEqual(oldLVG.Spec, newLVG.Spec) { log.Debug(fmt.Sprintf("[shouldLVGWatcherReconcileUpdateEvent] update event should be reconciled as the LVMVolumeGroup %s configuration has been changed", newLVG.Name)) return true @@ -947,3 +972,21 @@ func ExtendThinPool(log logger.Logger, metrics monitoring.Metrics, lvg *v1alpha1 return nil } + +func addLVGLabelIfNeeded(ctx context.Context, cl client.Client, log logger.Logger, lvg *v1alpha1.LvmVolumeGroup, labelKey, labelValue string) (bool, error) { + if !shouldUpdateLVGLabels(log, lvg, labelKey, labelValue) { + return false, nil + } + + if lvg.Labels == nil { + lvg.Labels = make(map[string]string) + } + + lvg.Labels[labelKey] = labelValue + err := cl.Update(ctx, lvg) + if err != nil { + return false, err + } + + return true, nil +} diff --git a/images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go b/images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go index 3427d098..b1fa2bad 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_watcher_test.go @@ -997,27 +997,45 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) { t.Run("condition_vg_configuration_applied_is_updating_returns_false", func(t *testing.T) { oldLVG := &v1alpha1.LvmVolumeGroup{} newLVG := &v1alpha1.LvmVolumeGroup{} + newLVG.Name = "test-name" newLVG.Status.Conditions = []v1.Condition{ { Type: internal.TypeVGConfigurationApplied, Reason: internal.ReasonUpdating, }, } + newLVG.Labels = map[string]string{LVGMetadateNameLabelKey: newLVG.Name} assert.False(t, shouldLVGWatcherReconcileUpdateEvent(log, oldLVG, newLVG)) }) t.Run("condition_vg_configuration_applied_is_creating_returns_false", func(t *testing.T) { oldLVG := &v1alpha1.LvmVolumeGroup{} newLVG := &v1alpha1.LvmVolumeGroup{} + newLVG.Name = "test-name" newLVG.Status.Conditions = []v1.Condition{ { Type: internal.TypeVGConfigurationApplied, Reason: internal.ReasonCreating, }, } + newLVG.Labels = map[string]string{LVGMetadateNameLabelKey: newLVG.Name} assert.False(t, shouldLVGWatcherReconcileUpdateEvent(log, oldLVG, newLVG)) }) + t.Run("label_is_not_the_same_returns_true", func(t *testing.T) { + oldLVG := &v1alpha1.LvmVolumeGroup{} + newLVG := &v1alpha1.LvmVolumeGroup{} + newLVG.Name = "test-name" + newLVG.Status.Conditions = []v1.Condition{ + { + Type: internal.TypeVGConfigurationApplied, + Reason: internal.ReasonCreating, + }, + } + newLVG.Labels = map[string]string{LVGMetadateNameLabelKey: "some-other-name"} + assert.True(t, shouldLVGWatcherReconcileUpdateEvent(log, oldLVG, newLVG)) + }) + t.Run("dev_size_and_pv_size_are_diff_returns_true", func(t *testing.T) { oldLVG := &v1alpha1.LvmVolumeGroup{} newLVG := &v1alpha1.LvmVolumeGroup{} @@ -1037,6 +1055,33 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) { }) }) + t.Run("shouldUpdateLVGLabels", func(t *testing.T) { + t.Run("labels_nil_returns_true", func(t *testing.T) { + lvg := &v1alpha1.LvmVolumeGroup{} + assert.True(t, shouldUpdateLVGLabels(log, lvg, "key", "value")) + }) + t.Run("no_such_label_returns_true", func(t *testing.T) { + lvg := &v1alpha1.LvmVolumeGroup{} + lvg.Labels = map[string]string{"key": "value"} + assert.True(t, shouldUpdateLVGLabels(log, lvg, "other-key", "value")) + }) + t.Run("key_exists_other_value_returns_true", func(t *testing.T) { + const key = "key" + lvg := &v1alpha1.LvmVolumeGroup{} + lvg.Labels = map[string]string{key: "value"} + assert.True(t, shouldUpdateLVGLabels(log, lvg, key, "other-value")) + }) + t.Run("all_good_returns_false", func(t *testing.T) { + const ( + key = "key" + value = "value" + ) + lvg := &v1alpha1.LvmVolumeGroup{} + lvg.Labels = map[string]string{key: value} + assert.False(t, shouldUpdateLVGLabels(log, lvg, key, value)) + }) + }) + t.Run("checkIfVGExist", func(t *testing.T) { const targetName = "test" vgs := []internal.VGData{