Skip to content

Commit

Permalink
refactored cache
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <[email protected]>
  • Loading branch information
ViktorKram committed Aug 22, 2024
1 parent 6d3a8c8 commit 1d3f597
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 69 deletions.
89 changes: 69 additions & 20 deletions images/agent/src/pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
package cache

import (
"agent/internal"
"agent/pkg/logger"
"bytes"
"fmt"
"sync"

"agent/internal"
"agent/pkg/logger"
)

type Cache struct {
m sync.RWMutex
devices []internal.Device
deviceErrs bytes.Buffer
pvs []internal.PVData
pvsErrs bytes.Buffer
vgs []internal.VGData
vgsErrs bytes.Buffer
lvs []internal.LVData
lvs map[string]*LVData
lvsErrs bytes.Buffer
}

type LVData struct {
Data internal.LVData
Exist bool
}

func New() *Cache {
return &Cache{}
}
Expand Down Expand Up @@ -59,37 +67,72 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) {
}

func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) {
c.lvs = lvs
lvsOnNode := make(map[string]internal.LVData, len(lvs))
for _, lv := range lvs {
lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv
}

c.m.Lock()
defer c.m.Unlock()
if c.lvs == nil {
c.lvs = make(map[string]*LVData, len(lvs))
}

for _, lv := range lvsOnNode {
k := c.configureLVKey(lv.VGName, lv.LVName)
if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist {
c.lvs[k] = &LVData{
Data: lv,
Exist: true,
}
}
}

for key, lv := range c.lvs {
if lv.Exist {
continue
}

if _, exist := lvsOnNode[key]; !exist {
delete(c.lvs, key)
}
}

c.lvsErrs = stdErr
}

func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) {
dst := make([]internal.LVData, len(c.lvs))
copy(dst, c.lvs)

return dst, c.lvsErrs
}

func (c *Cache) FindLV(vgName, lvName string) *internal.LVData {
c.m.RLock()
defer c.m.RUnlock()
for _, lv := range c.lvs {
if lv.VGName == vgName && lv.LVName == lvName {
return &lv
}
dst = append(dst, lv.Data)
}

return nil
return dst, c.lvsErrs
}

func (c *Cache) FindLV(vgName, lvName string) *LVData {
c.m.RLock()
defer c.m.RUnlock()
return c.lvs[c.configureLVKey(vgName, lvName)]
}

func (c *Cache) AddLV(vgName, lvName string) {
c.lvs = append(c.lvs, internal.LVData{VGName: vgName, LVName: lvName})
c.m.Lock()
defer c.m.Unlock()
c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{
Data: internal.LVData{VGName: vgName, LVName: lvName},
Exist: true,
}
}

func (c *Cache) RemoveLV(vgName, lvName string) {
for i, lv := range c.lvs {
if lv.VGName == vgName && lv.LVName == lvName {
c.lvs = append(c.lvs[:i], c.lvs[i+1:]...)
}
}
c.m.Lock()
defer c.m.Unlock()

c.lvs[c.configureLVKey(vgName, lvName)].Exist = false
}

func (c *Cache) FindVG(vgName string) *internal.VGData {
Expand Down Expand Up @@ -126,11 +169,17 @@ func (c *Cache) PrintTheCache(log logger.Logger) {
log.Cache(c.vgsErrs.String())
log.Cache("[VGs ENDS]")
log.Cache("[LVs BEGIN]")
c.m.RLock()
for _, lv := range c.lvs {
log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.Data.LVName, lv.Data.VGName, lv.Data.LVSize.String(), lv.Data.LvTags, lv.Data.LVAttr, lv.Data.PoolName))
}
c.m.RUnlock()
log.Cache("[ERRS]")
log.Cache(c.lvsErrs.String())
log.Cache("[LVs ENDS]")
log.Cache("*****************CACHE ENDS*****************")
}

func (c *Cache) configureLVKey(vgName, lvName string) string {
return fmt.Sprintf("%s/%s", vgName, lvName)
}
4 changes: 2 additions & 2 deletions images/agent/src/pkg/controller/block_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func DeleteAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito

func ReTag(ctx context.Context, log logger.Logger, metrics monitoring.Metrics) error {
// thin pool
log.Debug("[ReTag] start re-tagging LV")
log.Debug("[ReTag] start re-tagging Data")
start := time.Now()
lvs, cmdStr, _, err := utils.GetAllLVs(ctx)
metrics.UtilsCommandsDuration(BlockDeviceCtrlName, "lvs").Observe(metrics.GetEstimatedTimeInSeconds(start))
Expand Down Expand Up @@ -623,7 +623,7 @@ func ReTag(ctx context.Context, log logger.Logger, metrics monitoring.Metrics) e
}
}
}
log.Debug("[ReTag] end re-tagging LV")
log.Debug("[ReTag] end re-tagging Data")

log.Debug("[ReTag] start re-tagging LVM")
start = time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m

lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if lv == nil {
err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode)
err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error())
if err != nil {
Expand All @@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) {
if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) {
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name))
continue
}

if llvRequestedSize.Value() < lv.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String()))
if llvRequestedSize.Value() < lv.Data.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String()))
continue
}

Expand Down Expand Up @@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
)
for currentAttempts < maxAttempts {
lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name))
if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name))
break
}

log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name))
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name))
currentAttempts++
time.Sleep(1 * time.Second)
}

if currentAttempts == maxAttempts {
err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName)
err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name))
shouldRetry = true

Expand All @@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize)
updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
shouldRetry = true
Expand Down
4 changes: 0 additions & 4 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ func RunLVMLogicalVolumeWatcherController(
return reconcile.Result{RequeueAfter: cfg.LLVRequeueIntervalSec}, nil
}

// создаем LV и пытаемся получить его размер на 113278 строчке I0819 12:43:10.563596
// при этом у нас кэш начинает заполняться в I0819 12:43:09.486602 и заканчивает в I0819 12:43:22.070604, а потом только в I0819 12:43:22.081861
// при этом получаем ретрай в I0819 12:43:15.563851 (и на мо

log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully ended reconciliation of the LVMLogicalVolume %s", request.Name))
return reconcile.Result{}, nil
}),
Expand Down
38 changes: 19 additions & 19 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,26 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv

func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error {
lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
if lv == nil {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
if lv == nil || !lv.Exist {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find Data %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
return nil
}

// this case prevents unexpected same-name LV deletions which does not actually belong to our LLV
if !checkIfLVBelongsToLLV(llv, lv) {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name))
// this case prevents unexpected same-name Data deletions which does not actually belong to our LLV
if !checkIfLVBelongsToLLV(llv, &lv.Data) {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete Data %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name))
return nil
}

cmd, err := utils.RemoveLV(vgName, llv.Spec.ActualLVNameOnTheNode)
log.Debug(fmt.Sprintf("[deleteLVIfNeeded] runs cmd: %s", cmd))
if err != nil {
log.Error(err, fmt.Sprintf("[deleteLVIfNeeded] unable to remove LV %s from VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
log.Error(err, fmt.Sprintf("[deleteLVIfNeeded] unable to remove Data %s from VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
return err
}

log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.LVName))
sdsCache.RemoveLV(lv.VGName, lv.LVName)
log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes Data %s from the cache", lv.Data.LVName))
sdsCache.RemoveLV(lv.Data.VGName, lv.Data.LVName)

return nil
}
Expand All @@ -182,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan
return resource.Quantity{}
}

result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI)
result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI)

return *result
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol
reason := strings.Builder{}

if len(llv.Spec.ActualLVNameOnTheNode) == 0 {
reason.WriteString("No LV name specified. ")
reason.WriteString("No Data name specified. ")
}

llvRequestedSize, err := getLLVRequestedSize(llv, lvg)
Expand All @@ -262,12 +262,12 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol
}

if llvRequestedSize.Value() == 0 {
reason.WriteString("Zero size for LV. ")
reason.WriteString("Zero size for Data. ")
}

if llv.Status != nil {
if llvRequestedSize.Value()+internal.ResizeDelta.Value() < llv.Status.ActualSize.Value() {
reason.WriteString("Desired LV size is less than actual one. ")
reason.WriteString("Desired Data size is less than actual one. ")
}
}

Expand All @@ -291,17 +291,17 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol
}
case Thick:
if llv.Spec.Thin != nil {
reason.WriteString("Thin pool specified for Thick LV. ")
reason.WriteString("Thin pool specified for Thick Data. ")
}
}

// if a specified Thick LV name matches the existing Thin one
// if a specified Thick Data name matches the existing Thin one
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if lv != nil {
if len(lv.LVAttr) == 0 {
reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName))
} else if !checkIfLVBelongsToLLV(llv, lv) {
reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName))
if len(lv.Data.LVAttr) == 0 {
// this means we found a LV added by the controller (it is empty template)
} else if !checkIfLVBelongsToLLV(llv, &lv.Data) {
reason.WriteString(fmt.Sprintf("Specified Data %s is already created and it is doesnt match the one on the node.", lv.Data.LVName))
}
}

Expand Down Expand Up @@ -347,7 +347,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al
}

lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
return lv != nil
return lv != nil && lv.Exist
}

func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) {

v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{})
if assert.False(t, v) {
assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r)
assert.Equal(t, "Zero size for Data. Thin pool specified for Thick Data. Data test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r)
}
})

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) {

v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{})
if assert.False(t, v) {
assert.Equal(t, "No LV name specified. Zero size for LV. No thin pool specified. ", r)
assert.Equal(t, "No Data name specified. Zero size for Data. No thin pool specified. ", r)
}
})
})
Expand Down
Loading

0 comments on commit 1d3f597

Please sign in to comment.