Skip to content

Commit

Permalink
refactored cache
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <[email protected]>
  • Loading branch information
ViktorKram committed Aug 22, 2024
1 parent 6d3a8c8 commit fc272c5
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 94 deletions.
78 changes: 63 additions & 15 deletions images/agent/src/pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
package cache

import (
"agent/internal"
"agent/pkg/logger"
"bytes"
"fmt"
"sync"

"agent/internal"
"agent/pkg/logger"
)

type Cache struct {
m sync.RWMutex
devices []internal.Device
deviceErrs bytes.Buffer
pvs []internal.PVData
pvsErrs bytes.Buffer
vgs []internal.VGData
vgsErrs bytes.Buffer
lvs []internal.LVData
lvs map[string]*LVData
lvsErrs bytes.Buffer
}

type LVData struct {
Data internal.LVData
Exist bool
}

func New() *Cache {
return &Cache{}
}
Expand Down Expand Up @@ -59,37 +67,71 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) {
}

func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) {
c.lvs = lvs
lvsOnNode := make(map[string]internal.LVData, len(lvs))
for _, lv := range lvs {
lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv
}

c.m.Lock()
defer c.m.Unlock()
for _, lv := range lvsOnNode {
k := c.configureLVKey(lv.VGName, lv.LVName)
if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist {
c.lvs[k].Data = lv
}
}

for key, lv := range c.lvs {
if lv.Exist {
continue
}

if _, exist := lvsOnNode[key]; !exist {
delete(c.lvs, key)
}
}

c.lvsErrs = stdErr
}

func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) {
dst := make([]internal.LVData, len(c.lvs))
copy(dst, c.lvs)

c.m.RLock()
defer c.m.RUnlock()
for _, lv := range c.lvs {
dst = append(dst, lv.Data)
}

return dst, c.lvsErrs
}

func (c *Cache) FindLV(vgName, lvName string) *internal.LVData {
func (c *Cache) FindLV(vgName, lvName string) *LVData {
c.m.RLock()
defer c.m.RUnlock()
for _, lv := range c.lvs {
if lv.VGName == vgName && lv.LVName == lvName {
return &lv
if lv.Data.VGName == vgName && lv.Data.LVName == lvName {
return lv
}
}

return nil
}

func (c *Cache) AddLV(vgName, lvName string) {
c.lvs = append(c.lvs, internal.LVData{VGName: vgName, LVName: lvName})
c.m.Lock()
defer c.m.Unlock()
c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{
Data: internal.LVData{VGName: vgName, LVName: lvName},
Exist: true,
}
}

func (c *Cache) RemoveLV(vgName, lvName string) {
for i, lv := range c.lvs {
if lv.VGName == vgName && lv.LVName == lvName {
c.lvs = append(c.lvs[:i], c.lvs[i+1:]...)
}
}
c.m.Lock()
defer c.m.Unlock()

c.lvs[c.configureLVKey(vgName, lvName)].Exist = false
}

func (c *Cache) FindVG(vgName string) *internal.VGData {
Expand Down Expand Up @@ -126,11 +168,17 @@ func (c *Cache) PrintTheCache(log logger.Logger) {
log.Cache(c.vgsErrs.String())
log.Cache("[VGs ENDS]")
log.Cache("[LVs BEGIN]")
c.m.RLock()
for _, lv := range c.lvs {
log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.Data.LVName, lv.Data.VGName, lv.Data.LVSize.String(), lv.Data.LvTags, lv.Data.LVAttr, lv.Data.PoolName))
}
c.m.RUnlock()
log.Cache("[ERRS]")
log.Cache(c.lvsErrs.String())
log.Cache("[LVs ENDS]")
log.Cache("*****************CACHE ENDS*****************")
}

func (c *Cache) configureLVKey(vgName, lvName string) string {
return fmt.Sprintf("%s/%s", vgName, lvName)
}
4 changes: 2 additions & 2 deletions images/agent/src/pkg/controller/block_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func DeleteAPIBlockDevice(ctx context.Context, kc kclient.Client, metrics monito

func ReTag(ctx context.Context, log logger.Logger, metrics monitoring.Metrics) error {
// thin pool
log.Debug("[ReTag] start re-tagging LV")
log.Debug("[ReTag] start re-tagging Data")
start := time.Now()
lvs, cmdStr, _, err := utils.GetAllLVs(ctx)
metrics.UtilsCommandsDuration(BlockDeviceCtrlName, "lvs").Observe(metrics.GetEstimatedTimeInSeconds(start))
Expand Down Expand Up @@ -623,7 +623,7 @@ func ReTag(ctx context.Context, log logger.Logger, metrics monitoring.Metrics) e
}
}
}
log.Debug("[ReTag] end re-tagging LV")
log.Debug("[ReTag] end re-tagging Data")

log.Debug("[ReTag] start re-tagging LVM")
start = time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m

lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if lv == nil {
err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = fmt.Errorf("Data %s not found", llv.Spec.ActualLVNameOnTheNode)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find Data %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error())
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
Expand All @@ -159,21 +159,21 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) {
if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) {
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name))
continue
}

if llvRequestedSize.Value() < lv.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String()))
if llvRequestedSize.Value() < lv.Data.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String()))
continue
}

freeSpace := getFreeLVGSpaceForLLV(lvg, &llv)
if llvRequestedSize.Value()+internal.ResizeDelta.Value() > freeSpace.Value() {
err = errors.New("not enough space")
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend the LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend LV, err: %s", err.Error()))
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend the Data %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend Data, err: %s", err.Error()))
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
shouldRetry = true
Expand All @@ -191,8 +191,8 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m

cmd, err := utils.ExtendLV(llvRequestedSize.Value(), lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend LV %s of the LVMLogicalVolume %s, cmd: %s", llv.Spec.ActualLVNameOnTheNode, llv.Name, cmd))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend LV, err: %s", err.Error()))
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to extend Data %s of the LVMLogicalVolume %s, cmd: %s", llv.Spec.ActualLVNameOnTheNode, llv.Name, cmd))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, fmt.Sprintf("unable to extend Data, err: %s", err.Error()))
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
}
Expand All @@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
)
for currentAttempts < maxAttempts {
lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name))
if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] Data %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name))
break
}

log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name))
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] Data %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name))
currentAttempts++
time.Sleep(1 * time.Second)
}

if currentAttempts == maxAttempts {
err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName)
err = fmt.Errorf("Data %s is not updated in the cache", lv.Data.LVName)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name))
shouldRetry = true

Expand All @@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize)
updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
shouldRetry = true
Expand Down
Loading

0 comments on commit fc272c5

Please sign in to comment.