Skip to content

Commit

Permalink
[controller] Add a sds scaner and a cache (#47)
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
ViktorKram authored May 21, 2024
1 parent fe2c39b commit 9cb4133
Showing 12 changed files with 412 additions and 13 deletions.
10 changes: 10 additions & 0 deletions images/agent/cmd/main.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
goruntime "runtime"
"sds-node-configurator/api/v1alpha1"
"sds-node-configurator/config"
"sds-node-configurator/pkg/cache"
"sds-node-configurator/pkg/controller"
"sds-node-configurator/pkg/kubutils"
"sds-node-configurator/pkg/logger"
@@ -109,6 +110,15 @@ func main() {
}
log.Info("[main] ReTag ends")

sdsCache := cache.New()

go func() {
if err = controller.RunScanner(*log, *cfgParams, sdsCache); err != nil {
log.Error(err, "[main] unable to run scanner")
os.Exit(1)
}
}()

if _, err = controller.RunBlockDeviceController(ctx, mgr, *cfgParams, *log, metrics); err != nil {
log.Error(err, "[main] unable to controller.RunBlockDeviceController")
os.Exit(1)
43 changes: 35 additions & 8 deletions images/agent/config/config.go
Original file line number Diff line number Diff line change
@@ -23,16 +23,19 @@ import (
"os/exec"
"sds-node-configurator/internal"
"sds-node-configurator/pkg/logger"

"strconv"
"strings"
"time"
)

const (
ScanInterval = "SCAN_INTERVAL"
NodeName = "NODE_NAME"
LogLevel = "LOG_LEVEL"
MetricsPort = "METRICS_PORT"
MachineID = "MACHINE_ID"
ScanInterval = "SCAN_INTERVAL"
NodeName = "NODE_NAME"
LogLevel = "LOG_LEVEL"
MetricsPort = "METRICS_PORT"
MachineID = "MACHINE_ID"
ThrottleInterval = "THROTTLER_INTERVAL"
)

type Options struct {
@@ -43,6 +46,7 @@ type Options struct {
BlockDeviceScanInterval time.Duration
VolumeGroupScanInterval time.Duration
LLVRequeInterval time.Duration
ThrottleInterval time.Duration
}

func NewConfig() (*Options, error) {
@@ -71,9 +75,32 @@ func NewConfig() (*Options, error) {
opts.MetricsPort = ":8080"
}

opts.BlockDeviceScanInterval = 5
opts.VolumeGroupScanInterval = 5
opts.LLVRequeInterval = 5
scanInt := os.Getenv(ScanInterval)
if scanInt == "" {
opts.BlockDeviceScanInterval = 5
opts.VolumeGroupScanInterval = 5
opts.LLVRequeInterval = 5
} else {
interval, err := strconv.Atoi(scanInt)
if err != nil {
return nil, fmt.Errorf("[NewConfig] unable to get %s, error: %w", ScanInterval, err)
}
opts.BlockDeviceScanInterval = time.Duration(interval)
opts.VolumeGroupScanInterval = time.Duration(interval)
opts.LLVRequeInterval = time.Duration(interval)
}

thrInt := os.Getenv(ThrottleInterval)
if thrInt == "" {
opts.ThrottleInterval = 3
} else {
interval, err := strconv.Atoi(scanInt)
if err != nil {
return nil, fmt.Errorf("[NewConfig] unable to get %s, error: %w", ThrottleInterval, err)
}

opts.ThrottleInterval = time.Duration(interval)
}

return &opts, nil
}
1 change: 1 addition & 0 deletions images/agent/go.mod
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/onsi/ginkgo/v2 v2.14.0
github.com/onsi/gomega v1.30.0
github.com/pilebones/go-udev v0.9.0
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.8.4
k8s.io/api v0.29.4
2 changes: 2 additions & 0 deletions images/agent/go.sum
Original file line number Diff line number Diff line change
@@ -84,6 +84,8 @@ github.com/onsi/ginkgo/v2 v2.14.0 h1:vSmGj2Z5YPb9JwCWT6z6ihcUvDhuXLc3sJiqd3jMKAY
github.com/onsi/ginkgo/v2 v2.14.0/go.mod h1:JkUdW7JkN0V6rFvsHcJ478egV3XH9NxpD27Hal/PhZw=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/pilebones/go-udev v0.9.0 h1:N1uEO/SxUwtIctc0WLU0t69JeBxIYEYnj8lT/Nabl9Q=
github.com/pilebones/go-udev v0.9.0/go.mod h1:T2eI2tUSK0hA2WS5QLjXJUfQkluZQu+18Cqvem3CaXI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
87 changes: 87 additions & 0 deletions images/agent/pkg/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package cache

import (
"fmt"
"sds-node-configurator/internal"
"sds-node-configurator/pkg/logger"
)

type Cache struct {
devices []internal.Device
pvs []internal.PVData
vgs []internal.VGData
lvs []internal.LVData
}

func New() Cache {
return Cache{}
}

func (c *Cache) StoreDevices(devices []internal.Device) {
c.devices = devices
}

func (c *Cache) GetDevices() []internal.Device {
dst := make([]internal.Device, len(c.devices))
copy(dst, c.devices)

return dst
}

func (c *Cache) StorePVs(pvs []internal.PVData) {
c.pvs = pvs
}

func (c *Cache) GetPVs() []internal.PVData {
dst := make([]internal.PVData, len(c.pvs))
copy(dst, c.pvs)

return dst
}

func (c *Cache) StoreVGs(vgs []internal.VGData) {
c.vgs = vgs
}

func (c *Cache) GetVGs() []internal.VGData {
dst := make([]internal.VGData, len(c.vgs))
copy(dst, c.vgs)

return dst
}

func (c *Cache) StoreLVs(lvs []internal.LVData) {
c.lvs = lvs
}

func (c *Cache) GetLVs() []internal.LVData {
dst := make([]internal.LVData, len(c.lvs))
copy(dst, c.lvs)

return dst
}

func (c *Cache) PrintTheCache(log logger.Logger) {
log.Cache("*****************CACHE BEGIN*****************")
log.Cache("[Devices BEGIN]")
for _, d := range c.devices {
log.Cache(fmt.Sprintf(" Device Name: %s, size: %s, fsType: %s, serial: %s, wwn: %s", d.Name, d.Size.String(), d.FSType, d.Serial, d.Wwn))
}
log.Cache("[Devices ENDS]")
log.Cache("[PVs BEGIN]")
for _, pv := range c.pvs {
log.Cache(fmt.Sprintf(" PV Name: %s, VG Name: %s, size: %s, vgTags: %s", pv.PVName, pv.VGName, pv.PVSize.String(), pv.VGTags))
}
log.Cache("[PVs ENDS]")
log.Cache("[VGs BEGIN]")
for _, vg := range c.vgs {
log.Cache(fmt.Sprintf(" VG Name: %s, size: %s, free: %s, vgTags: %s", vg.VGName, vg.VGSize.String(), vg.VGFree.String(), vg.VGTags))
}
log.Cache("[VGs ENDS]")
log.Cache("[LVs BEGIN]")
for _, lv := range c.lvs {
log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolLv))
}
log.Cache("[LVs ENDS]")
log.Cache("*****************CACHE ENDS*****************")
}
68 changes: 68 additions & 0 deletions images/agent/pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cache

import (
"github.com/stretchr/testify/assert"
"sds-node-configurator/internal"
"testing"
)

func TestCache(t *testing.T) {
sdsCache := New()
devices := []internal.Device{
{
Name: "test-1",
},
{
Name: "test-2",
},
{
Name: "test-3",
},
}

pvs := []internal.PVData{
{
PVName: "pv-1",
},
{
PVName: "pv-2",
},
{
PVName: "pv-3",
},
}

vgs := []internal.VGData{
{
VGName: "vg-1",
},
{
VGName: "vg-2",
},
{
VGName: "vg-3",
},
}

lvs := []internal.LVData{
{
LVName: "lv-1",
},
{
LVName: "lv-2",
},
{
LVName: "lv-3",
},
}

sdsCache.StoreDevices(devices)
sdsCache.StorePVs(pvs)
sdsCache.StoreVGs(vgs)
sdsCache.StoreLVs(lvs)

assert.ElementsMatch(t, devices, sdsCache.GetDevices())
assert.ElementsMatch(t, pvs, sdsCache.GetPVs())
assert.ElementsMatch(t, vgs, sdsCache.GetVGs())
assert.ElementsMatch(t, lvs, sdsCache.GetLVs())
}
2 changes: 1 addition & 1 deletion images/agent/pkg/controller/lvm_volume_group_discover.go
Original file line number Diff line number Diff line change
@@ -516,7 +516,7 @@ func sortPVIssuesByVG(log logger.Logger, pvs []internal.PVData) map[string][]str
}

if stdErr.Len() != 0 {
log.Error(fmt.Errorf(stdErr.String()), fmt.Sprintf(`[sortPVIssuesByVG] pvs command for pv "%s" has stderr: `, pv.PVName))
log.Error(fmt.Errorf(stdErr.String()), fmt.Sprintf(`[sortPVIssuesByVG] pvs command for pv "%s" has stderr: %s`, pv.PVName, stdErr.String()))
pvIssuesByVG[pv.VGName+pv.VGUuid] = append(pvIssuesByVG[pv.VGName+pv.VGUuid], stdErr.String())
stdErr.Reset()
}
158 changes: 158 additions & 0 deletions images/agent/pkg/controller/scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package controller

import (
"errors"
"fmt"
"github.com/pilebones/go-udev/netlink"
"sds-node-configurator/config"
"sds-node-configurator/internal"
"sds-node-configurator/pkg/cache"
"sds-node-configurator/pkg/logger"
"sds-node-configurator/pkg/throttler"
"sds-node-configurator/pkg/utils"
"time"
)

func RunScanner(log logger.Logger, cfg config.Options, sdsCache cache.Cache) error {
log.Info("[RunScanner] starts the work")

t := throttler.New(cfg.ThrottleInterval * time.Second)

conn := new(netlink.UEventConn)
if err := conn.Connect(netlink.UdevEvent); err != nil {
log.Error(err, "[RunScanner] Failed to connect to Netlink")
return err
}
log.Debug("[RunScanner] system socket connection succeeded")

errChan := make(chan error)
eventChan := make(chan netlink.UEvent)
matcher := &netlink.RuleDefinitions{
Rules: []netlink.RuleDefinition{
{
Env: map[string]string{
"SUBSYSTEM": "block",
},
},
},
}
quit := conn.Monitor(eventChan, errChan, matcher)

log.Info("[RunScanner] start to listen to events")

timer := time.NewTimer(1 * time.Second)
for {
select {
case device, open := <-eventChan:
timer.Reset(1 * time.Second)
log.Debug(fmt.Sprintf("[RunScanner] event triggered for device: %s", device.Env["DEVNAME"]))
log.Trace(fmt.Sprintf("[RunScanner] device from the event: %s", device.String()))
if !open {
err := errors.New("EventChan has been closed when monitor udev event")
log.Error(err, "[RunScanner] unable to read from the event channel")
return err
}

t.Do(func() {
log.Info("[RunScanner] start to fill the cache")
err := fillTheCache(log, sdsCache)
if err != nil {
log.Error(err, "[RunScanner] unable to fill the cache")
return
}

log.Info("[RunScanner] successfully filled the cache")
})

case err := <-errChan:
log.Error(err, "[RunScanner] Monitor udev event error")
return err

case <-quit:
err := errors.New("receive quit signal when monitor udev event")
log.Error(err, "[RunScanner] unable to read from the event channel")
return err

case <-timer.C:
log.Info("[RunScanner] events ran out. Start to fill the cache")
err := fillTheCache(log, sdsCache)
if err != nil {
log.Error(err, "[RunScanner] unable to fill the cache after all events passed")
break
}
log.Info("[RunScanner] successfully filled the cache after all events passed")
}
}
}

func fillTheCache(log logger.Logger, cache cache.Cache) error {
devices, err := scanDevices(log)
if err != nil {
return err
}

pvs, err := scanPVs(log)
if err != nil {
return err
}

vgs, err := scanVGs(log)
if err != nil {
return err
}

lvs, err := scanLVs(log)
if err != nil {
return err
}

log.Debug("[fillTheCache] successfully scanned entities. Starts to fill the cache")
cache.StoreDevices(devices)
cache.StorePVs(pvs)
cache.StoreVGs(vgs)
cache.StoreLVs(lvs)
log.Debug("[fillTheCache] successfully filled the cache")
cache.PrintTheCache(log)

return nil
}

func scanDevices(log logger.Logger) ([]internal.Device, error) {
devices, cmdStr, err := utils.GetBlockDevices()
if err != nil {
log.Error(err, fmt.Sprintf("[ScanDevices] unable to scan the devices, cmd: %s", cmdStr))
return nil, err
}

return devices, nil
}

func scanPVs(log logger.Logger) ([]internal.PVData, error) {
pvs, cmdStr, _, err := utils.GetAllPVs()
if err != nil {
log.Error(err, fmt.Sprintf("[ScanPVs] unable to scan the PVs, cmd: %s", cmdStr))
return nil, err
}

return pvs, nil
}

func scanVGs(log logger.Logger) ([]internal.VGData, error) {
vgs, cmdStr, _, err := utils.GetAllVGs()
if err != nil {
log.Error(err, fmt.Sprintf("[ScanVGs] unable to scan the VGs, cmd: %s", cmdStr))
return nil, err
}

return vgs, nil
}

func scanLVs(log logger.Logger) ([]internal.LVData, error) {
lvs, cmdStr, _, err := utils.GetAllLVs()
if err != nil {
log.Error(err, fmt.Sprintf("[ScanLVs] unable to scan LVs, cmd: %s", cmdStr))
return nil, err
}

return lvs, nil
}
6 changes: 6 additions & 0 deletions images/agent/pkg/logger/logger.go
Original file line number Diff line number Diff line change
@@ -30,13 +30,15 @@ const (
InfoLevel Verbosity = "2"
DebugLevel Verbosity = "3"
TraceLevel Verbosity = "4"
CacheLevel Verbosity = "5"
)

const (
warnLvl = iota + 1
infoLvl
debugLvl
traceLvl
cacheLvl
)

type (
@@ -82,3 +84,7 @@ func (l Logger) Debug(message string, keysAndValues ...interface{}) {
func (l Logger) Trace(message string, keysAndValues ...interface{}) {
l.log.V(traceLvl).Info(fmt.Sprintf("TRACE %s", message), keysAndValues...)
}

func (l Logger) Cache(message string, keysAndValues ...interface{}) {
l.log.V(cacheLvl).Info(fmt.Sprintf("CACHE %s", message), keysAndValues...)
}
36 changes: 36 additions & 0 deletions images/agent/pkg/throttler/throttler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package throttler

import (
"sync"
"time"
)

type Throttler interface {
Do(f func())
}

type throttle struct {
duration time.Duration
once sync.Once
m sync.Mutex
}

func (t *throttle) Do(f func()) {
t.m.Lock()
defer t.m.Unlock()
t.once.Do(func() {
go func() {
time.Sleep(t.duration)
t.m.Lock()
defer t.m.Unlock()
t.once = sync.Once{}
}()
f()
})
}

func New(duration time.Duration) Throttler {
return &throttle{
duration: duration,
}
}
11 changes: 7 additions & 4 deletions images/agent/pkg/utils/commands.go
Original file line number Diff line number Diff line change
@@ -517,17 +517,20 @@ func filterStdErr(command string, stdErr bytes.Buffer) bytes.Buffer {
var filteredStdErr bytes.Buffer
stdErrScanner := bufio.NewScanner(&stdErr)
regexpPattern := `Regex version mismatch, expected: .+ actual: .+`
regex, err := regexp.Compile(regexpPattern)
regexpSocketError := `File descriptor .+ leaked on lvm.static invocation. Parent PID .+: /opt/deckhouse/sds/bin/nsenter`
regex1, err := regexp.Compile(regexpPattern)
regex2, err := regexp.Compile(regexpSocketError)
if err != nil {
return stdErr
}

for stdErrScanner.Scan() {
line := stdErrScanner.Text()
if !regex.MatchString(line) {
filteredStdErr.WriteString(line + "\n")
} else {
if regex1.MatchString(line) ||
regex2.MatchString(line) {
golog.Printf("WARNING: [filterStdErr] Line filtered from stderr due to matching exclusion pattern. Line: '%s'. Triggered by command: '%s'.", line, command)
} else {
filteredStdErr.WriteString(line + "\n")
}
}

1 change: 1 addition & 0 deletions templates/agent/daemonset.yaml
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ spec:
- name: {{ .Chart.Name }}-module-registry
serviceAccountName: sds-node-configurator
hostPID: true
hostNetwork: true
initContainers:
- name: sds-utils-installer
image: {{ include "helm_lib_module_image" (list . "sdsUtilsInstaller") }}

0 comments on commit 9cb4133

Please sign in to comment.