Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor gcp cloud tasks scaler #6406

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 15 additions & 60 deletions pkg/scalers/gcp_cloud_tasks_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -16,8 +15,6 @@ import (

const (
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth"

cloudTaskDefaultValue = 100
)

type gcpCloudTasksScaler struct {
Expand All @@ -28,12 +25,12 @@ type gcpCloudTasksScaler struct {
}

type gcpCloudTaskMetadata struct {
value float64
activationValue float64
filterDuration int64
Value float64 `keda:"name=value, order=triggerMetadata, optional, default=100"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"`
FilterDuration int64 `keda:"name=filterDuration, order=triggerMetadata, optional"`

queueName string
projectID string
QueueName string `keda:"name=queueName, order=triggerMetadata"`
ProjectID string `keda:"name=projectID, order=triggerMetadata"`
gcpAuthorization *gcp.AuthorizationMetadata
triggerIndex int
}
Expand All @@ -60,61 +57,19 @@ func NewGcpCloudTasksScaler(config *scalersconfig.ScalerConfig) (Scaler, error)
}

func parseGcpCloudTasksMetadata(config *scalersconfig.ScalerConfig) (*gcpCloudTaskMetadata, error) {
meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue}

value, valuePresent := config.TriggerMetadata["value"]

if valuePresent {
triggerValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %w", err)
}
meta.value = triggerValue
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
if val == "" {
return nil, fmt.Errorf("no queue name given")
}
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["filterDuration"]; ok {
filterDuration, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("filterDuration parsing error %w", err)
}
meta.filterDuration = filterDuration
}

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %w", err)
}
meta.activationValue = activationValue
}

if val, ok := config.TriggerMetadata["projectID"]; ok {
if val == "" {
return nil, fmt.Errorf("no project id given")
}

meta.projectID = val
} else {
return nil, fmt.Errorf("no project id given")
meta := &gcpCloudTaskMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing Gcp cloud task metadata: %w", err)
}

auth, err := gcp.GetGCPAuthorization(config)
if err != nil {
return nil, err
}

meta.gcpAuthorization = auth
meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

func (s *gcpCloudTasksScaler) Close(context.Context) error {
Expand All @@ -132,9 +87,9 @@ func (s *gcpCloudTasksScaler) Close(context.Context) error {
func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.QueueName))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}

// Create the metric spec for the HPA
Expand All @@ -158,7 +113,7 @@ func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricN

metric := GenerateMetricInMili(metricName, value)

return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationValue, nil
}

func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error {
Expand All @@ -185,9 +140,9 @@ func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string)
return -1, err
}
}
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"`
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.QueueName + `"`

// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil, nil, s.metadata.filterDuration)
return s.client.GetMetrics(ctx, filter, s.metadata.ProjectID, nil, nil, s.metadata.FilterDuration)
}
125 changes: 99 additions & 26 deletions pkg/scalers/gcp_cloud_tasks_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package scalers

import (
"context"
"reflect"
"testing"

"github.com/go-logr/logr"

"github.com/kedacore/keda/v2/pkg/scalers/gcp"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

Expand All @@ -17,6 +19,8 @@ type parseGcpCloudTasksMetadataTestData struct {
authParams map[string]string
metadata map[string]string
isError bool
expected *gcpCloudTaskMetadata
comment string
}

type gcpCloudTasksMetricIdentifier struct {
Expand All @@ -26,25 +30,82 @@ type gcpCloudTasksMetricIdentifier struct {
}

var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false},
// missing subscriptionName
{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// missing credentials
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true},
// malformed subscriptionSize
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed mode
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// malformed activationTargetValue
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true},
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true},
// properly formed float value and activationTargetValue
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false},

{map[string]string{}, map[string]string{}, true, nil, "erro case"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false, &gcpCloudTaskMetadata{
Value: 7,
ActivationValue: 5,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "all properly formed"},

{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "missing subscriptionName"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true, nil, "missing credentials"},

{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed subscriptionSize"},

{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true, nil, "malformed mode"},

{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true, nil, "malformed activationTargetValue"},

{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{
Value: 7,
ActivationValue: 0,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "Creds",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "Credentials from AuthParams"},

{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true, nil, "Credentials from AuthParams with empty creds"},

{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false, &gcpCloudTaskMetadata{
Value: 7.1,
ActivationValue: 2.1,
FilterDuration: 0,
QueueName: "mysubscription",
ProjectID: "myproject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "properly formed float value and activationTargetValue"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS"}, false, &gcpCloudTaskMetadata{
Value: 100,
ActivationValue: 0,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myProject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "test default value (100) when value is not provided"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "5"}, false, &gcpCloudTaskMetadata{
Value: 100,
ActivationValue: 5,
FilterDuration: 0,
QueueName: "myQueue",
ProjectID: "myProject",
gcpAuthorization: &gcp.AuthorizationMetadata{
GoogleApplicationCredentials: "{}",
PodIdentityProviderEnabled: false,
},
triggerIndex: 0}, "test default value with specified activationVal"},

{nil, map[string]string{"queueName": "myQueue", "projectID": "myProject", "credentialsFromEnv": "SAMPLE_CREDS", "filterDuration": "invalid"}, true, nil, "test invalid filterDuration with default values"},
}

var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{
Expand All @@ -54,13 +115,25 @@ var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{

func TestGcpCloudTasksParseMetadata(t *testing.T) {
for _, testData := range testGcpCloudTasksMetadata {
_, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
t.Run(testData.comment, func(t *testing.T) {
metadata, err := parseGcpCloudTasksMetadata(&scalersconfig.ScalerConfig{
AuthParams: testData.authParams,
TriggerMetadata: testData.metadata,
ResolvedEnv: testGcpCloudTasksResolvedEnv,
})

if err != nil && !testData.isError {
t.Errorf("Expected success but got error")
}

if testData.isError && err == nil {
t.Errorf("Expected error but got success")
}

if !testData.isError && !reflect.DeepEqual(testData.expected, metadata) {
t.Fatalf("Expected %#v but got %+#v", testData.expected, metadata)
}
})
}
}

Expand Down
Loading