Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Fix bug with unexpected LLV Failed statuses and phantom resources #84

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 75 additions & 13 deletions images/agent/src/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,37 @@ package cache
import (
"bytes"
"fmt"
"sync"

"agent/internal"
"agent/pkg/logger"
)

// lvcount is the initial capacity passed to make when the LV map of the
// cache is allocated.
const lvcount = 50

type Cache struct {
m sync.RWMutex
devices []internal.Device
deviceErrs bytes.Buffer
pvs []internal.PVData
pvsErrs bytes.Buffer
vgs []internal.VGData
vgsErrs bytes.Buffer
lvs []internal.LVData
lvs map[string]*LVData
lvsErrs bytes.Buffer
}

// LVData is the cache entry stored for a single logical volume.
type LVData struct {
// Data holds the LV description (internal.LVData) kept for this entry.
Data internal.LVData
// Exist is true for entries stored from a scan or added via AddLV, and
// false once MarkLVAsRemoved has been called for the LV.
Exist bool
}

func New() *Cache {
return &Cache{}
return &Cache{
lvs: make(map[string]*LVData, lvcount),
}
}

func (c *Cache) StoreDevices(devices []internal.Device, stdErr bytes.Buffer) {
Expand Down Expand Up @@ -60,25 +73,69 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) {
}

func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) {
c.lvs = lvs
lvsOnNode := make(map[string]internal.LVData, len(lvs))
for _, lv := range lvs {
lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv
}

c.m.Lock()
defer c.m.Unlock()

for _, lv := range lvsOnNode {
k := c.configureLVKey(lv.VGName, lv.LVName)
if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist {
c.lvs[k] = &LVData{
Data: lv,
Exist: true,
}
}
}

for key, lv := range c.lvs {
if lv.Exist {
continue
}

if _, exist := lvsOnNode[key]; !exist {
delete(c.lvs, key)
}
}

c.lvsErrs = stdErr
}

func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) {
dst := make([]internal.LVData, len(c.lvs))
copy(dst, c.lvs)
dst := make([]internal.LVData, 0, len(c.lvs))

c.m.RLock()
defer c.m.RUnlock()
for _, lv := range c.lvs {
dst = append(dst, lv.Data)
}

return dst, c.lvsErrs
}

func (c *Cache) FindLV(vgName, lvName string) *internal.LVData {
for _, lv := range c.lvs {
if lv.VGName == vgName && lv.LVName == lvName {
return &lv
}
func (c *Cache) FindLV(vgName, lvName string) *LVData {
c.m.RLock()
defer c.m.RUnlock()
return c.lvs[c.configureLVKey(vgName, lvName)]
}

// AddLV puts an entry for the LV lvName of the VG vgName into the cache.
// The stored entry carries only the VG and LV names in its Data and is marked
// as existing; any entry already stored under the same key is replaced.
func (c *Cache) AddLV(vgName, lvName string) {
	// Building the key and the entry does not touch shared state, so it is
	// done before the write lock is taken.
	key := c.configureLVKey(vgName, lvName)
	entry := &LVData{
		Data: internal.LVData{
			VGName: vgName,
			LVName: lvName,
		},
		Exist: true,
	}

	c.m.Lock()
	defer c.m.Unlock()

	c.lvs[key] = entry
}

return nil
// MarkLVAsRemoved flags the cached LV lvName of the VG vgName as removed by
// setting the Exist field of its entry to false. The entry itself stays in the
// map.
//
// If the cache holds no entry for that LV the call is a no-op: indexing the
// map with a missing key yields a nil *LVData, and writing through it would
// panic with a nil pointer dereference.
func (c *Cache) MarkLVAsRemoved(vgName, lvName string) {
	c.m.Lock()
	defer c.m.Unlock()

	// Guard against both a missing key and an explicitly stored nil entry.
	if lv, found := c.lvs[c.configureLVKey(vgName, lvName)]; found && lv != nil {
		lv.Exist = false
	}
}

func (c *Cache) FindVG(vgName string) *internal.VGData {
Expand Down Expand Up @@ -115,11 +172,16 @@ func (c *Cache) PrintTheCache(log logger.Logger) {
log.Cache(c.vgsErrs.String())
log.Cache("[VGs ENDS]")
log.Cache("[LVs BEGIN]")
for _, lv := range c.lvs {
log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
lvs, _ := c.GetLVs()
for _, lv := range lvs {
log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
}
log.Cache("[ERRS]")
log.Cache(c.lvsErrs.String())
log.Cache("[LVs ENDS]")
log.Cache("*****************CACHE ENDS*****************")
}

// configureLVKey builds the map key under which an LV is stored in the cache:
// the VG name and the LV name joined by a slash ("<vgName>/<lvName>").
func (c *Cache) configureLVKey(vgName, lvName string) string {
	const keyFormat = "%s/%s"

	key := fmt.Sprintf(keyFormat, vgName, lvName)
	return key
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m

lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if lv == nil {
err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode)
err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error())
if err != nil {
Expand All @@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) {
if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) {
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name))
continue
}

if llvRequestedSize.Value() < lv.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String()))
if llvRequestedSize.Value() < lv.Data.LVSize.Value() {
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String()))
continue
}

Expand Down Expand Up @@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
)
for currentAttempts < maxAttempts {
lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name))
if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) {
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name))
break
}

log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name))
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name))
currentAttempts++
time.Sleep(1 * time.Second)
}

if currentAttempts == maxAttempts {
err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName)
err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName)
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name))
shouldRetry = true

Expand All @@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
continue
}

updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize)
updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
shouldRetry = true
Expand Down
7 changes: 4 additions & 3 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ func reconcileLLVCreateFunc(

log.Info(fmt.Sprintf("[reconcileLLVCreateFunc] successfully created LV %s in VG %s for LVMLogicalVolume resource with name: %s", llv.Spec.ActualLVNameOnTheNode, lvg.Spec.ActualVGNameOnTheNode, llv.Name))

log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] adds the LV %s to the cache", llv.Spec.ActualLVNameOnTheNode))
sdsCache.AddLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
log.Debug(fmt.Sprintf("[reconcileLLVCreateFunc] tries to get the LV %s actual size", llv.Spec.ActualLVNameOnTheNode))
actualSize := getLVActualSize(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if actualSize.Value() == 0 {
Expand Down Expand Up @@ -323,9 +325,8 @@ func reconcileLLVUpdateFunc(
log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] tries to get LVMLogicalVolume %s actual size before the extension", llv.Name))
actualSize := getLVActualSize(sdsCache, lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if actualSize.Value() == 0 {
err := fmt.Errorf("LV %s has zero size (likely LV was not found in the cache)", llv.Spec.ActualLVNameOnTheNode)
log.Error(err, fmt.Sprintf("[reconcileLLVUpdateFunc] unable to get actual LV %s size of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
return true, err
log.Warning(fmt.Sprintf("[reconcileLLVUpdateFunc] LV %s of the LVMLogicalVolume %s has zero size (likely LV was not updated in the cache) ", llv.Spec.ActualLVNameOnTheNode, llv.Name))
return true, nil
}
log.Debug(fmt.Sprintf("[reconcileLLVUpdateFunc] successfully got LVMLogicalVolume %s actual size %s before the extension", llv.Name, actualSize.String()))

Expand Down
22 changes: 11 additions & 11 deletions images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv

func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error {
lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
if lv == nil {
if lv == nil || !lv.Exist {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
return nil
}

// this case prevents unexpected same-name LV deletions which does not actually belong to our LLV
if !checkIfLVBelongsToLLV(llv, lv) {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name))
if !checkIfLVBelongsToLLV(llv, &lv.Data) {
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name))
return nil
}

Expand All @@ -170,6 +170,9 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l
return err
}

log.Debug(fmt.Sprintf("[deleteLVIfNeeded] mark LV %s in the cache as removed", lv.Data.LVName))
sdsCache.MarkLVAsRemoved(lv.Data.VGName, lv.Data.LVName)

return nil
}

Expand All @@ -179,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan
return resource.Quantity{}
}

result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI)
result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI)

return *result
}
Expand Down Expand Up @@ -294,12 +297,9 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol

// if a specified Thick LV name matches the existing Thin one
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
if lv != nil {
if len(lv.LVAttr) == 0 {
reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName))
} else if !checkIfLVBelongsToLLV(llv, lv) {
reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName))
}
if lv != nil &&
len(lv.Data.LVAttr) != 0 && !checkIfLVBelongsToLLV(llv, &lv.Data) {
reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.Data.LVName))
}

if reason.Len() > 0 {
Expand Down Expand Up @@ -344,7 +344,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al
}

lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
return lv != nil
return lv != nil && lv.Exist
}

func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) {

v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{})
if assert.False(t, v) {
assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r)
assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. ", r)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1a
for _, tp := range lvg.Spec.ThinPools {
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name)
if lv != nil {
if !isThinPool(*lv) {
reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.LVName))
if !isThinPool(lv.Data) {
reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.Data.LVName))
continue
}

actualThinPools[lv.LVName] = *lv
actualThinPools[lv.Data.LVName] = lv.Data
}
}

Expand Down
10 changes: 10 additions & 0 deletions images/agent/src/pkg/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/pilebones/go-udev/netlink"
"k8s.io/utils/clock"
kubeCtrl "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -159,22 +160,31 @@ func runControllersReconcile(ctx context.Context, log logger.Logger, bdCtrl, lvg

func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cfg config.Options) error {
// the scan operations order is very important as it guarantees the consistent and reliable data from the node
realClock := clock.RealClock{}
now := time.Now()
lvs, lvsErr, err := scanLVs(ctx, log, cfg)
log.Trace(fmt.Sprintf("[fillTheCache] LVS command runs for: %s", realClock.Since(now).String()))
if err != nil {
return err
}

now = time.Now()
vgs, vgsErr, err := scanVGs(ctx, log, cfg)
log.Trace(fmt.Sprintf("[fillTheCache] VGS command runs for: %s", realClock.Since(now).String()))
if err != nil {
return err
}

now = time.Now()
pvs, pvsErr, err := scanPVs(ctx, log, cfg)
log.Trace(fmt.Sprintf("[fillTheCache] PVS command runs for: %s", realClock.Since(now).String()))
if err != nil {
return err
}

now = time.Now()
devices, devErr, err := scanDevices(ctx, log, cfg)
log.Trace(fmt.Sprintf("[fillTheCache] LSBLK command runs for: %s", realClock.Since(now).String()))
if err != nil {
return err
}
Expand Down
Loading