diff --git a/images/agent/src/pkg/cache/cache.go b/images/agent/src/pkg/cache/cache.go index ea4962d4..a4b0c89c 100644 --- a/images/agent/src/pkg/cache/cache.go +++ b/images/agent/src/pkg/cache/cache.go @@ -3,24 +3,37 @@ package cache import ( "bytes" "fmt" + "sync" "agent/internal" "agent/pkg/logger" ) +const ( + lvcount = 50 +) + type Cache struct { + m sync.RWMutex devices []internal.Device deviceErrs bytes.Buffer pvs []internal.PVData pvsErrs bytes.Buffer vgs []internal.VGData vgsErrs bytes.Buffer - lvs []internal.LVData + lvs map[string]*LVData lvsErrs bytes.Buffer } +type LVData struct { + Data internal.LVData + Exist bool +} + func New() *Cache { - return &Cache{} + return &Cache{ + lvs: make(map[string]*LVData, lvcount), + } } func (c *Cache) StoreDevices(devices []internal.Device, stdErr bytes.Buffer) { @@ -60,25 +73,69 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) { } func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) { - c.lvs = lvs + lvsOnNode := make(map[string]internal.LVData, len(lvs)) + for _, lv := range lvs { + lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv + } + + c.m.Lock() + defer c.m.Unlock() + + for _, lv := range lvsOnNode { + k := c.configureLVKey(lv.VGName, lv.LVName) + if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist { + c.lvs[k] = &LVData{ + Data: lv, + Exist: true, + } + } + } + + for key, lv := range c.lvs { + if lv.Exist { + continue + } + + if _, exist := lvsOnNode[key]; !exist { + delete(c.lvs, key) + } + } + c.lvsErrs = stdErr } func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) { - dst := make([]internal.LVData, len(c.lvs)) - copy(dst, c.lvs) + dst := make([]internal.LVData, 0, len(c.lvs)) + + c.m.RLock() + defer c.m.RUnlock() + for _, lv := range c.lvs { + dst = append(dst, lv.Data) + } return dst, c.lvsErrs } -func (c *Cache) FindLV(vgName, lvName string) *internal.LVData { - for _, lv := range c.lvs { - if lv.VGName == vgName && lv.LVName == lvName { - return &lv - } +func (c *Cache) FindLV(vgName, lvName string) *LVData { + c.m.RLock() + defer c.m.RUnlock() + return c.lvs[c.configureLVKey(vgName, lvName)] +} + +func (c *Cache) AddLV(vgName, lvName string) { + c.m.Lock() + defer c.m.Unlock() + c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{ + Data: internal.LVData{VGName: vgName, LVName: lvName}, + Exist: true, } +} - return nil +func (c *Cache) MarkLVAsRemoved(vgName, lvName string) { + c.m.Lock() + defer c.m.Unlock() + + c.lvs[c.configureLVKey(vgName, lvName)].Exist = false } func (c *Cache) FindVG(vgName string) *internal.VGData { @@ -115,11 +172,16 @@ func (c *Cache) PrintTheCache(log logger.Logger) { log.Cache(c.vgsErrs.String()) log.Cache("[VGs ENDS]") log.Cache("[LVs BEGIN]") - for _, lv := range c.lvs { - log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName)) + lvs, _ := c.GetLVs() + for _, lv := range lvs { + log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName)) } log.Cache("[ERRS]") log.Cache(c.lvsErrs.String()) log.Cache("[LVs ENDS]") log.Cache("*****************CACHE ENDS*****************") } + +func (c *Cache) configureLVKey(vgName, lvName string) string { + return fmt.Sprintf("%s/%s", vgName, lvName) +} diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go b/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go index 078ac6fe..39da5367 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go @@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) if lv == nil { - err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode) + err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode) log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name)) err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error()) if err != nil { @@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m continue } - if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) { + if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) { log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name)) continue } - if llvRequestedSize.Value() < lv.LVSize.Value() { - log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String())) + if llvRequestedSize.Value() < lv.Data.LVSize.Value() { + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String())) continue } @@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m ) for currentAttempts < maxAttempts { lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) { - log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name)) + if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) { + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name)) break } - log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name)) + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name)) currentAttempts++ time.Sleep(1 * time.Second) } if currentAttempts == maxAttempts { - err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName) + err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName) log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name)) shouldRetry = true @@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m continue } - updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize) + updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize) if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) shouldRetry = true diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go index 5323a005..3216e21b 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go @@ -274,6 +274,8 @@ func reconcileLLVCreateFunc( log.Info(fmt.Sprintf("[reconcileLLVCreateFunc] successfully created LV %s in VG %s for LVMLogicalVolume resource with name: %s", llv.Spec.ActualLVNameOnTheNode, lvg.Spec.ActualVGNameOnTheNode, llv.Name)) + log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] adds the LV %s to the cache", llv.Spec.ActualLVNameOnTheNode)) + sdsCache.AddLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] tries to get the LV %s actual size", llv.Spec.ActualLVNameOnTheNode)) actualSize := getLVActualSize(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) if actualSize.Value() == 0 { @@ -323,9 +325,8 @@ func reconcileLLVUpdateFunc( log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] tries to get LVMLogicalVolume %s actual size before the extension", llv.Name)) actualSize := getLVActualSize(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) if actualSize.Value() == 0 { - err := fmt.Errorf("LV %s has zero size (likely LV was not found in the cache)", llv.Spec.ActualLVNameOnTheNode) - log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to get actual LV %s size of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name)) - return true, err + log.Warning(fmt.Sprintf("[reconcileLLVUpdateFunc] LV %s of the LVMLogicalVolume %s has zero size (likely LV was not updated in the cache) ", llv.Spec.ActualLVNameOnTheNode, llv.Name)) + return true, nil } log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] successfully got LVMLogicalVolume %s actual size %s before the extension", llv.Name, actualSize.String())) diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go index 91ad1c81..4ceb4301 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go @@ -152,14 +152,14 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error { lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) - if lv == nil { + if lv == nil || !lv.Exist { log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName)) return nil } // this case prevents unexpected same-name LV deletions which does not actually belong to our LLV - if !checkIfLVBelongsToLLV(llv, lv) { - log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name)) + if !checkIfLVBelongsToLLV(llv, &lv.Data) { + log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name)) return nil } @@ -170,6 +170,9 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l return err } + log.Debug(fmt.Sprintf("[deleteLVIfNeeded] mark LV %s in the cache as removed", lv.Data.LVName)) + sdsCache.MarkLVAsRemoved(lv.Data.VGName, lv.Data.LVName) + return nil } @@ -179,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan return resource.Quantity{} } - result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI) + result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI) return *result } @@ -294,12 +297,9 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol // if a specified Thick LV name matches the existing Thin one lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if lv != nil { - if len(lv.LVAttr) == 0 { - reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName)) - } else if !checkIfLVBelongsToLLV(llv, lv) { - reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName)) - } + if lv != nil && + len(lv.Data.LVAttr) != 0 && !checkIfLVBelongsToLLV(llv, &lv.Data) { + reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.Data.LVName)) } if reason.Len() > 0 { @@ -344,7 +344,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al } lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) - return lv != nil + return lv != nil && lv.Exist } func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool { diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go index 196e7b1e..486653ed 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go @@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{}) if assert.False(t, v) { - assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r) + assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. ", r) } }) diff --git a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go index 0e9ea991..d5c8288f 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go @@ -437,12 +437,12 @@ func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1a for _, tp := range lvg.Spec.ThinPools { lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name) if lv != nil { - if !isThinPool(*lv) { - reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.LVName)) + if !isThinPool(lv.Data) { + reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.Data.LVName)) continue } - actualThinPools[lv.LVName] = *lv + actualThinPools[lv.Data.LVName] = lv.Data } } diff --git a/images/agent/src/pkg/scanner/scanner.go b/images/agent/src/pkg/scanner/scanner.go index 7610576a..a44bec86 100644 --- a/images/agent/src/pkg/scanner/scanner.go +++ b/images/agent/src/pkg/scanner/scanner.go @@ -8,6 +8,7 @@ import ( "time" "github.com/pilebones/go-udev/netlink" + "k8s.io/utils/clock" kubeCtrl "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -159,22 +160,31 @@ func runControllersReconcile(ctx context.Context, log logger.Logger, bdCtrl, lvg func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cfg config.Options) error { // the scan operations order is very important as it guarantees the consistent and reliable data from the node + realClock := clock.RealClock{} + now := time.Now() lvs, lvsErr, err := scanLVs(ctx, log, cfg) + log.Trace(fmt.Sprintf("[fillTheCache] LVS command runs for: %s", realClock.Since(now).String())) if err != nil { return err } + now = time.Now() vgs, vgsErr, err := scanVGs(ctx, log, cfg) + log.Trace(fmt.Sprintf("[fillTheCache] VGS command runs for: %s", realClock.Since(now).String())) if err != nil { return err } + now = time.Now() pvs, pvsErr, err := scanPVs(ctx, log, cfg) + log.Trace(fmt.Sprintf("[fillTheCache] PVS command runs for: %s", realClock.Since(now).String())) if err != nil { return err } + now = time.Now() devices, devErr, err := scanDevices(ctx, log, cfg) + log.Trace(fmt.Sprintf("[fillTheCache] LSBLK command runs for: %s", realClock.Since(now).String())) if err != nil { return err }