diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index dc538bf..a38a20b 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -32,6 +32,7 @@ import ( "github.com/GoogleCloudPlatform/workloadagent/internal/daemon/configuration" "github.com/GoogleCloudPlatform/workloadagent/internal/daemon/mysql" "github.com/GoogleCloudPlatform/workloadagent/internal/daemon/oracle" + "github.com/GoogleCloudPlatform/workloadagent/internal/daemon/redis" "github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics" cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration" @@ -128,8 +129,9 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu signal.Notify(shutdownch, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) log.Logger.Info("Starting common discovery") - oracleCh := make(chan commondiscovery.Result,1) + oracleCh := make(chan commondiscovery.Result, 1) mySQLCh := make(chan commondiscovery.Result, 1) + redisCh := make(chan commondiscovery.Result, 1) cdChs := []chan commondiscovery.Result{mySQLCh, oracleCh} commondiscovery := commondiscovery.DiscoveryService{ ProcessLister: commondiscovery.DefaultProcessLister{}, @@ -150,6 +152,7 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu d.services = []Service{ &oracle.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: oracleCh}, &mysql.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: mySQLCh}, + &redis.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: redisCh}, } for _, service := range d.services { log.Logger.Infof("Starting %s", service.String()) diff --git a/internal/daemon/redis/redis.go b/internal/daemon/redis/redis.go new file mode 100644 index 0000000..7c127b9 --- /dev/null +++ b/internal/daemon/redis/redis.go @@ -0,0 +1,204 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package redis implements the Redis workload agent service. +package redis + +import ( + "context" + "time" + + "go.uber.org/zap/zapcore" + "github.com/GoogleCloudPlatform/sapagent/shared/log" + "github.com/GoogleCloudPlatform/sapagent/shared/recovery" + "github.com/GoogleCloudPlatform/workloadagent/internal/commondiscovery" + "github.com/GoogleCloudPlatform/workloadagent/internal/redisdiscovery" + "github.com/GoogleCloudPlatform/workloadagent/internal/redismetrics" + "github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics" + configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration" +) + +// Service implements the interfaces for Redis workload agent service. +type Service struct { + Config *configpb.Configuration + CloudProps *configpb.CloudProperties + CommonCh chan commondiscovery.Result + processes commondiscovery.Result + redisProcesses []commondiscovery.ProcessWrapper +} + +type runDiscoveryArgs struct { + s *Service +} + +type runMetricCollectionArgs struct { + s *Service +} + +// Start initiates the Redis workload agent service +func (s *Service) Start(ctx context.Context, a any) { + if s.Config.GetRedisConfiguration() == nil || s.Config.GetRedisConfiguration().Enabled == nil { + // If Redis workload agent service is not explicitly enabled in the configuration, + // then check if the workload is present on the host. + log.CtxLogger(ctx).Info("Redis workload agent service is not explicitly enabled in the configuration") + if !s.checkCommonDiscovery(ctx) { + return + } + } else if !s.Config.GetRedisConfiguration().GetEnabled() { + // If Redis workload agent service is explicitly disabled in the configuration, then return. + log.CtxLogger(ctx).Info("Redis workload agent service is disabled in the configuration") + return + } + + // Start Redis Discovery + dCtx := log.SetCtx(ctx, "context", "RedisDiscovery") + discoveryRoutine := &recovery.RecoverableRoutine{ + Routine: runDiscovery, + RoutineArg: runDiscoveryArgs{s}, + ErrorCode: usagemetrics.RedisDiscoveryFailure, + UsageLogger: *usagemetrics.UsageLogger, + ExpectedMinDuration: 0, + } + discoveryRoutine.StartRoutine(dCtx) + + // Start Redis Metric Collection + mcCtx := log.SetCtx(ctx, "context", "RedisMetricCollection") + metricCollectionRoutine := &recovery.RecoverableRoutine{ + Routine: runMetricCollection, + RoutineArg: runMetricCollectionArgs{s}, + ErrorCode: usagemetrics.RedisMetricCollectionFailure, + UsageLogger: *usagemetrics.UsageLogger, + ExpectedMinDuration: 0, + } + metricCollectionRoutine.StartRoutine(mcCtx) + select { + case <-ctx.Done(): + log.CtxLogger(ctx).Info("Redis workload agent service cancellation requested") + return + case s.processes = <-s.CommonCh: + log.CtxLogger(ctx).Debugw("Redis workload agent service received common discovery result", "result", s.processes) + s.identifyRedisProcesses(ctx) + return + } +} + +func runDiscovery(ctx context.Context, a any) { + log.CtxLogger(ctx).Info("Starting Redis Discovery") + var args runDiscoveryArgs + var ok bool + if args, ok = a.(runDiscoveryArgs); !ok { + log.CtxLogger(ctx).Warnw("failed to parse discovery args", "args", a) + return + } + log.CtxLogger(ctx).Debugw("Redis discovery args", "args", args) + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.CtxLogger(ctx).Info("Redis discovery cancellation requested") + return + case <-ticker.C: + redisdiscovery.Discover(ctx) + } + } +} + +func runMetricCollection(ctx context.Context, a any) { + log.CtxLogger(ctx).Info("Starting Redis Metric Collection") + var args runMetricCollectionArgs + var ok bool + if args, ok = a.(runMetricCollectionArgs); !ok { + log.CtxLogger(ctx).Warnw("failed to parse metric collection args", "args", a) + return + } + log.CtxLogger(ctx).Debugw("Redis metric collection args", "args", args) + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + for { + redismetrics.CollectMetricsOnce(ctx) + select { + case <-ctx.Done(): + log.CtxLogger(ctx).Info("Redis metric collection cancellation requested") + return + case <-ticker.C: + continue + } + } +} + +// checkCommonDiscovery checks for common discovery results. +// Returns true if Redis workload is present on the host. +// Otherwise, returns false to indicate that the context is done. +func (s *Service) checkCommonDiscovery(ctx context.Context) bool { + for { + select { + case <-ctx.Done(): + log.CtxLogger(ctx).Info("Redis workload agent service cancellation requested") + return false + case s.processes = <-s.CommonCh: + log.CtxLogger(ctx).Debugw("Redis workload agent service received common discovery result", "NumProcesses", len(s.processes.Processes)) + s.identifyRedisProcesses(ctx) + if s.isWorkloadPresent() { + log.CtxLogger(ctx).Info("Redis workload is present, starting discovery and metric collection") + return true + } + log.CtxLogger(ctx).Debug("Redis workload is not present") + } + } +} + +func (s *Service) identifyRedisProcesses(ctx context.Context) { + for _, process := range s.processes.Processes { + name, err := process.Name() + if err == nil && name == "redis-server" { + s.redisProcesses = append(s.redisProcesses, process) + } + } + s.logRedisProcesses(ctx, zapcore.DebugLevel) +} + +func (s *Service) isWorkloadPresent() bool { + return len(s.redisProcesses) > 0 +} + +func (s *Service) logRedisProcesses(ctx context.Context, loglevel zapcore.Level) { + log.CtxLogger(ctx).Logf(loglevel, "Number of Redis processes found: %v", len(s.redisProcesses)) + for _, process := range s.redisProcesses { + name, _ := process.Name() + username, _ := process.Username() + cmdline, _ := process.CmdlineSlice() + env, _ := process.Environ() + log.CtxLogger(ctx).Logw(loglevel, "Redis process", "name", name, "username", username, "cmdline", cmdline, "env", env, "pid", process.Pid()) + } +} + +// String returns the name of the Redis service. +func (s *Service) String() string { + return "Redis Service" +} + +// ErrorCode returns the error code for the Redis service. +func (s *Service) ErrorCode() int { + return usagemetrics.RedisServiceError +} + +// ExpectedMinDuration returns the expected minimum duration for the Redis service. +// Used by the recovery handler to determine if the service ran long enough to be considered +// successful. +func (s *Service) ExpectedMinDuration() time.Duration { + return 0 +} diff --git a/internal/daemon/redis/redis_test.go b/internal/daemon/redis/redis_test.go new file mode 100644 index 0000000..6dff4bd --- /dev/null +++ b/internal/daemon/redis/redis_test.go @@ -0,0 +1,283 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package redis tests the Redis workload agent service. +package redis + +import ( + "context" + "testing" + "time" + + "github.com/GoogleCloudPlatform/workloadagent/internal/commondiscovery" + "github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics" +) + +// Stub is a no-op test double for psutil.Process. +type processStub struct { + username string + pid int32 + name string + args []string + environ []string +} + +// Username returns the username of the process. +func (p processStub) Username() (string, error) { + return p.username, nil +} + +// Pid returns the PID of the process. +func (p processStub) Pid() int32 { + return p.pid +} + +// Name returns the name of the process. +func (p processStub) Name() (string, error) { + return p.name, nil +} + +func (p processStub) CmdlineSlice() ([]string, error) { + return p.args, nil +} + +func (p processStub) Environ() ([]string, error) { + return p.environ, nil +} + +func TestIsWorkloadPresent(t *testing.T) { + tests := []struct { + name string + s *Service + want bool + }{ + { + name: "Present", + s: &Service{redisProcesses: []commondiscovery.ProcessWrapper{ + processStub{ + username: "redis_user", + pid: 1234, + name: "redis-server", + args: []string{"--port 6379", "--bind 0.0.0.0"}, + environ: []string{"REDIS_PORT=6379", "REDIS_BIND=0.0.0.0"}, + }, + }}, + want: true, + }, + { + name: "NotPresent", + s: &Service{redisProcesses: []commondiscovery.ProcessWrapper{}}, + want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.isWorkloadPresent() + if got != test.want { + t.Errorf("isWorkloadPresent() = %v, want %v", got, test.want) + } + }) + } +} + +func TestIdentifyRedisProcesses(t *testing.T) { + tests := []struct { + name string + s *Service + want int + }{ + { + name: "MixedProcesses", + s: &Service{ + processes: commondiscovery.Result{ + Processes: []commondiscovery.ProcessWrapper{ + processStub{ + username: "redis_user", + pid: 1234, + name: "redis-server", + args: []string{"--port 6379", "--bind 0.0.0.0"}, + environ: []string{"REDIS_PORT=6379", "REDIS_BIND=0.0.0.0"}, + }, + processStub{ + username: "test_user", + pid: 1234, + name: "test_name", + }, + }}}, + want: 1, + }, + { + name: "OneRedisProcess", + s: &Service{ + processes: commondiscovery.Result{ + Processes: []commondiscovery.ProcessWrapper{ + processStub{ + username: "redis_user", + pid: 1234, + name: "redis-server", + args: []string{"--port 6379", "--bind 0.0.0.0"}, + environ: []string{"REDIS_PORT=6379", "REDIS_BIND=0.0.0.0"}, + }, + }}}, + want: 1, + }, + { + name: "OneNotRedisProcess", + s: &Service{ + processes: commondiscovery.Result{ + Processes: []commondiscovery.ProcessWrapper{ + processStub{ + username: "test_user", + pid: 1234, + name: "test_name", + }, + }}}, + want: 0, + }, + { + name: "ZeroProcesses", + s: &Service{processes: commondiscovery.Result{Processes: []commondiscovery.ProcessWrapper{}}}, + want: 0, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.s.identifyRedisProcesses(context.Background()) + got := len(test.s.redisProcesses) + if got != test.want { + t.Errorf("isWorkloadPresent() = %v, want %v", got, test.want) + } + }) + } +} + +func sendCommonDiscoveryResult(t *testing.T, result commondiscovery.Result, ch chan commondiscovery.Result) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + ch <- result + select { + case <-ticker.C: + continue + } + } +} + +func TestCheckCommonDiscovery(t *testing.T) { + ch := make(chan commondiscovery.Result) + result := commondiscovery.Result{Processes: []commondiscovery.ProcessWrapper{ + processStub{ + username: "redis_user", + pid: 1234, + name: "redis-server", + args: []string{"--port 6379", "--bind 0.0.0.0"}, + environ: []string{"REDIS_PORT=6379", "REDIS_BIND=0.0.0.0"}, + }, + }} + // Need a message to be sent to the channel after the method starts listening to the channel. + go sendCommonDiscoveryResult(t, result, ch) + + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + tests := []struct { + name string + s *Service + ctx context.Context + want bool + }{ + { + name: "WorkloadPresent", + s: &Service{CommonCh: ch}, + ctx: context.Background(), + want: true, + }, + { + name: "ContextEnded", + s: &Service{}, + ctx: cancelledCtx, + want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.checkCommonDiscovery(test.ctx) + if got != test.want { + t.Errorf("checkCommonDiscovery() = %v, want %v", got, test.want) + } + }) + } +} + +// Can only test the case where the context is cancelled. +// In other cases, the test will hang because this method is meant to run perpetually. +func TestStart(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + tests := []struct { + name string + s *Service + ctx context.Context + want int + }{ + { + name: "ContextEnded", + s: &Service{}, + ctx: cancelledCtx, + want: 0, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.s.Start(test.ctx, nil) + got := len(test.s.processes.Processes) + if got != test.want { + t.Errorf("Start() = %v, want %v", got, test.want) + } + }) + } +} + +func TestString(t *testing.T) { + s := &Service{} + got := s.String() + // Unlikely to intentionally change. + want := "Redis Service" + if got != want { + t.Errorf("String() = %v, want %v", got, want) + } +} + +func TestErrorCode(t *testing.T) { + s := &Service{} + got := s.ErrorCode() + want := usagemetrics.RedisServiceError + if got != want { + t.Errorf("ErrorCode() = %v, want %v", got, want) + } +} + +func TestExpectedMinDuration(t *testing.T) { + s := &Service{} + got := s.ExpectedMinDuration() + want := 0 * time.Second + if got != want { + t.Errorf("ExpectedMinDuration() = %v, want %v", got, want) + } +} diff --git a/internal/redisdiscovery/redisdiscovery.go b/internal/redisdiscovery/redisdiscovery.go new file mode 100644 index 0000000..dc796a7 --- /dev/null +++ b/internal/redisdiscovery/redisdiscovery.go @@ -0,0 +1,30 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package redisdiscovery implements discovery for the Redis workload agent service. +package redisdiscovery + +import ( + "context" + + "github.com/GoogleCloudPlatform/sapagent/shared/log" +) + +// Discover runs the Redis discovery routine. +func Discover(ctx context.Context) { + log.CtxLogger(ctx).Info("Redis discovery not yet implemented.") + return +} diff --git a/internal/redismetrics/redismetrics.go b/internal/redismetrics/redismetrics.go new file mode 100644 index 0000000..66113e1 --- /dev/null +++ b/internal/redismetrics/redismetrics.go @@ -0,0 +1,30 @@ +/* +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package redismetrics implements metric collection for the Redis workload agent service. +package redismetrics + +import ( + "context" + + "github.com/GoogleCloudPlatform/sapagent/shared/log" +) + +// CollectMetricsOnce collects metrics for Redis databases running on the host. +func CollectMetricsOnce(ctx context.Context) { + log.CtxLogger(ctx).Info("Redis metric collection not yet implemented.") + return +} diff --git a/internal/usagemetrics/usagemetrics.go b/internal/usagemetrics/usagemetrics.go index 7d83ae5..d839885 100644 --- a/internal/usagemetrics/usagemetrics.go +++ b/internal/usagemetrics/usagemetrics.go @@ -49,6 +49,9 @@ const ( MySQLMetricCollectionFailure = 11 MySQLDiscoveryFailure = 12 CommonDiscoveryFailure = 13 + RedisServiceError = 14 + RedisMetricCollectionFailure = 15 + RedisDiscoveryFailure = 16 ) // Agent wide action mappings. diff --git a/protos/configuration/configuration.proto b/protos/configuration/configuration.proto index 5bd878d..558a586 100644 --- a/protos/configuration/configuration.proto +++ b/protos/configuration/configuration.proto @@ -38,6 +38,7 @@ message Configuration { OracleConfiguration oracle_configuration = 6; MySQLConfiguration mysql_configuration = 7; CommonDiscovery common_discovery = 8; + RedisConfiguration redis_configuration = 9; } message CloudProperties { @@ -88,6 +89,10 @@ message CommonDiscovery { google.protobuf.Duration collection_frequency = 1; } +message RedisConfiguration { + optional bool enabled = 1; +} + message ConnectionParameters { string username = 1; SecretRef secret = 2;