Skip to content

Commit 2af3028

Browse files
committed
refactored cache
Signed-off-by: Viktor Kramarenko <[email protected]>
1 parent 6d3a8c8 commit 2af3028

7 files changed

+98
-55
lines changed

images/agent/src/pkg/cache/cache.go

+74-22
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,39 @@
11
package cache
22

33
import (
4-
"agent/internal"
5-
"agent/pkg/logger"
64
"bytes"
75
"fmt"
6+
"sync"
7+
8+
"agent/internal"
9+
"agent/pkg/logger"
10+
)
11+
12+
const (
13+
lvcount = 50
814
)
915

1016
type Cache struct {
17+
m sync.RWMutex
1118
devices []internal.Device
1219
deviceErrs bytes.Buffer
1320
pvs []internal.PVData
1421
pvsErrs bytes.Buffer
1522
vgs []internal.VGData
1623
vgsErrs bytes.Buffer
17-
lvs []internal.LVData
24+
lvs map[string]*LVData
1825
lvsErrs bytes.Buffer
1926
}
2027

28+
type LVData struct {
29+
Data internal.LVData
30+
Exist bool
31+
}
32+
2133
func New() *Cache {
22-
return &Cache{}
34+
return &Cache{
35+
lvs: make(map[string]*LVData, lvcount),
36+
}
2337
}
2438

2539
func (c *Cache) StoreDevices(devices []internal.Device, stdErr bytes.Buffer) {
@@ -59,37 +73,69 @@ func (c *Cache) GetVGs() ([]internal.VGData, bytes.Buffer) {
5973
}
6074

6175
func (c *Cache) StoreLVs(lvs []internal.LVData, stdErr bytes.Buffer) {
62-
c.lvs = lvs
76+
lvsOnNode := make(map[string]internal.LVData, len(lvs))
77+
for _, lv := range lvs {
78+
lvsOnNode[c.configureLVKey(lv.VGName, lv.LVName)] = lv
79+
}
80+
81+
c.m.Lock()
82+
defer c.m.Unlock()
83+
84+
for _, lv := range lvsOnNode {
85+
k := c.configureLVKey(lv.VGName, lv.LVName)
86+
if cachedLV, exist := c.lvs[k]; !exist || cachedLV.Exist {
87+
c.lvs[k] = &LVData{
88+
Data: lv,
89+
Exist: true,
90+
}
91+
}
92+
}
93+
94+
for key, lv := range c.lvs {
95+
if lv.Exist {
96+
continue
97+
}
98+
99+
if _, exist := lvsOnNode[key]; !exist {
100+
delete(c.lvs, key)
101+
}
102+
}
103+
63104
c.lvsErrs = stdErr
64105
}
65106

66107
func (c *Cache) GetLVs() ([]internal.LVData, bytes.Buffer) {
67-
dst := make([]internal.LVData, len(c.lvs))
68-
copy(dst, c.lvs)
108+
dst := make([]internal.LVData, 0, len(c.lvs))
69109

70-
return dst, c.lvsErrs
71-
}
72-
73-
func (c *Cache) FindLV(vgName, lvName string) *internal.LVData {
110+
c.m.RLock()
111+
defer c.m.RUnlock()
74112
for _, lv := range c.lvs {
75-
if lv.VGName == vgName && lv.LVName == lvName {
76-
return &lv
77-
}
113+
dst = append(dst, lv.Data)
78114
}
79115

80-
return nil
116+
return dst, c.lvsErrs
117+
}
118+
119+
func (c *Cache) FindLV(vgName, lvName string) *LVData {
120+
c.m.RLock()
121+
defer c.m.RUnlock()
122+
return c.lvs[c.configureLVKey(vgName, lvName)]
81123
}
82124

83125
func (c *Cache) AddLV(vgName, lvName string) {
84-
c.lvs = append(c.lvs, internal.LVData{VGName: vgName, LVName: lvName})
126+
c.m.Lock()
127+
defer c.m.Unlock()
128+
c.lvs[c.configureLVKey(vgName, lvName)] = &LVData{
129+
Data: internal.LVData{VGName: vgName, LVName: lvName},
130+
Exist: true,
131+
}
85132
}
86133

87134
func (c *Cache) RemoveLV(vgName, lvName string) {
88-
for i, lv := range c.lvs {
89-
if lv.VGName == vgName && lv.LVName == lvName {
90-
c.lvs = append(c.lvs[:i], c.lvs[i+1:]...)
91-
}
92-
}
135+
c.m.Lock()
136+
defer c.m.Unlock()
137+
138+
c.lvs[c.configureLVKey(vgName, lvName)].Exist = false
93139
}
94140

95141
func (c *Cache) FindVG(vgName string) *internal.VGData {
@@ -126,11 +172,17 @@ func (c *Cache) PrintTheCache(log logger.Logger) {
126172
log.Cache(c.vgsErrs.String())
127173
log.Cache("[VGs ENDS]")
128174
log.Cache("[LVs BEGIN]")
175+
c.m.RLock()
129176
for _, lv := range c.lvs {
130-
log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolName))
177+
log.Cache(fmt.Sprintf(" Data Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.Data.LVName, lv.Data.VGName, lv.Data.LVSize.String(), lv.Data.LvTags, lv.Data.LVAttr, lv.Data.PoolName))
131178
}
179+
c.m.RUnlock()
132180
log.Cache("[ERRS]")
133181
log.Cache(c.lvsErrs.String())
134182
log.Cache("[LVs ENDS]")
135183
log.Cache("*****************CACHE ENDS*****************")
136184
}
185+
186+
func (c *Cache) configureLVKey(vgName, lvName string) string {
187+
return fmt.Sprintf("%s/%s", vgName, lvName)
188+
}

images/agent/src/pkg/controller/lvm_logical_volume_extender_watcher.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
149149

150150
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
151151
if lv == nil {
152-
err = fmt.Errorf("LV %s not found", llv.Spec.ActualLVNameOnTheNode)
152+
err = fmt.Errorf("lv %s not found", llv.Spec.ActualLVNameOnTheNode)
153153
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to find LV %s of the LVMLogicalVolume %s", llv.Spec.ActualLVNameOnTheNode, llv.Name))
154154
err = updateLVMLogicalVolumePhaseIfNeeded(ctx, cl, log, metrics, &llv, LLVStatusPhaseFailed, err.Error())
155155
if err != nil {
@@ -159,13 +159,13 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
159159
continue
160160
}
161161

162-
if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.LVSize, internal.ResizeDelta) {
162+
if utils.AreSizesEqualWithinDelta(llvRequestedSize, lv.Data.LVSize, internal.ResizeDelta) {
163163
log.Info(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s should not be extended", llv.Name))
164164
continue
165165
}
166166

167-
if llvRequestedSize.Value() < lv.LVSize.Value() {
168-
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.LVSize.String()))
167+
if llvRequestedSize.Value() < lv.Data.LVSize.Value() {
168+
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] the LVMLogicalVolume %s requested size %s is less than actual one on the node %s", llv.Name, llvRequestedSize.String(), lv.Data.LVSize.String()))
169169
continue
170170
}
171171

@@ -207,18 +207,18 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
207207
)
208208
for currentAttempts < maxAttempts {
209209
lv = sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
210-
if utils.AreSizesEqualWithinDelta(lv.LVSize, llvRequestedSize, internal.ResizeDelta) {
211-
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.LVName, llv.Name))
210+
if utils.AreSizesEqualWithinDelta(lv.Data.LVSize, llvRequestedSize, internal.ResizeDelta) {
211+
log.Debug(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s of the LVMLogicalVolume %s was successfully updated in the cache", lv.Data.LVName, llv.Name))
212212
break
213213
}
214214

215-
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.LVName, llv.Name))
215+
log.Warning(fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] LV %s size of the LVMLogicalVolume %s was not yet updated in the cache, retry...", lv.Data.LVName, llv.Name))
216216
currentAttempts++
217217
time.Sleep(1 * time.Second)
218218
}
219219

220220
if currentAttempts == maxAttempts {
221-
err = fmt.Errorf("LV %s is not updated in the cache", lv.LVName)
221+
err = fmt.Errorf("LV %s is not updated in the cache", lv.Data.LVName)
222222
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to resize the LVMLogicalVolume %s", llv.Name))
223223
shouldRetry = true
224224

@@ -228,7 +228,7 @@ func ReconcileLVMLogicalVolumeExtension(ctx context.Context, cl client.Client, m
228228
continue
229229
}
230230

231-
updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.LVSize)
231+
updated, err := updateLLVPhaseToCreatedIfNeeded(ctx, cl, &llv, lv.Data.LVSize)
232232
if err != nil {
233233
log.Error(err, fmt.Sprintf("[ReconcileLVMLogicalVolumeExtension] unable to update the LVMLogicalVolume %s", llv.Name))
234234
shouldRetry = true

images/agent/src/pkg/controller/lvm_logical_volume_watcher.go

-4
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,6 @@ func RunLVMLogicalVolumeWatcherController(
145145
return reconcile.Result{RequeueAfter: cfg.LLVRequeueIntervalSec}, nil
146146
}
147147

148-
// создаем LV и пытаемся получить его размер на 113278 строчке I0819 12:43:10.563596
149-
// при этом у нас кэш начинает заполняться в I0819 12:43:09.486602 и заканчивает в I0819 12:43:22.070604, а потом только в I0819 12:43:22.081861
150-
// при этом получаем ретрай в I0819 12:43:15.563851 (и на мо
151-
152148
log.Info(fmt.Sprintf("[RunLVMLogicalVolumeWatcherController] successfully ended reconciliation of the LVMLogicalVolume %s", request.Name))
153149
return reconcile.Result{}, nil
154150
}),

images/agent/src/pkg/controller/lvm_logical_volume_watcher_func.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ func updateLLVPhaseToCreatedIfNeeded(ctx context.Context, cl client.Client, llv
152152

153153
func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, llv *v1alpha1.LVMLogicalVolume) error {
154154
lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
155-
if lv == nil {
155+
if lv == nil || !lv.Exist {
156156
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] did not find LV %s in VG %s", llv.Spec.ActualLVNameOnTheNode, vgName))
157157
return nil
158158
}
159159

160160
// this case prevents unexpected same-name LV deletions which does not actually belong to our LLV
161-
if !checkIfLVBelongsToLLV(llv, lv) {
162-
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.LVName, llv.Name))
161+
if !checkIfLVBelongsToLLV(llv, &lv.Data) {
162+
log.Warning(fmt.Sprintf("[deleteLVIfNeeded] no need to delete LV %s as it doesnt belong to LVMLogicalVolume %s", lv.Data.LVName, llv.Name))
163163
return nil
164164
}
165165

@@ -170,8 +170,8 @@ func deleteLVIfNeeded(log logger.Logger, sdsCache *cache.Cache, vgName string, l
170170
return err
171171
}
172172

173-
log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.LVName))
174-
sdsCache.RemoveLV(lv.VGName, lv.LVName)
173+
log.Debug(fmt.Sprintf("[deleteLVIfNeeded] removes LV %s from the cache", lv.Data.LVName))
174+
sdsCache.RemoveLV(lv.Data.VGName, lv.Data.LVName)
175175

176176
return nil
177177
}
@@ -182,7 +182,7 @@ func getLVActualSize(sdsCache *cache.Cache, vgName, lvName string) resource.Quan
182182
return resource.Quantity{}
183183
}
184184

185-
result := resource.NewQuantity(lv.LVSize.Value(), resource.BinarySI)
185+
result := resource.NewQuantity(lv.Data.LVSize.Value(), resource.BinarySI)
186186

187187
return *result
188188
}
@@ -297,12 +297,9 @@ func validateLVMLogicalVolume(sdsCache *cache.Cache, llv *v1alpha1.LVMLogicalVol
297297

298298
// if a specified Thick LV name matches the existing Thin one
299299
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, llv.Spec.ActualLVNameOnTheNode)
300-
if lv != nil {
301-
if len(lv.LVAttr) == 0 {
302-
reason.WriteString(fmt.Sprintf("LV %s was found on the node, but can't be validated due to its attributes is empty string. ", lv.LVName))
303-
} else if !checkIfLVBelongsToLLV(llv, lv) {
304-
reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.LVName))
305-
}
300+
if lv != nil &&
301+
len(lv.Data.LVAttr) != 0 && !checkIfLVBelongsToLLV(llv, &lv.Data) {
302+
reason.WriteString(fmt.Sprintf("Specified LV %s is already created and it is doesnt match the one on the node.", lv.Data.LVName))
306303
}
307304

308305
if reason.Len() > 0 {
@@ -347,7 +344,7 @@ func shouldReconcileByUpdateFunc(sdsCache *cache.Cache, vgName string, llv *v1al
347344
}
348345

349346
lv := sdsCache.FindLV(vgName, llv.Spec.ActualLVNameOnTheNode)
350-
return lv != nil
347+
return lv != nil && lv.Exist
351348
}
352349

353350
func isContiguous(llv *v1alpha1.LVMLogicalVolume) bool {

images/agent/src/pkg/controller/lvm_logical_volume_watcher_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func TestLVMLogicaVolumeWatcher(t *testing.T) {
153153

154154
v, r := validateLVMLogicalVolume(sdsCache, llv, &v1alpha1.LvmVolumeGroup{})
155155
if assert.False(t, v) {
156-
assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. LV test-lv was found on the node, but can't be validated due to its attributes is empty string. ", r)
156+
assert.Equal(t, "Zero size for LV. Thin pool specified for Thick LV. ", r)
157157
}
158158
})
159159

images/agent/src/pkg/controller/lvm_volume_group_watcher_func.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -412,12 +412,12 @@ func validateLVGForUpdateFunc(log logger.Logger, sdsCache *cache.Cache, lvg *v1a
412412
for _, tp := range lvg.Spec.ThinPools {
413413
lv := sdsCache.FindLV(lvg.Spec.ActualVGNameOnTheNode, tp.Name)
414414
if lv != nil {
415-
if !isThinPool(*lv) {
416-
reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.LVName))
415+
if !isThinPool(lv.Data) {
416+
reason.WriteString(fmt.Sprintf("LV %s is already created on the node and it is not a thin-pool", lv.Data.LVName))
417417
continue
418418
}
419419

420-
actualThinPools[lv.LVName] = *lv
420+
actualThinPools[lv.Data.LVName] = lv.Data
421421
}
422422
}
423423

images/agent/src/pkg/scanner/scanner.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"context"
66
"errors"
77
"fmt"
8-
"k8s.io/utils/clock"
98
"time"
109

1110
"github.com/pilebones/go-udev/netlink"
11+
"k8s.io/utils/clock"
1212
kubeCtrl "sigs.k8s.io/controller-runtime/pkg/controller"
1313
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1414

@@ -162,13 +162,11 @@ func fillTheCache(ctx context.Context, log logger.Logger, cache *cache.Cache, cf
162162
// the scan operations order is very important as it guarantees the consistent and reliable data from the node
163163
realClock := clock.RealClock{}
164164
now := time.Now()
165-
fmt.Printf("LVS TIME BEFORE: %s", time.Now().String())
166165
lvs, lvsErr, err := scanLVs(ctx, log, cfg)
167166
log.Trace(fmt.Sprintf("[fillTheCache] LVS command runs for: %s", realClock.Since(now).String()))
168167
if err != nil {
169168
return err
170169
}
171-
fmt.Printf("LVS TIME AFTER: %s", time.Now().String())
172170

173171
now = time.Now()
174172
vgs, vgsErr, err := scanVGs(ctx, log, cfg)

0 commit comments

Comments
 (0)