refactored cache
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
ViktorKram committed Aug 23, 2024
1 parent 6d3a8c8 commit a9a15bf
Showing 8 changed files with 99 additions and 56 deletions.
96 changes: 74 additions & 22 deletions images/agent/src/pkg/cache/cache.go
@@ -1,25 +1,39 @@
 package cache
 
 import (
-    "agent/internal"
-    "agent/pkg/logger"
     "bytes"
     "fmt"
     "sync"
+
+    "agent/internal"
+    "agent/pkg/logger"
 )
 
+const (
+    lvcount = 50
+)
+
 type Cache struct {
     m          sync.RWMutex
     devices    []internal.Device
     deviceErrs bytes.Buffer
     pvs        []internal.PVData
     pvsErrs    bytes.Buffer
     vgs        []internal.VGData
     vgsErrs    bytes.Buffer
-    lvs        []internal.LVData
+    lvs        map[string]*LVData
     lvsErrs    bytes.Buffer
 }
 
+type LVData struct {
+    Data  internal.LVData
+    Exist bool
+}
+
 func New() *Cache {
-    return &Cache{}
+    return &Cache{
+        lvs: make(map[string]*LVData, lvcount),
+    }
 }
 
 func (c *Cache) StoreDevices(devices []internal.Device, stdErr bytes.Buffer) {
@@ -59,37 +73,69 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) {
 }
 
 func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) {
-    c.lvs = lvs
+    lvsOnNode := make(map[string]internal.LVData, len(lvs))
+    for _, lv := range lvs {
+        lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv
+    }
+
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    for _, lv := range lvsOnNode {
+        k := c.configureLVKey(lv.VGName, lv.LVName)
+        if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist {
+            c.lvs[k] = &LVData{
+                Data:  lv,
+                Exist: true,
+            }
+        }
+    }
+
+    for key, lv := range c.lvs {
+        if lv.Exist {
+            continue
+        }
+
+        if _, exist := lvsOnNode[key]; !exist {
+            delete(c.lvs, key)
+        }
+    }
+
     c.lvsErrs = stdErr
 }
 
 func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) {
-    dst := make([]internal.LVData, len(c.lvs))
-    copy(dst, c.lvs)
+    dst := make([]internal.LVData, 0, len(c.lvs))
+
+    for _, lv := range c.lvs {
+        dst = append(dst, lv.Data)
+    }
 
     return dst, c.lvsErrs
 }
 
-func (c *Cache) FindLV(vgName, lvName string) *internal.LVData {
+func (c *Cache) FindLV(vgName, lvName string) *LVData {
     c.m.RLock()
     defer c.m.RUnlock()
-    for _, lv := range c.lvs {
-        if lv.VGName == vgName && lv.LVName == lvName {
-            return &lv
-        }
-    }
-
-    return nil
+    return c.lvs[c.configureLVKey(vgName, lvName)]
 }
 
 func (c *Cache) AddLV(vgName, lvName string) {
-    c.lvs = append(c.lvs, internal.LVData{VGName: vgName, LVName: lvName})
+    c.m.Lock()
+    defer c.m.Unlock()
+    c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{
+        Data:  internal.LVData{VGName: vgName, LVName: lvName},
+        Exist: true,
+    }
 }
 
 func (c *Cache) RemoveLV(vgName, lvName string) {
-    for i, lv := range c.lvs {
-        if lv.VGName == vgName && lv.LVName == lvName {
-            c.lvs = append(c.lvs[:i], c.lvs[i+1:]...)
-        }
-    }
+    c.m.Lock()
+    defer c.m.Unlock()
+
+    c.lvs[c.configureLVKey(vgName, lvName)].Exist = false
 }
 
 func (c *Cache) FindVG(vgName string) *internal.VGData {
@@ -126,11 +172,17 @@ func (c *Cache) PrintTheCache(log logger.Logger) {
     log.Cache(c.vgsErrs.String())
     log.Cache("[VGs ENDS]")
     log.Cache("[LVs BEGIN]")
+    c.m.RLock()
     for _, lv := range c.lvs {
-        log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
+        log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.Data.LVName, lv.Data.VGName, lv.Data.LVSize.String(), lv.Data.LvTags, lv.Data.LVAttr, lv.Data.PoolName))
     }
+    c.m.RUnlock()
     log.Cache("[ERRS]")
     log.Cache(c.lvsErrs.String())
    log.Cache("[LVs ENDS]")
     log.Cache("*****************CACHE ENDS*****************")
 }
+
+func (c *Cache) configureLVKey(vgName, lvName string) string {
+    return fmt.Sprintf("%s/%s", vgName, lvName)
+}
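The net effect of the cache.go changes: LVs now live in a map keyed by "vgName/lvName" (see configureLVKey), and RemoveLV soft-deletes by flipping Exist to false instead of slicing the entry out; the next StoreLVs keeps a tombstone alive while a stale scan still reports the LV, and deletes the key once the node scan no longer lists it. Note that RemoveLV dereferences the map entry without a nil check, so callers are expected to have confirmed the LV is cached first, as deleteLVIfNeeded does via FindLV. A minimal usage sketch of the new API, inferred from the diff above:

package main

import (
    "bytes"
    "fmt"

    "agent/internal"
    "agent/pkg/cache"
)

func main() {
    c := cache.New()

    // AddLV stores the entry under the "vg-1/lv-1" key with Exist = true.
    c.AddLV("vg-1", "lv-1")

    if lv := c.FindLV("vg-1", "lv-1"); lv != nil && lv.Exist {
        fmt.Println("cached:", lv.Data.VGName, lv.Data.LVName)
    }

    // RemoveLV only tombstones the entry (Exist = false); it stays in the map
    // so that a stale scan cannot resurrect a just-deleted LV.
    c.RemoveLV("vg-1", "lv-1")

    // A StoreLVs scan that no longer reports the LV finally deletes the key.
    c.StoreLVs([]internal.LVData{}, bytes.Buffer{})

    fmt.Println(c.FindLV("vg-1", "lv-1") == nil) // true
}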
@@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
 
     lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
     if lv == nil {
-        err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode)
+        err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode)
         log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
         err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error())
         if err != nil {
@@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
         continue
     }
 
-    if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) {
+    if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) {
         log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name))
         continue
     }
 
-    if llvRequestedSize.Value() < lv.LVSize.Value() {
-        log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String()))
+    if llvRequestedSize.Value() < lv.Data.LVSize.Value() {
+        log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String()))
         continue
     }
 
@@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
     )
     for currentAttempts < maxAttempts {
         lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
-        if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) {
-            log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name))
+        if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) {
+            log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name))
             break
         }
 
-        log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name))
+        log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name))
         currentAttempts++
         time.Sleep(1 * time.Second)
     }
 
     if currentAttempts == maxAttempts {
-        err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName)
+        err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName)
         log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name))
         shouldRetry = true
 
@@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
     continue
 }
 
-    updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize)
+    updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize)
     if err != nil {
         log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
         shouldRetry = true
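The extension reconciler now reads sizes through lv.Data and polls FindLV until the cached size catches up with the requested one, comparing within internal.ResizeDelta. utils.AreSizesEqualWithinDelta itself is not shown in this commit; a plausible reading, for illustration only, is a comparison that treats two quantities as equal when they differ by less than the delta:

package main

import (
    "fmt"
    "math"

    "k8s.io/apimachinery/pkg/api/resource"
)

// areSizesEqualWithinDelta is a hypothetical stand-in for
// utils.AreSizesEqualWithinDelta: equal when |left - right| < delta.
func areSizesEqualWithinDelta(left, right, delta resource.Quantity) bool {
    return math.Abs(float64(left.Value()-right.Value())) < float64(delta.Value())
}

func main() {
    requested := resource.MustParse("10Gi")
    actual := resource.MustParse("10Gi")
    delta := resource.MustParse("32Mi") // hypothetical value for internal.ResizeDelta

    // Sizes within the delta are treated as "already extended", so the
    // reconciler skips the resize and moves on.
    fmt.Println(areSizesEqualWithinDelta(requested, actual, delta)) // true
}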
4 changes: 0 additions & 4 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher.go
@@ -145,10 +145,6 @@ func RunLVMLogicalVolumeWatcherController(
             return reconcile.Result{RequeueAfter: cfg.LLVRequeueIntervalSec}, nil
         }
 
-        // we create the LV and try to get its size at log line 113278, I0819 12:43:10.563596
-        // meanwhile the cache starts filling at I0819 12:43:09.486602 and finishes at I0819 12:43:22.070604, and then only at I0819 12:43:22.081861
-        // and we get a retry at I0819 12:43:15.563851 (and at mo
-
         log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully ended reconciliation of the LVMLogicalVolume %s", request.Name))
         return reconcile.Result{}, nil
     }),
23 changes: 10 additions & 13 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go
@@ -152,14 +152,14 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv
 
 func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error {
     lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
-    if lv == nil {
+    if lv == nil || !lv.Exist {
         log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
         return nil
     }
 
     // this case prevents unexpected same-name LV deletions which does not actually belong to our LLV
-    if !checkIfLVBelongsToLLV(llv, lv) {
-        log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name))
+    if !checkIfLVBelongsToLLV(llv, &lv.Data) {
+        log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name))
         return nil
     }
 
@@ -170,8 +170,8 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l
         return err
     }
 
-    log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.LVName))
-    sdsCache.RemoveLV(lv.VGName, lv.LVName)
+    log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.Data.LVName))
+    sdsCache.RemoveLV(lv.Data.VGName, lv.Data.LVName)
 
     return nil
 }
@@ -182,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan
         return resource.Quantity{}
     }
 
-    result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI)
+    result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI)
 
     return *result
 }
@@ -297,12 +297,9 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol
 
     // if a specified Thick LV name matches the existing Thin one
     lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
-    if lv != nil {
-        if len(lv.LVAttr) == 0 {
-            reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName))
-        } else if !checkIfLVBelongsToLLV(llv, lv) {
-            reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName))
-        }
+    if lv != nil &&
+        len(lv.Data.LVAttr) != 0 && !checkIfLVBelongsToLLV(llv, &lv.Data) {
+        reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.Data.LVName))
     }
 
     if reason.Len() > 0 {
@@ -347,7 +344,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al
     }
 
     lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
-    return lv != nil
+    return lv != nil && lv.Exist
 }
 
 func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool {
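With the Exist flag, callers must now distinguish three states: not cached (nil), cached but tombstoned (Exist == false), and present. deleteLVIfNeeded and shouldReconcileByUpdateFunc both collapse the first two into "absent". A hypothetical helper, not part of this commit, capturing that repeated guard:

package controller

import "agent/pkg/cache"

// lvIsAbsent is a hypothetical helper (not in the commit): a nil lookup and a
// tombstoned entry both mean the LV should be treated as gone from the node.
func lvIsAbsent(lv *cache.LVData) bool {
    return lv == nil || !lv.Exist
}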
@@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) {
 
     v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{})
     if assert.False(t, v) {
-        assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r)
+        assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. ", r)
     }
 })
@@ -732,7 +732,7 @@ func getStatusThinPools(log logger.Logger, thinPools, sortedLVs map[string][]int
 
     for _, thinPool := range tps {
         usedSize, err := getThinPoolUsedSize(thinPool)
-        log.Trace(fmt.Sprintf("[getStatusThinPools] LV %v for VG name %s", thinPool, vg.VGName))
+        log.Trace(fmt.Sprintf("[getStatusThinPools] Data %v for VG name %s", thinPool, vg.VGName))
         if err != nil {
             log.Error(err, "[getStatusThinPools] unable to getThinPoolUsedSize")
         }
@@ -412,12 +412,12 @@ func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1a
     for _, tp := range lvg.Spec.ThinPools {
         lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name)
         if lv != nil {
-            if !isThinPool(*lv) {
-                reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.LVName))
+            if !isThinPool(lv.Data) {
+                reason.WriteString(fmt.Sprintf("Data %s is already created on the node and it is not a thin-pool", lv.Data.LVName))
                 continue
             }
 
-            actualThinPools[lv.LVName] = *lv
+            actualThinPools[lv.Data.LVName] = lv.Data
         }
     }
 
4 changes: 1 addition & 3 deletions images/agent/src/pkg/scanner/scanner.go
@@ -5,10 +5,10 @@ import (
     "context"
     "errors"
     "fmt"
-    "k8s.io/utils/clock"
     "time"
 
     "github.com/pilebones/go-udev/netlink"
+    "k8s.io/utils/clock"
     kubeCtrl "sigs.k8s.io/controller-runtime/pkg/controller"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
@@ -162,13 +162,11 @@ func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cf
     // the scan operations order is very important as it guarantees the consistent and reliable data from the node
     realClock := clock.RealClock{}
     now := time.Now()
-    fmt.Printf("LVS TIME BEFORE: %s", time.Now().String())
     lvs, lvsErr, err := scanLVs(ctx, log, cfg)
     log.Trace(fmt.Sprintf("[fillTheCache] LVS command runs for: %s", realClock.Since(now).String()))
     if err != nil {
         return err
     }
-    fmt.Printf("LVS TIME AFTER: %s", time.Now().String())
 
     now = time.Now()
     vgs, vgsErr, err := scanVGs(ctx, log, cfg)

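The scanner diff drops the temporary fmt.Printf timing probes and keeps the trace-level measurement, though it still mixes time.Now() with realClock.Since(). A minimal sketch of the same timing pattern done consistently through k8s.io/utils/clock, which keeps the measured duration mockable in tests:

package main

import (
    "fmt"
    "time"

    "k8s.io/utils/clock"
)

func main() {
    var c clock.Clock = clock.RealClock{} // a fake clock can be injected in tests

    start := c.Now()
    time.Sleep(50 * time.Millisecond) // stands in for the scanLVs call
    fmt.Printf("command ran for: %s\n", c.Since(start))
}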