Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion build/configuration.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
{
"log_level": "INFO"
"log_level": "INFO",
"common_discovery": {
"collection_frequency": {
"seconds": 10800
}
}
}
28 changes: 23 additions & 5 deletions internal/commondiscovery/commondiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/GoogleCloudPlatform/sapagent/shared/commandlineexecutor"
"github.com/GoogleCloudPlatform/sapagent/shared/log"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"

cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
)

const (
Expand Down Expand Up @@ -69,6 +71,7 @@ type DiscoveryService struct {
ProcessLister processLister
ReadFile readFile
Hostname hostname
Config *cpb.Configuration
}

// ProcessWrapper is a wrapper around process.Process to support testing.
Expand Down Expand Up @@ -131,22 +134,37 @@ func (d DiscoveryService) commonDiscoveryLoop(ctx context.Context) (Result, erro

// CommonDiscovery returns a CommonDiscoveryResult and any errors encountered during the discovery process.
func (d DiscoveryService) CommonDiscovery(ctx context.Context, a any) {
var ch chan Result
log.CtxLogger(ctx).Info("CommonDiscovery started")
var chs []chan Result
var ok bool
if ch, ok = a.(chan Result); !ok {
if chs, ok = a.([]chan Result); !ok {
log.CtxLogger(ctx).Warn("args is not of type chan Result")
return
}

ticker := time.NewTicker(3 * time.Hour)
frequency := 3 * time.Hour
if d.Config.GetCommonDiscovery().GetCollectionFrequency() != nil {
frequency = d.Config.GetCommonDiscovery().GetCollectionFrequency().AsDuration()
}
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
discoveryResult, err := d.commonDiscoveryLoop(ctx)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to perform common discovery", "error", err)
return
}
ch <- discoveryResult
log.CtxLogger(ctx).Infof("CommonDiscovery found %d processes.", len(discoveryResult.Processes))
fullChs := 0
for _, ch := range chs {
select {
case ch <- discoveryResult:
default:
fullChs++
}
}
if fullChs > 0 {
log.CtxLogger(ctx).Infof("CommonDiscovery found %d full channels that it was unable to write to.", fullChs)
}
select {
case <-ctx.Done():
log.CtxLogger(ctx).Info("CommonDiscovery cancellation requested")
Expand Down
133 changes: 132 additions & 1 deletion internal/commondiscovery/commondiscovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"testing"
"time"

dpb "google.golang.org/protobuf/types/known/durationpb"
"github.com/google/go-cmp/cmp"
"github.com/shirou/gopsutil/v3/process"
"github.com/GoogleCloudPlatform/workloadagent/internal/usagemetrics"
cpb "github.com/GoogleCloudPlatform/workloadagent/protos/configuration"
)

type errorProneProcessLister struct {
Expand Down Expand Up @@ -272,7 +274,7 @@ func ValidateResult(gotProcesses []ProcessWrapper, wantProcesses []ProcessWrappe
}
}

func TestCommonDiscovery(t *testing.T) {
func TestCommonDiscoveryLoop(t *testing.T) {
tests := []struct {
name string
d *DiscoveryService
Expand Down Expand Up @@ -389,3 +391,132 @@ func TestCommonDiscovery(t *testing.T) {
ValidateResult(result.Processes, tc.want.Processes, tc.name, t)
}
}

func TestCommonDiscoveryUnbufferedChannels(t *testing.T) {
tests := []struct {
name string
d *DiscoveryService
chs []chan Result
want Result
}{
{
name: "UnbufferedChannelsDoNotHang",
d: &DiscoveryService{
ProcessLister: fakeProcessLister{processes: []processStub{
{username: "user1", pid: 123, name: "test"},
}},
},
chs: []chan Result{make(chan Result), make(chan Result), make(chan Result)},
want: Result{},
},
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range tests {
// This test is just checking that the common discovery loop does not hang when the channels are unbuffered.
tc.d.CommonDiscovery(ctx, tc.chs)
}
}

func TestCommonDiscoveryFullChannel(t *testing.T) {
tests := []struct {
name string
d *DiscoveryService
chs []chan Result
want Result
}{
{
name: "OneChannelFullOneChannelNotBlocked",
d: &DiscoveryService{
Config: &cpb.Configuration{
CommonDiscovery: &cpb.CommonDiscovery{
// every 0.1 seconds
CollectionFrequency: &dpb.Duration{Nanos: 1000000000 * 0.1},
},
},
ProcessLister: fakeProcessLister{processes: []processStub{
{username: "user1", pid: 123, name: "test"},
}},
},
chs: []chan Result{make(chan Result, 1), make(chan Result, 1)},
want: Result{
Processes: []ProcessWrapper{
processStub{username: "user1", pid: 123, name: "test"},
},
},
},
}

ctx, cancel := context.WithCancel(context.Background())
cancel()
for range 10 {
for _, tc := range tests {
tc.d.CommonDiscovery(ctx, tc.chs)
// Ignore the second channel to make sure will not block the first channel.
result := <-tc.chs[0]
ValidateResult(result.Processes, tc.want.Processes, tc.name, t)
}
}
}

func TestCommonDiscovery(t *testing.T) {
tests := []struct {
name string
d *DiscoveryService
chs []chan Result
want Result
}{
{
name: "MultipleChannels",
d: &DiscoveryService{
ProcessLister: fakeProcessLister{processes: []processStub{
{username: "user1", pid: 123, name: "test"},
}},
},
chs: []chan Result{make(chan Result, 1), make(chan Result, 1)},
want: Result{
Processes: []ProcessWrapper{
processStub{username: "user1", pid: 123, name: "test"},
},
},
},
{
name: "SingleChannel",
d: &DiscoveryService{
ProcessLister: fakeProcessLister{processes: []processStub{
{username: "user1", pid: 123, name: "test"},
}},
},
chs: []chan Result{make(chan Result, 1)},
want: Result{
Processes: []ProcessWrapper{
processStub{username: "user1", pid: 123, name: "test"},
},
},
},
{
name: "ZeroChannelsDoesNotHang",
d: &DiscoveryService{
ProcessLister: fakeProcessLister{processes: []processStub{
{username: "user1", pid: 123, name: "test"},
}},
},
chs: []chan Result{},
want: Result{
Processes: []ProcessWrapper{
processStub{username: "user1", pid: 123, name: "test"},
},
},
},
}

ctx, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range tests {
tc.d.CommonDiscovery(ctx, tc.chs)
for _, ch := range tc.chs {
result := <-ch
ValidateResult(result.Processes, tc.want.Processes, tc.name, t)
}
}
}
11 changes: 7 additions & 4 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,18 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu
signal.Notify(shutdownch, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)

log.Logger.Info("Starting common discovery")
cdCh := make(chan commondiscovery.Result)
oracleCh := make(chan commondiscovery.Result,1)
mySQLCh := make(chan commondiscovery.Result, 1)
cdChs := []chan commondiscovery.Result{mySQLCh, oracleCh}
commondiscovery := commondiscovery.DiscoveryService{
ProcessLister: commondiscovery.DefaultProcessLister{},
ReadFile: os.ReadFile,
Hostname: os.Hostname,
Config: d.config,
}
recoverableStart := &recovery.RecoverableRoutine{
Routine: commondiscovery.CommonDiscovery,
RoutineArg: cdCh,
RoutineArg: cdChs,
ErrorCode: commondiscovery.ErrorCode(),
ExpectedMinDuration: commondiscovery.ExpectedMinDuration(),
UsageLogger: *usagemetrics.UsageLogger,
Expand All @@ -145,8 +148,8 @@ func (d *Daemon) startdaemonHandler(ctx context.Context, cancel context.CancelFu

// Add any additional services here.
d.services = []Service{
&oracle.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: cdCh},
&mysql.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: cdCh},
&oracle.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: oracleCh},
&mysql.Service{Config: d.config, CloudProps: d.cloudProps, CommonCh: mySQLCh},
}
for _, service := range d.services {
log.Logger.Infof("Starting %s", service.String())
Expand Down
5 changes: 5 additions & 0 deletions protos/configuration/configuration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message Configuration {
AgentProperties agent_properties = 5;
OracleConfiguration oracle_configuration = 6;
MySQLConfiguration mysql_configuration = 7;
CommonDiscovery common_discovery = 8;
}

message CloudProperties {
Expand Down Expand Up @@ -80,6 +81,10 @@ message MySQLConfiguration {
optional bool enabled = 1;
}

message CommonDiscovery {
google.protobuf.Duration collection_frequency = 1;
}

message ConnectionParameters {
string username = 1;
SecretRef secret = 2;
Expand Down