Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/GoogleCloudPlatform/workloadagent/internal/daemon/configuration"
"github.com/GoogleCloudPlatform/workloadagent/internal/daemon/mysql"
"github.com/GoogleCloudPlatform/workloadagent/internal/daemon/oracle"
"github.com/GoogleCloudPlatform/workloadagent/internal/daemon/redis"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"

cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
Expand Down Expand Up @@ -128,8 +129,9 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu
signal.Notify(shutdownch, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

log.Logger.Info("Starting common discovery")
oracleCh := make(chan commondiscovery.Result,1)
oracleCh := make(chan commondiscovery.Result, 1)
mySQLCh := make(chan commondiscovery.Result, 1)
redisCh := make(chan commondiscovery.Result, 1)
cdChs := []chan commondiscovery.Result{mySQLCh, oracleCh}
commondiscovery := commondiscovery.DiscoveryService{
ProcessLister: commondiscovery.DefaultProcessLister{},
Expand All @@ -150,6 +152,7 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu
d.services = []Service{
&oracle.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: oracleCh},
&mysql.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: mySQLCh},
&redis.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: redisCh},
}
for _, service := range d.services {
log.Logger.Infof("Starting %s", service.String())
Expand Down
204 changes: 204 additions & 0 deletions internal/daemon/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package redis implements the Redis workload agent service.
package redis

import (
"context"
"time"

"go.uber.org/zap/zapcore"
"github.com/GoogleCloudPlatform/sapagent/shared/log"
"github.com/GoogleCloudPlatform/sapagent/shared/recovery"
"github.com/GoogleCloudPlatform/workloadagent/internal/commondiscovery"
"github.com/GoogleCloudPlatform/workloadagent/internal/redisdiscovery"
"github.com/GoogleCloudPlatform/workloadagent/internal/redismetrics"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"
configpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
)

// Service implements the interfaces for Redis workload agent service.
type Service struct {
Config *configpb.Configuration
CloudProps *configpb.CloudProperties
CommonCh chan commondiscovery.Result
processes commondiscovery.Result
redisProcesses []commondiscovery.ProcessWrapper
}

type runDiscoveryArgs struct {
s *Service
}

type runMetricCollectionArgs struct {
s *Service
}

// Start initiates the Redis workload agent service
func (s *Service) Start(ctx context.Context, a any) {
if s.Config.GetRedisConfiguration() == nil || s.Config.GetRedisConfiguration().Enabled == nil {
// If Redis workload agent service is not explicitly enabled in the configuration,
// then check if the workload is present on the host.
log.CtxLogger(ctx).Info("Redis workload agent service is not explicitly enabled in the configuration")
if !s.checkCommonDiscovery(ctx) {
return
}
} else if !s.Config.GetRedisConfiguration().GetEnabled() {
// If Redis workload agent service is explicitly disabled in the configuration, then return.
log.CtxLogger(ctx).Info("Redis workload agent service is disabled in the configuration")
return
}

// Start Redis Discovery
dCtx := log.SetCtx(ctx, "context", "RedisDiscovery")
discoveryRoutine := &recovery.RecoverableRoutine{
Routine: runDiscovery,
RoutineArg: runDiscoveryArgs{s},
ErrorCode: usagemetrics.RedisDiscoveryFailure,
UsageLogger: *usagemetrics.UsageLogger,
ExpectedMinDuration: 0,
}
discoveryRoutine.StartRoutine(dCtx)

// Start Redis Metric Collection
mcCtx := log.SetCtx(ctx, "context", "RedisMetricCollection")
metricCollectionRoutine := &recovery.RecoverableRoutine{
Routine: runMetricCollection,
RoutineArg: runMetricCollectionArgs{s},
ErrorCode: usagemetrics.RedisMetricCollectionFailure,
UsageLogger: *usagemetrics.UsageLogger,
ExpectedMinDuration: 0,
}
metricCollectionRoutine.StartRoutine(mcCtx)
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Redis workload agent service cancellation requested")
return
case s.processes = <-s.CommonCh:
log.CtxLogger(ctx).Debugw("Redis workload agent service received common discovery result", "result", s.processes)
s.identifyRedisProcesses(ctx)
return
}
}

func runDiscovery(ctx context.Context, a any) {
log.CtxLogger(ctx).Info("Starting Redis Discovery")
var args runDiscoveryArgs
var ok bool
if args, ok = a.(runDiscoveryArgs); !ok {
log.CtxLogger(ctx).Warnw("failed to parse discovery args", "args", a)
return
}
log.CtxLogger(ctx).Debugw("Redis discovery args", "args", args)
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Redis discovery cancellation requested")
return
case <-ticker.C:
redisdiscovery.Discover(ctx)
}
}
}

func runMetricCollection(ctx context.Context, a any) {
log.CtxLogger(ctx).Info("Starting Redis Metric Collection")
var args runMetricCollectionArgs
var ok bool
if args, ok = a.(runMetricCollectionArgs); !ok {
log.CtxLogger(ctx).Warnw("failed to parse metric collection args", "args", a)
return
}
log.CtxLogger(ctx).Debugw("Redis metric collection args", "args", args)
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
redismetrics.CollectMetricsOnce(ctx)
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Redis metric collection cancellation requested")
return
case <-ticker.C:
continue
}
}
}

// checkCommonDiscovery checks for common discovery results.
// Returns true if Redis workload is present on the host.
// Otherwise, returns false to indicate that the context is done.
func (s *Service) checkCommonDiscovery(ctx context.Context) bool {
for {
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("Redis workload agent service cancellation requested")
return false
case s.processes = <-s.CommonCh:
log.CtxLogger(ctx).Debugw("Redis workload agent service received common discovery result", "NumProcesses", len(s.processes.Processes))
s.identifyRedisProcesses(ctx)
if s.isWorkloadPresent() {
log.CtxLogger(ctx).Info("Redis workload is present, starting discovery and metric collection")
return true
}
log.CtxLogger(ctx).Debug("Redis workload is not present")
}
}
}

func (s *Service) identifyRedisProcesses(ctx context.Context) {
for _, process := range s.processes.Processes {
name, err := process.Name()
if err == nil && name == "redis-server" {
s.redisProcesses = append(s.redisProcesses, process)
}
}
s.logRedisProcesses(ctx, zapcore.DebugLevel)
}

func (s *Service) isWorkloadPresent() bool {
return len(s.redisProcesses) > 0
}

func (s *Service) logRedisProcesses(ctx context.Context, loglevel zapcore.Level) {
log.CtxLogger(ctx).Logf(loglevel, "Number of Redis processes found: %v", len(s.redisProcesses))
for _, process := range s.redisProcesses {
name, _ := process.Name()
username, _ := process.Username()
cmdline, _ := process.CmdlineSlice()
env, _ := process.Environ()
log.CtxLogger(ctx).Logw(loglevel, "Redis process", "name", name, "username", username, "cmdline", cmdline, "env", env, "pid", process.Pid())
}
}

// String returns the name of the Redis service.
func (s *Service) String() string {
return "Redis Service"
}

// ErrorCode returns the error code for the Redis service.
func (s *Service) ErrorCode() int {
return usagemetrics.RedisServiceError
}

// ExpectedMinDuration returns the expected minimum duration for the Redis service.
// Used by the recovery handler to determine if the service ran long enough to be considered
// successful.
func (s *Service) ExpectedMinDuration() time.Duration {
return 0
}
Loading