diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index 9ca9947fd0a..4d6782ea03a 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "time" "github.com/go-logr/logr" @@ -27,12 +26,12 @@ type liiklusScaler struct { } type liiklusMetadata struct { - lagThreshold int64 - activationLagThreshold int64 - address string - topic string - group string - groupVersion uint32 + LagThreshold int64 `keda:"name=lagThreshold,order=triggerMetadata,default=10"` + ActivationLagThreshold int64 `keda:"name=activationLagThreshold,order=triggerMetadata,default=0"` + Address string `keda:"name=address,order=triggerMetadata"` + Topic string `keda:"name=topic,order=triggerMetadata"` + Group string `keda:"name=group,order=triggerMetadata"` + GroupVersion uint32 `keda:"name=groupVersion,order=triggerMetadata,default=0"` triggerIndex int } @@ -70,7 +69,7 @@ func NewLiiklusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { return nil, err } - conn, err := grpc.NewClient(lm.address, + conn, err := grpc.NewClient(lm.Address, grpc.WithDefaultServiceConfig(grpcConfig), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -94,21 +93,21 @@ func (s *liiklusScaler) GetMetricsAndActivity(ctx context.Context, metricName st return nil, false, err } - if totalLag/uint64(s.metadata.lagThreshold) > uint64(len(lags)) { - totalLag = uint64(s.metadata.lagThreshold) * uint64(len(lags)) + if totalLag/uint64(s.metadata.LagThreshold) > uint64(len(lags)) { + totalLag = uint64(s.metadata.LagThreshold) * uint64(len(lags)) } metric := GenerateMetricInMili(metricName, float64(totalLag)) - return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.activationLagThreshold), nil + return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.ActivationLagThreshold), nil } func (s *liiklusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.topic))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.Topic))), }, - Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold), + Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold), } metricSpec := v2.MetricSpec{External: externalMetric, Type: liiklusMetricType} return []v2.MetricSpec{metricSpec} @@ -131,9 +130,9 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, ctx1, cancel1 := context.WithTimeout(ctx, 10*time.Second) defer cancel1() gor, err := s.client.GetOffsets(ctx1, &liiklus_service.GetOffsetsRequest{ - Topic: s.metadata.topic, - Group: s.metadata.group, - GroupVersion: s.metadata.groupVersion, + Topic: s.metadata.Topic, + Group: s.metadata.Group, + GroupVersion: s.metadata.GroupVersion, }) if err != nil { return 0, nil, err @@ -142,7 +141,7 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second) defer cancel2() geor, err := s.client.GetEndOffsets(ctx2, &liiklus_service.GetEndOffsetsRequest{ - Topic: s.metadata.topic, + Topic: s.metadata.Topic, }) if err != nil { return 0, nil, err @@ -159,50 +158,17 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, } func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, error) { - lagThreshold := defaultLiiklusLagThreshold - activationLagThreshold := defaultLiiklusActivationLagThreshold - - if val, ok := config.TriggerMetadata[liiklusLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", liiklusLagThresholdMetricName, err) - } - lagThreshold = t + meta := &liiklusMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing liiklus metadata: %w", err) } - - if val, ok := config.TriggerMetadata[liiklusActivationLagThresholdMetricName]; ok { - t, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing %s: %w", liiklusActivationLagThresholdMetricName, err) - } - activationLagThreshold = t - } - - groupVersion := uint32(0) - if val, ok := config.TriggerMetadata["groupVersion"]; ok { - t, err := strconv.ParseUint(val, 10, 32) - if err != nil { - return nil, fmt.Errorf("error parsing groupVersion: %w", err) - } - groupVersion = uint32(t) - } - switch { - case config.TriggerMetadata["topic"] == "": + case meta.Topic == "": return nil, ErrLiiklusNoTopic - case config.TriggerMetadata["address"] == "": + case meta.Address == "": return nil, ErrLiiklusNoAddress - case config.TriggerMetadata["group"] == "": + case meta.Group == "": return nil, ErrLiiklusNoGroup } - - return &liiklusMetadata{ - topic: config.TriggerMetadata["topic"], - address: config.TriggerMetadata["address"], - group: config.TriggerMetadata["group"], - groupVersion: groupVersion, - lagThreshold: lagThreshold, - activationLagThreshold: activationLagThreshold, - triggerIndex: config.TriggerIndex, - }, nil + return meta, nil } diff --git a/pkg/scalers/liiklus_scaler_test.go b/pkg/scalers/liiklus_scaler_test.go index 268157beb9f..80713727084 100644 --- a/pkg/scalers/liiklus_scaler_test.go +++ b/pkg/scalers/liiklus_scaler_test.go @@ -2,8 +2,7 @@ package scalers import ( "context" - "errors" - "strconv" + "fmt" "testing" "github.com/go-logr/logr" @@ -15,12 +14,10 @@ import ( ) type parseLiiklusMetadataTestData struct { - metadata map[string]string - err error - liiklusAddress string - group string - topic string - threshold int64 + name string + metadata map[string]string + ExpectedErr error + ExpectedMetatada *liiklusMetadata } type liiklusMetricIdentifier struct { @@ -30,12 +27,64 @@ type liiklusMetricIdentifier struct { } var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{ - {map[string]string{}, ErrLiiklusNoTopic, "", "", "", 0}, - {map[string]string{"topic": "foo"}, ErrLiiklusNoAddress, "", "", "", 0}, - {map[string]string{"topic": "foo", "address": "bar:6565"}, ErrLiiklusNoGroup, "", "", "", 0}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup"}, nil, "bar:6565", "mygroup", "foo", 10}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "activationLagThreshold": "aa"}, strconv.ErrSyntax, "bar:6565", "mygroup", "foo", 10}, - {map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "lagThreshold": "15"}, nil, "bar:6565", "mygroup", "foo", 15}, + { + name: "Empty metadata", + metadata: map[string]string{}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"address\" in [triggerMetadata]\n" + + "missing required parameter \"topic\" in [triggerMetadata]\n" + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Empty address", + metadata: map[string]string{"topic": "foo"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"address\" in [triggerMetadata]\n" + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Empty group", + metadata: map[string]string{"topic": "foo", "address": "using-mock"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + + "missing required parameter \"group\" in [triggerMetadata]"), + ExpectedMetatada: nil, + }, + { + name: "Valid", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"}, + ExpectedErr: nil, + ExpectedMetatada: &liiklusMetadata{ + LagThreshold: defaultLiiklusLagThreshold, + ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + Address: "using-mock", + Topic: "foo", + Group: "mygroup", + GroupVersion: 0, + triggerIndex: 0, + }, + }, + { + name: "Invalid activationLagThreshold", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "activationLagThreshold": "invalid"}, + ExpectedErr: fmt.Errorf("error parsing liiklus metadata: unable to set param \"activationLagThreshold\" value \"invalid\": unable to unmarshal to field type int64: invalid character 'i' looking for beginning of value"), + ExpectedMetatada: nil, + }, + { + name: "Custom lagThreshold", + metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "lagThreshold": "20"}, + ExpectedErr: nil, + ExpectedMetatada: &liiklusMetadata{ + LagThreshold: 20, + ActivationLagThreshold: defaultLiiklusActivationLagThreshold, + Address: "using-mock", + Topic: "foo", + Group: "mygroup", + GroupVersion: 0, + triggerIndex: 0, + }, + }, } var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ @@ -45,38 +94,44 @@ var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ func TestLiiklusParseMetadata(t *testing.T) { for _, testData := range parseLiiklusMetadataTestDataset { - meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) - if err != nil && testData.err == nil { - t.Error("Expected success but got error", err) - continue - } - if testData.err != nil && err == nil { - t.Error("Expected error but got success") - continue - } - if testData.err != nil && err != nil && !errors.Is(err, testData.err) { - t.Errorf("Expected error %v but got %v", testData.err, err) - continue - } - if err != nil { - continue - } - if testData.liiklusAddress != meta.address { - t.Errorf("Expected address %q but got %q\n", testData.liiklusAddress, meta.address) - continue - } - if meta.group != testData.group { - t.Errorf("Expected group %q but got %q\n", testData.group, meta.group) - continue - } - if meta.topic != testData.topic { - t.Errorf("Expected topic %q but got %q\n", testData.topic, meta.topic) - continue - } - if meta.lagThreshold != testData.threshold { - t.Errorf("Expected threshold %d but got %d\n", testData.threshold, meta.lagThreshold) - continue - } + t.Run(testData.name, func(t *testing.T) { + meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) + + // error cases + if testData.ExpectedErr != nil { + if err == nil { + t.Errorf("Expected error %v but got success", testData.ExpectedErr) + } else if err.Error() != testData.ExpectedErr.Error() { + t.Errorf("Expected error %v but got %v", testData.ExpectedErr, err) + } + return // Skip the rest of the checks for error cases + } + + // success cases + if err != nil { + t.Errorf("Expected success but got error %v", err) + } + if testData.ExpectedMetatada != nil { + if testData.ExpectedMetatada.Address != meta.Address { + t.Errorf("Expected address %q but got %q", testData.ExpectedMetatada.Address, meta.Address) + } + if meta.Group != testData.ExpectedMetatada.Group { + t.Errorf("Expected group %q but got %q", testData.ExpectedMetatada.Group, meta.Group) + } + if meta.Topic != testData.ExpectedMetatada.Topic { + t.Errorf("Expected topic %q but got %q", testData.ExpectedMetatada.Topic, meta.Topic) + } + if meta.LagThreshold != testData.ExpectedMetatada.LagThreshold { + t.Errorf("Expected threshold %d but got %d", testData.ExpectedMetatada.LagThreshold, meta.LagThreshold) + } + if meta.ActivationLagThreshold != testData.ExpectedMetatada.ActivationLagThreshold { + t.Errorf("Expected activation threshold %d but got %d", testData.ExpectedMetatada.ActivationLagThreshold, meta.ActivationLagThreshold) + } + if meta.GroupVersion != testData.ExpectedMetatada.GroupVersion { + t.Errorf("Expected group version %d but got %d", testData.ExpectedMetatada.GroupVersion, meta.GroupVersion) + } + } + }) } }