From a9a15bfefbf99923c685a5e51a479734d5b5cf8d Mon Sep 17 00:00:00 2001 From: Viktor Kramarenko Date: Thu, 22 Aug 2024 19:04:31 +0300 Subject: [PATCH] refactored cache Signed-off-by: Viktor Kramarenko --- images/agent/src/pkg/cache/cache.go | 96 ++++++++++++++----- .../lvm_logical_volume_extender_watcher.go | 18 ++-- .../controller/lvm_logical_volume_watcher.go | 4 - .../lvm_logical_volume_watcher_func.go | 23 ++--- .../lvm_logical_volume_watcher_test.go | 2 +- .../controller/lvm_volume_group_discover.go | 2 +- .../lvm_volume_group_watcher_func.go | 6 +- images/agent/src/pkg/scanner/scanner.go | 4 +- 8 files changed, 99 insertions(+), 56 deletions(-) diff --git a/images/agent/src/pkg/cache/cache.go b/images/agent/src/pkg/cache/cache.go index ac937347..ffff5e61 100644 --- a/images/agent/src/pkg/cache/cache.go +++ b/images/agent/src/pkg/cache/cache.go @@ -1,25 +1,39 @@ package cache import ( - "agent/internal" - "agent/pkg/logger" "bytes" "fmt" + "sync" + + "agent/internal" + "agent/pkg/logger" +) + +const ( + lvcount = 50 ) type Cache struct { + m sync.RWMutex devices []internal.Device deviceErrs bytes.Buffer pvs []internal.PVData pvsErrs bytes.Buffer vgs []internal.VGData vgsErrs bytes.Buffer - lvs []internal.LVData + lvs map[string]*LVData lvsErrs bytes.Buffer } +type LVData struct { + Data internal.LVData + Exist bool +} + func New() *Cache { - return &Cache{} + return &Cache{ + lvs: make(map[string]*LVData, lvcount), + } } func (c *Cache) StoreDevices(devices []internal.Device, stdErr bytes.Buffer) { @@ -59,37 +73,69 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) { } func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) { - c.lvs = lvs + lvsOnNode := make(map[string]internal.LVData, len(lvs)) + for _, lv := range lvs { + lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv + } + + c.m.Lock() + defer c.m.Unlock() + + for _, lv := range lvsOnNode { + k := c.configureLVKey(lv.VGName, lv.LVName) + if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist { + c.lvs[k] = &LVData{ + Data: lv, + Exist: true, + } + } + } + + for key, lv := range c.lvs { + if lv.Exist { + continue + } + + if _, exist := lvsOnNode[key]; !exist { + delete(c.lvs, key) + } + } + c.lvsErrs = stdErr } func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) { - dst := make([]internal.LVData, len(c.lvs)) - copy(dst, c.lvs) + dst := make([]internal.LVData, 0, len(c.lvs)) - return dst, c.lvsErrs -} - -func (c *Cache) FindLV(vgName, lvName string) *internal.LVData { + c.m.RLock() + defer c.m.RUnlock() for _, lv := range c.lvs { - if lv.VGName == vgName && lv.LVName == lvName { - return &lv - } + dst = append(dst, lv.Data) } - return nil + return dst, c.lvsErrs +} + +func (c *Cache) FindLV(vgName, lvName string) *LVData { + c.m.RLock() + defer c.m.RUnlock() + return c.lvs[c.configureLVKey(vgName, lvName)] } func (c *Cache) AddLV(vgName, lvName string) { - c.lvs = append(c.lvs, internal.LVData{VGName: vgName, LVName: lvName}) + c.m.Lock() + defer c.m.Unlock() + c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{ + Data: internal.LVData{VGName: vgName, LVName: lvName}, + Exist: true, + } } func (c *Cache) RemoveLV(vgName, lvName string) { - for i, lv := range c.lvs { - if lv.VGName == vgName && lv.LVName == lvName { - c.lvs = append(c.lvs[:i], c.lvs[i+1:]...) - } - } + c.m.Lock() + defer c.m.Unlock() + + c.lvs[c.configureLVKey(vgName, lvName)].Exist = false } func (c *Cache) FindVG(vgName string) *internal.VGData { @@ -126,11 +172,17 @@ func (c *Cache) PrintTheCache(log logger.Logger) { log.Cache(c.vgsErrs.String()) log.Cache("[VGs ENDS]") log.Cache("[LVs BEGIN]") + c.m.RLock() for _, lv := range c.lvs { - log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName)) + log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.Data.LVName, lv.Data.VGName, lv.Data.LVSize.String(), lv.Data.LvTags, lv.Data.LVAttr, lv.Data.PoolName)) } + c.m.RUnlock() log.Cache("[ERRS]") log.Cache(c.lvsErrs.String()) log.Cache("[LVs ENDS]") log.Cache("*****************CACHE ENDS*****************") } + +func (c *Cache) configureLVKey(vgName, lvName string) string { + return fmt.Sprintf("%s/%s", vgName, lvName) +} diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go b/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go index 078ac6fe..39da5367 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go @@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) if lv == nil { - err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode) + err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode) log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name)) err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error()) if err != nil { @@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m continue } - if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) { + if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) { log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name)) continue } - if llvRequestedSize.Value() < lv.LVSize.Value() { - log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String())) + if llvRequestedSize.Value() < lv.Data.LVSize.Value() { + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String())) continue } @@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m ) for currentAttempts < maxAttempts { lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) { - log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name)) + if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) { + log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name)) break } - log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name)) + log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name)) currentAttempts++ time.Sleep(1 * time.Second) } if currentAttempts == maxAttempts { - err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName) + err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName) log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name)) shouldRetry = true @@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m continue } - updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize) + updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize) if err != nil { log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name)) shouldRetry = true diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go index 2d4c0f98..3216e21b 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher.go @@ -145,10 +145,6 @@ func RunLVMLogicalVolumeWatcherController( return reconcile.Result{RequeueAfter: cfg.LLVRequeueIntervalSec}, nil } - // создаем LV и пытаемся получить его размер на 113278 строчке I0819 12:43:10.563596 - // при этом у нас кэш начинает заполняться в I0819 12:43:09.486602 и заканчивает в I0819 12:43:22.070604, а потом только в I0819 12:43:22.081861 - // при этом получаем ретрай в I0819 12:43:15.563851 (и на мо - log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully ended reconciliation of the LVMLogicalVolume %s", request.Name)) return reconcile.Result{}, nil }), diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go index e3ea08bf..c2849828 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go @@ -152,14 +152,14 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error { lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) - if lv == nil { + if lv == nil || !lv.Exist { log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName)) return nil } // this case prevents unexpected same-name LV deletions which does not actually belong to our LLV - if !checkIfLVBelongsToLLV(llv, lv) { - log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name)) + if !checkIfLVBelongsToLLV(llv, &lv.Data) { + log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name)) return nil } @@ -170,8 +170,8 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l return err } - log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.LVName)) - sdsCache.RemoveLV(lv.VGName, lv.LVName) + log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.Data.LVName)) + sdsCache.RemoveLV(lv.Data.VGName, lv.Data.LVName) return nil } @@ -182,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan return resource.Quantity{} } - result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI) + result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI) return *result } @@ -297,12 +297,9 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol // if a specified Thick LV name matches the existing Thin one lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode) - if lv != nil { - if len(lv.LVAttr) == 0 { - reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName)) - } else if !checkIfLVBelongsToLLV(llv, lv) { - reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName)) - } + if lv != nil && + len(lv.Data.LVAttr) != 0 && !checkIfLVBelongsToLLV(llv, &lv.Data) { + reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.Data.LVName)) } if reason.Len() > 0 { @@ -347,7 +344,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al } lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode) - return lv != nil + return lv != nil && lv.Exist } func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool { diff --git a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go index 196e7b1e..486653ed 100644 --- a/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go +++ b/images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go @@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) { v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{}) if assert.False(t, v) { - assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r) + assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. ", r) } }) diff --git a/images/agent/src/pkg/controller/lvm_volume_group_discover.go b/images/agent/src/pkg/controller/lvm_volume_group_discover.go index f4f3d25d..af8c22cf 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_discover.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_discover.go @@ -732,7 +732,7 @@ func getStatusThinPools(log logger.Logger, thinPools, sortedLVs map[string][]int for _, thinPool := range tps { usedSize, err := getThinPoolUsedSize(thinPool) - log.Trace(fmt.Sprintf("[getStatusThinPools] LV %v for VG name %s", thinPool, vg.VGName)) + log.Trace(fmt.Sprintf("[getStatusThinPools] Data %v for VG name %s", thinPool, vg.VGName)) if err != nil { log.Error(err, "[getStatusThinPools] unable to getThinPoolUsedSize") } diff --git a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go index a28f9ecb..4148aca2 100644 --- a/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go +++ b/images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go @@ -412,12 +412,12 @@ func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1a for _, tp := range lvg.Spec.ThinPools { lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name) if lv != nil { - if !isThinPool(*lv) { - reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.LVName)) + if !isThinPool(lv.Data) { + reason.WriteString(fmt.Sprintf("Data %s is already created on the node and it is not a thin-pool", lv.Data.LVName)) continue } - actualThinPools[lv.LVName] = *lv + actualThinPools[lv.Data.LVName] = lv.Data } } diff --git a/images/agent/src/pkg/scanner/scanner.go b/images/agent/src/pkg/scanner/scanner.go index dfb78b27..a44bec86 100644 --- a/images/agent/src/pkg/scanner/scanner.go +++ b/images/agent/src/pkg/scanner/scanner.go @@ -5,10 +5,10 @@ import ( "context" "errors" "fmt" - "k8s.io/utils/clock" "time" "github.com/pilebones/go-udev/netlink" + "k8s.io/utils/clock" kubeCtrl "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -162,13 +162,11 @@ func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cf // the scan operations order is very important as it guarantees the consistent and reliable data from the node realClock := clock.RealClock{} now := time.Now() - fmt.Printf("LVS TIME BEFORE: %s", time.Now().String()) lvs, lvsErr, err := scanLVs(ctx, log, cfg) log.Trace(fmt.Sprintf("[fillTheCache] LVS command runs for: %s", realClock.Since(now).String())) if err != nil { return err } - fmt.Printf("LVS TIME AFTER: %s", time.Now().String()) now = time.Now() vgs, vgsErr, err := scanVGs(ctx, log, cfg)