diff --git a/images/agent/cmd/main.go b/images/agent/cmd/main.go
index cca258a5..e07e1954 100644
--- a/images/agent/cmd/main.go
+++ b/images/agent/cmd/main.go
@@ -23,6 +23,7 @@ import (
     goruntime "runtime"
     "sds-node-configurator/api/v1alpha1"
     "sds-node-configurator/config"
+    "sds-node-configurator/pkg/cache"
     "sds-node-configurator/pkg/controller"
     "sds-node-configurator/pkg/kubutils"
     "sds-node-configurator/pkg/logger"
@@ -109,6 +110,15 @@ func main() {
     }
     log.Info("[main] ReTag ends")
 
+    sdsCache := cache.New()
+
+    go func() {
+        if err := controller.RunScanner(*log, *cfgParams, sdsCache); err != nil {
+            log.Error(err, "[main] unable to run scanner")
+            os.Exit(1)
+        }
+    }()
+
     if _, err = controller.RunBlockDeviceController(ctx, mgr, *cfgParams, *log, metrics); err != nil {
         log.Error(err, "[main] unable to controller.RunBlockDeviceController")
         os.Exit(1)
diff --git a/images/agent/config/config.go b/images/agent/config/config.go
index 9d417df1..84f0d064 100644
--- a/images/agent/config/config.go
+++ b/images/agent/config/config.go
@@ -23,16 +23,19 @@ import (
     "os/exec"
     "sds-node-configurator/internal"
     "sds-node-configurator/pkg/logger"
+
+    "strconv"
     "strings"
     "time"
 )
 
 const (
-    ScanInterval = "SCAN_INTERVAL"
-    NodeName     = "NODE_NAME"
-    LogLevel     = "LOG_LEVEL"
-    MetricsPort  = "METRICS_PORT"
-    MachineID    = "MACHINE_ID"
+    ScanInterval     = "SCAN_INTERVAL"
+    NodeName         = "NODE_NAME"
+    LogLevel         = "LOG_LEVEL"
+    MetricsPort      = "METRICS_PORT"
+    MachineID        = "MACHINE_ID"
+    ThrottleInterval = "THROTTLER_INTERVAL"
 )
 
 type Options struct {
@@ -43,6 +46,7 @@ type Options struct {
     BlockDeviceScanInterval time.Duration
     VolumeGroupScanInterval time.Duration
     LLVRequeInterval        time.Duration
+    ThrottleInterval        time.Duration
 }
 
 func NewConfig() (*Options, error) {
@@ -71,9 +75,32 @@ func NewConfig() (*Options, error) {
         opts.MetricsPort = ":8080"
     }
 
-    opts.BlockDeviceScanInterval = 5
-    opts.VolumeGroupScanInterval = 5
-    opts.LLVRequeInterval = 5
+    scanInt := os.Getenv(ScanInterval)
+    if scanInt == "" {
+        opts.BlockDeviceScanInterval = 5
+        opts.VolumeGroupScanInterval = 5
+        opts.LLVRequeInterval = 5
+    } else {
+        interval, err := strconv.Atoi(scanInt)
+        if err != nil {
+            return nil, fmt.Errorf("[NewConfig] unable to parse %s, error: %w", ScanInterval, err)
+        }
+        opts.BlockDeviceScanInterval = time.Duration(interval)
+        opts.VolumeGroupScanInterval = time.Duration(interval)
+        opts.LLVRequeInterval = time.Duration(interval)
+    }
+
+    thrInt := os.Getenv(ThrottleInterval)
+    if thrInt == "" {
+        opts.ThrottleInterval = 3
+    } else {
+        interval, err := strconv.Atoi(thrInt)
+        if err != nil {
+            return nil, fmt.Errorf("[NewConfig] unable to parse %s, error: %w", ThrottleInterval, err)
+        }
+
+        opts.ThrottleInterval = time.Duration(interval)
+    }
 
     return &opts, nil
 }
diff --git a/images/agent/go.mod b/images/agent/go.mod
index 810e3b6b..42f4a819 100644
--- a/images/agent/go.mod
+++ b/images/agent/go.mod
@@ -7,6 +7,7 @@ require (
     github.com/google/go-cmp v0.6.0
     github.com/onsi/ginkgo/v2 v2.14.0
     github.com/onsi/gomega v1.30.0
+    github.com/pilebones/go-udev v0.9.0
     github.com/prometheus/client_golang v1.18.0
     github.com/stretchr/testify v1.8.4
     k8s.io/api v0.29.4
diff --git a/images/agent/go.sum b/images/agent/go.sum
index 03407c29..a8d52641 100644
--- a/images/agent/go.sum
+++ b/images/agent/go.sum
@@ -84,6 +84,8 @@ github.com/onsi/ginkgo/v2 v2.14.0 h1:vSmGj2Z5YPb9JwCWT6z6ihcUvDhuXLc3sJiqd3jMKAY
 github.com/onsi/ginkgo/v2 v2.14.0/go.mod h1:JkUdW7JkN0V6rFvsHcJ478egV3XH9NxpD27Hal/PhZw=
 github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
 github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
+github.com/pilebones/go-udev v0.9.0 h1:N1uEO/SxUwtIctc0WLU0t69JeBxIYEYnj8lT/Nabl9Q=
+github.com/pilebones/go-udev v0.9.0/go.mod h1:T2eI2tUSK0hA2WS5QLjXJUfQkluZQu+18Cqvem3CaXI=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
diff --git a/images/agent/pkg/cache/cache.go b/images/agent/pkg/cache/cache.go
new file mode 100644
index 00000000..40225ae7
--- /dev/null
+++ b/images/agent/pkg/cache/cache.go
@@ -0,0 +1,87 @@
+package cache
+
+import (
+    "fmt"
+    "sds-node-configurator/internal"
+    "sds-node-configurator/pkg/logger"
+)
+
+type Cache struct {
+    devices []internal.Device
+    pvs     []internal.PVData
+    vgs     []internal.VGData
+    lvs     []internal.LVData
+}
+
+func New() *Cache {
+    return &Cache{}
+}
+
+func (c *Cache) StoreDevices(devices []internal.Device) {
+    c.devices = devices
+}
+
+func (c *Cache) GetDevices() []internal.Device {
+    dst := make([]internal.Device, len(c.devices))
+    copy(dst, c.devices)
+
+    return dst
+}
+
+func (c *Cache) StorePVs(pvs []internal.PVData) {
+    c.pvs = pvs
+}
+
+func (c *Cache) GetPVs() []internal.PVData {
+    dst := make([]internal.PVData, len(c.pvs))
+    copy(dst, c.pvs)
+
+    return dst
+}
+
+func (c *Cache) StoreVGs(vgs []internal.VGData) {
+    c.vgs = vgs
+}
+
+func (c *Cache) GetVGs() []internal.VGData {
+    dst := make([]internal.VGData, len(c.vgs))
+    copy(dst, c.vgs)
+
+    return dst
+}
+
+func (c *Cache) StoreLVs(lvs []internal.LVData) {
+    c.lvs = lvs
+}
+
+func (c *Cache) GetLVs() []internal.LVData {
+    dst := make([]internal.LVData, len(c.lvs))
+    copy(dst, c.lvs)
+
+    return dst
+}
+
+func (c *Cache) PrintTheCache(log logger.Logger) {
+    log.Cache("*****************CACHE BEGIN*****************")
+    log.Cache("[Devices BEGIN]")
+    for _, d := range c.devices {
+        log.Cache(fmt.Sprintf(" Device Name: %s, size: %s, fsType: %s, serial: %s, wwn: %s", d.Name, d.Size.String(), d.FSType, d.Serial, d.Wwn))
+    }
+    log.Cache("[Devices ENDS]")
+    log.Cache("[PVs BEGIN]")
+    for _, pv := range c.pvs {
+        log.Cache(fmt.Sprintf(" PV Name: %s, VG Name: %s, size: %s, vgTags: %s", pv.PVName, pv.VGName, pv.PVSize.String(), pv.VGTags))
+    }
+    log.Cache("[PVs ENDS]")
+    log.Cache("[VGs BEGIN]")
+    for _, vg := range c.vgs {
+        log.Cache(fmt.Sprintf(" VG Name: %s, size: %s, free: %s, vgTags: %s", vg.VGName, vg.VGSize.String(), vg.VGFree.String(), vg.VGTags))
+    }
+    log.Cache("[VGs ENDS]")
+    log.Cache("[LVs BEGIN]")
+    for _, lv := range c.lvs {
+        log.Cache(fmt.Sprintf(" LV Name: %s, VG name: %s, size: %s, tags: %s, attr: %s, pool: %s", lv.LVName, lv.VGName, lv.LVSize.String(), lv.LvTags, lv.LVAttr, lv.PoolLv))
+    }
+    log.Cache("[LVs ENDS]")
+    log.Cache("*****************CACHE ENDS*****************")
+}
diff --git a/images/agent/pkg/cache/cache_test.go b/images/agent/pkg/cache/cache_test.go
new file mode 100644
index 00000000..bebf8a34
--- /dev/null
+++ b/images/agent/pkg/cache/cache_test.go
@@ -0,0 +1,68 @@
+package cache
+
+import (
+    "github.com/stretchr/testify/assert"
+    "sds-node-configurator/internal"
+    "testing"
+)
+
+func TestCache(t *testing.T) {
+    sdsCache := New()
+    devices := []internal.Device{
+        {
+            Name: "test-1",
+        },
+        {
+            Name: "test-2",
+        },
+        {
+            Name: "test-3",
+        },
+    }
+
+    pvs := []internal.PVData{
+        {
+            PVName: "pv-1",
+        },
+        {
+            PVName: "pv-2",
+        },
+        {
+            PVName: "pv-3",
+        },
+    }
+
+    vgs := []internal.VGData{
+        {
+            VGName: "vg-1",
+        },
+        {
+            VGName: "vg-2",
+        },
+        {
+            VGName: "vg-3",
+        },
+    }
+
+    lvs := []internal.LVData{
+        {
+            LVName: "lv-1",
+        },
+        {
+            LVName: "lv-2",
+        },
+        {
+            LVName: "lv-3",
+        },
+    }
+
+    sdsCache.StoreDevices(devices)
+    sdsCache.StorePVs(pvs)
+    sdsCache.StoreVGs(vgs)
+    sdsCache.StoreLVs(lvs)
+
+    assert.ElementsMatch(t, devices, sdsCache.GetDevices())
+    assert.ElementsMatch(t, pvs, sdsCache.GetPVs())
+    assert.ElementsMatch(t, vgs, sdsCache.GetVGs())
+    assert.ElementsMatch(t, lvs, sdsCache.GetLVs())
+}
diff --git a/images/agent/pkg/controller/lvm_volume_group_discover.go b/images/agent/pkg/controller/lvm_volume_group_discover.go
index 6e9f2572..e86f2fad 100644
--- a/images/agent/pkg/controller/lvm_volume_group_discover.go
+++ b/images/agent/pkg/controller/lvm_volume_group_discover.go
@@ -516,7 +516,7 @@ func sortPVIssuesByVG(log logger.Logger, pvs []internal.PVData) map[string][]str
         }
 
         if stdErr.Len() != 0 {
-            log.Error(fmt.Errorf(stdErr.String()), fmt.Sprintf(`[sortPVIssuesByVG] pvs command for pv "%s" has stderr: `, pv.PVName))
+            log.Error(fmt.Errorf(stdErr.String()), fmt.Sprintf(`[sortPVIssuesByVG] pvs command for pv "%s" has stderr: %s`, pv.PVName, stdErr.String()))
             pvIssuesByVG[pv.VGName+pv.VGUuid] = append(pvIssuesByVG[pv.VGName+pv.VGUuid], stdErr.String())
             stdErr.Reset()
         }
diff --git a/images/agent/pkg/controller/scanner.go b/images/agent/pkg/controller/scanner.go
new file mode 100644
index 00000000..afde24dc
--- /dev/null
+++ b/images/agent/pkg/controller/scanner.go
@@ -0,0 +1,158 @@
+package controller
+
+import (
+    "errors"
+    "fmt"
+    "github.com/pilebones/go-udev/netlink"
+    "sds-node-configurator/config"
+    "sds-node-configurator/internal"
+    "sds-node-configurator/pkg/cache"
+    "sds-node-configurator/pkg/logger"
+    "sds-node-configurator/pkg/throttler"
+    "sds-node-configurator/pkg/utils"
+    "time"
+)
+
+func RunScanner(log logger.Logger, cfg config.Options, sdsCache *cache.Cache) error {
+    log.Info("[RunScanner] starts the work")
+
+    t := throttler.New(cfg.ThrottleInterval * time.Second)
+
+    conn := new(netlink.UEventConn)
+    if err := conn.Connect(netlink.UdevEvent); err != nil {
+        log.Error(err, "[RunScanner] Failed to connect to Netlink")
+        return err
+    }
+    log.Debug("[RunScanner] system socket connection succeeded")
+
+    errChan := make(chan error)
+    eventChan := make(chan netlink.UEvent)
+    matcher := &netlink.RuleDefinitions{
+        Rules: []netlink.RuleDefinition{
+            {
+                Env: map[string]string{
+                    "SUBSYSTEM": "block",
+                },
+            },
+        },
+    }
+    quit := conn.Monitor(eventChan, errChan, matcher)
+
+    log.Info("[RunScanner] start to listen to events")
+
+    timer := time.NewTimer(1 * time.Second)
+    for {
+        select {
+        case device, open := <-eventChan:
+            if !open {
+                err := errors.New("the event channel has been closed while monitoring udev events")
+                log.Error(err, "[RunScanner] unable to read from the event channel")
+                return err
+            }
+            timer.Reset(1 * time.Second)
+            log.Debug(fmt.Sprintf("[RunScanner] event triggered for device: %s", device.Env["DEVNAME"]))
+            log.Trace(fmt.Sprintf("[RunScanner] device from the event: %s", device.String()))
+
+            t.Do(func() {
+                log.Info("[RunScanner] start to fill the cache")
+                err := fillTheCache(log, sdsCache)
+                if err != nil {
+                    log.Error(err, "[RunScanner] unable to fill the cache")
+                    return
+                }
+
+                log.Info("[RunScanner] successfully filled the cache")
+            })
+
+        case err := <-errChan:
+            log.Error(err, "[RunScanner] Monitor udev event error")
+            return err
+
+        case <-quit:
+            err := errors.New("received a quit signal while monitoring udev events")
+            log.Error(err, "[RunScanner] the udev monitor has been stopped")
+            return err
+
+        case <-timer.C:
+            log.Info("[RunScanner] events ran out. Start to fill the cache")
+            err := fillTheCache(log, sdsCache)
+            if err != nil {
+                log.Error(err, "[RunScanner] unable to fill the cache after all events passed")
+                break
+            }
+            log.Info("[RunScanner] successfully filled the cache after all events passed")
+        }
+    }
+}
+
+func fillTheCache(log logger.Logger, cache *cache.Cache) error {
+    devices, err := scanDevices(log)
+    if err != nil {
+        return err
+    }
+
+    pvs, err := scanPVs(log)
+    if err != nil {
+        return err
+    }
+
+    vgs, err := scanVGs(log)
+    if err != nil {
+        return err
+    }
+
+    lvs, err := scanLVs(log)
+    if err != nil {
+        return err
+    }
+
+    log.Debug("[fillTheCache] successfully scanned entities. Start to fill the cache")
+    cache.StoreDevices(devices)
+    cache.StorePVs(pvs)
+    cache.StoreVGs(vgs)
+    cache.StoreLVs(lvs)
+    log.Debug("[fillTheCache] successfully filled the cache")
+    cache.PrintTheCache(log)
+
+    return nil
+}
+
+func scanDevices(log logger.Logger) ([]internal.Device, error) {
+    devices, cmdStr, err := utils.GetBlockDevices()
+    if err != nil {
+        log.Error(err, fmt.Sprintf("[scanDevices] unable to scan the devices, cmd: %s", cmdStr))
+        return nil, err
+    }
+
+    return devices, nil
+}
+
+func scanPVs(log logger.Logger) ([]internal.PVData, error) {
+    pvs, cmdStr, _, err := utils.GetAllPVs()
+    if err != nil {
+        log.Error(err, fmt.Sprintf("[scanPVs] unable to scan the PVs, cmd: %s", cmdStr))
+        return nil, err
+    }
+
+    return pvs, nil
+}
+
+func scanVGs(log logger.Logger) ([]internal.VGData, error) {
+    vgs, cmdStr, _, err := utils.GetAllVGs()
+    if err != nil {
+        log.Error(err, fmt.Sprintf("[scanVGs] unable to scan the VGs, cmd: %s", cmdStr))
+        return nil, err
+    }
+
+    return vgs, nil
+}
+
+func scanLVs(log logger.Logger) ([]internal.LVData, error) {
+    lvs, cmdStr, _, err := utils.GetAllLVs()
+    if err != nil {
+        log.Error(err, fmt.Sprintf("[scanLVs] unable to scan LVs, cmd: %s", cmdStr))
+        return nil, err
+    }
+
+    return lvs, nil
+}
diff --git a/images/agent/pkg/logger/logger.go b/images/agent/pkg/logger/logger.go
index 8599e572..34b94245 100644
--- a/images/agent/pkg/logger/logger.go
+++ b/images/agent/pkg/logger/logger.go
@@ -30,6 +30,7 @@ const (
     InfoLevel  Verbosity = "2"
     DebugLevel Verbosity = "3"
     TraceLevel Verbosity = "4"
+    CacheLevel Verbosity = "5"
 )
 
 const (
@@ -37,6 +38,7 @@ const (
     infoLvl
     debugLvl
     traceLvl
+    cacheLvl
 )
 
 type (
@@ -82,3 +84,7 @@ func (l Logger) Debug(message string, keysAndValues ...interface{}) {
 func (l Logger) Trace(message string, keysAndValues ...interface{}) {
     l.log.V(traceLvl).Info(fmt.Sprintf("TRACE %s", message), keysAndValues...)
 }
+
+func (l Logger) Cache(message string, keysAndValues ...interface{}) {
+    l.log.V(cacheLvl).Info(fmt.Sprintf("CACHE %s", message), keysAndValues...)
+}
diff --git a/images/agent/pkg/throttler/throttler.go b/images/agent/pkg/throttler/throttler.go
new file mode 100644
index 00000000..d661ae30
--- /dev/null
+++ b/images/agent/pkg/throttler/throttler.go
@@ -0,0 +1,38 @@
+package throttler
+
+import (
+    "sync"
+    "time"
+)
+
+type Throttler interface {
+    Do(f func())
+}
+
+type throttle struct {
+    duration time.Duration
+    once     sync.Once
+    m        sync.Mutex
+}
+
+// Do runs f for the first caller in a window and drops every further call
+// until the throttle duration has elapsed, coalescing bursts into one run.
+func (t *throttle) Do(f func()) {
+    t.m.Lock()
+    defer t.m.Unlock()
+    t.once.Do(func() {
+        go func() {
+            time.Sleep(t.duration)
+            t.m.Lock()
+            defer t.m.Unlock()
+            t.once = sync.Once{}
+        }()
+        f()
+    })
+}
+
+func New(duration time.Duration) Throttler {
+    return &throttle{
+        duration: duration,
+    }
+}
diff --git a/images/agent/pkg/utils/commands.go b/images/agent/pkg/utils/commands.go
index 45984549..b2f55dd8 100644
--- a/images/agent/pkg/utils/commands.go
+++ b/images/agent/pkg/utils/commands.go
@@ -517,17 +517,22 @@ func filterStdErr(command string, stdErr bytes.Buffer) bytes.Buffer {
     var filteredStdErr bytes.Buffer
     stdErrScanner := bufio.NewScanner(&stdErr)
     regexpPattern := `Regex version mismatch, expected: .+ actual: .+`
-    regex, err := regexp.Compile(regexpPattern)
+    regexpSocketError := `File descriptor .+ leaked on lvm.static invocation. Parent PID .+: /opt/deckhouse/sds/bin/nsenter`
+    regex1, err := regexp.Compile(regexpPattern)
+    if err != nil {
+        return stdErr
+    }
+    regex2, err := regexp.Compile(regexpSocketError)
     if err != nil {
         return stdErr
     }
 
     for stdErrScanner.Scan() {
         line := stdErrScanner.Text()
-        if !regex.MatchString(line) {
-            filteredStdErr.WriteString(line + "\n")
-        } else {
+        if regex1.MatchString(line) || regex2.MatchString(line) {
             golog.Printf("WARNING: [filterStdErr] Line filtered from stderr due to matching exclusion pattern. Line: '%s'. Triggered by command: '%s'.", line, command)
+        } else {
+            filteredStdErr.WriteString(line + "\n")
         }
     }
diff --git a/templates/agent/daemonset.yaml b/templates/agent/daemonset.yaml
index b98b6373..8c654496 100644
--- a/templates/agent/daemonset.yaml
+++ b/templates/agent/daemonset.yaml
@@ -51,6 +51,7 @@ spec:
       - name: {{ .Chart.Name }}-module-registry
       serviceAccountName: sds-node-configurator
       hostPID: true
+      hostNetwork: true
      initContainers:
      - name: sds-utils-installer
        image: {{ include "helm_lib_module_image" (list . "sdsUtilsInstaller") }}
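A few usage notes on the new packages, appended after the patch so the hunks above stay intact. First, the cache getters hand out copies rather than the cached slices themselves, so a consumer cannot corrupt the scanner's state between refills. A minimal sketch of that contract (the package main wrapper and the device name are illustrative only, not part of the change):

package main

import (
	"fmt"

	"sds-node-configurator/internal"
	"sds-node-configurator/pkg/cache"
)

func main() {
	c := cache.New()
	c.StoreDevices([]internal.Device{{Name: "sda"}})

	// GetDevices returns a fresh slice: mutating it does not touch the cache.
	devs := c.GetDevices()
	devs[0].Name = "mutated"

	fmt.Println(c.GetDevices()[0].Name) // still prints "sda"
}

Note the copy is shallow: any pointer-typed field inside the elements would still be shared between the caller and the cache.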
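Second, the throttler's contract: Do runs the supplied function for the first caller in a window and drops every further call until the configured duration elapses, so a storm of udev events (for example, a node where dozens of disks appear at once) collapses into a single cache refill. A standalone sketch of that behaviour, with illustrative timings that are not part of the change:

package main

import (
	"fmt"
	"time"

	"sds-node-configurator/pkg/throttler"
)

func main() {
	t := throttler.New(1 * time.Second)

	for i := 0; i < 5; i++ {
		i := i
		// Events arrive every 300ms; only the first call per window runs.
		t.Do(func() { fmt.Printf("refill triggered by event %d\n", i) })
		time.Sleep(300 * time.Millisecond)
	}
	// Expected output (approximately): events 0 and 4 trigger the callback;
	// events 1-3 fall inside the first one-second window and are skipped.
}

In RunScanner the window is cfg.ThrottleInterval * time.Second, 3 seconds by default, so bursts of block-device events produce at most one fillTheCache per window.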
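Finally, a unit subtlety in NewConfig: the parsed integers are stored as bare time.Duration values, i.e. nanosecond counts, and every call site is expected to scale them, as RunScanner does with cfg.ThrottleInterval * time.Second. A tiny sketch of the convention, using this patch's default of 3:

package main

import (
	"fmt"
	"time"
)

func main() {
	throttleInterval := time.Duration(3) // what NewConfig stores: 3ns, not 3s

	// Call sites multiply by time.Second, as scanner.go does.
	window := throttleInterval * time.Second

	fmt.Println(window) // prints "3s"
}

A new call site that forgets the * time.Second would silently get a nanosecond-scale interval, so these options are best read as unitless counts of seconds.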