Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor liklus scaler #6433

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 22 additions & 70 deletions pkg/scalers/liiklus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scalers
import (
"context"
"fmt"
"strconv"
"time"

"github.com/go-logr/logr"
Expand All @@ -27,24 +26,17 @@ type liiklusScaler struct {
}

type liiklusMetadata struct {
lagThreshold int64
activationLagThreshold int64
address string
topic string
group string
groupVersion uint32
LagThreshold int64 `keda:"name=lagThreshold,order=triggerMetadata,default=10"`
ActivationLagThreshold int64 `keda:"name=activationLagThreshold,order=triggerMetadata,default=0"`
Address string `keda:"name=address,order=triggerMetadata"`
Topic string `keda:"name=topic,order=triggerMetadata"`
Group string `keda:"name=group,order=triggerMetadata"`
GroupVersion uint32 `keda:"name=groupVersion,order=triggerMetadata,default=0"`
triggerIndex int
}

const (
defaultLiiklusLagThreshold int64 = 10
defaultLiiklusActivationLagThreshold int64 = 0
)

const (
liiklusLagThresholdMetricName = "lagThreshold"
liiklusActivationLagThresholdMetricName = "activationLagThreshold"
liiklusMetricType = "External"
liiklusMetricType = "External"
)

var (
Expand All @@ -70,7 +62,7 @@ func NewLiiklusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, err
}

conn, err := grpc.NewClient(lm.address,
conn, err := grpc.NewClient(lm.Address,
grpc.WithDefaultServiceConfig(grpcConfig),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand All @@ -94,21 +86,21 @@ func (s *liiklusScaler) GetMetricsAndActivity(ctx context.Context, metricName st
return nil, false, err
}

if totalLag/uint64(s.metadata.lagThreshold) > uint64(len(lags)) {
totalLag = uint64(s.metadata.lagThreshold) * uint64(len(lags))
if totalLag/uint64(s.metadata.LagThreshold) > uint64(len(lags)) {
totalLag = uint64(s.metadata.LagThreshold) * uint64(len(lags))
}

metric := GenerateMetricInMili(metricName, float64(totalLag))

return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.activationLagThreshold), nil
return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.ActivationLagThreshold), nil
}

func (s *liiklusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.topic))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.Topic))),
},
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: liiklusMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -131,9 +123,9 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
ctx1, cancel1 := context.WithTimeout(ctx, 10*time.Second)
defer cancel1()
gor, err := s.client.GetOffsets(ctx1, &liiklus_service.GetOffsetsRequest{
Topic: s.metadata.topic,
Group: s.metadata.group,
GroupVersion: s.metadata.groupVersion,
Topic: s.metadata.Topic,
Group: s.metadata.Group,
GroupVersion: s.metadata.GroupVersion,
})
if err != nil {
return 0, nil, err
Expand All @@ -142,7 +134,7 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second)
defer cancel2()
geor, err := s.client.GetEndOffsets(ctx2, &liiklus_service.GetEndOffsetsRequest{
Topic: s.metadata.topic,
Topic: s.metadata.Topic,
})
if err != nil {
return 0, nil, err
Expand All @@ -159,50 +151,10 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64,
}

func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, error) {
lagThreshold := defaultLiiklusLagThreshold
activationLagThreshold := defaultLiiklusActivationLagThreshold

if val, ok := config.TriggerMetadata[liiklusLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", liiklusLagThresholdMetricName, err)
}
lagThreshold = t
}

if val, ok := config.TriggerMetadata[liiklusActivationLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", liiklusActivationLagThresholdMetricName, err)
}
activationLagThreshold = t
meta := &liiklusMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing liiklus metadata: %w", err)
}

groupVersion := uint32(0)
if val, ok := config.TriggerMetadata["groupVersion"]; ok {
t, err := strconv.ParseUint(val, 10, 32)
if err != nil {
return nil, fmt.Errorf("error parsing groupVersion: %w", err)
}
groupVersion = uint32(t)
}

switch {
case config.TriggerMetadata["topic"] == "":
return nil, ErrLiiklusNoTopic
case config.TriggerMetadata["address"] == "":
return nil, ErrLiiklusNoAddress
case config.TriggerMetadata["group"] == "":
return nil, ErrLiiklusNoGroup
}

return &liiklusMetadata{
topic: config.TriggerMetadata["topic"],
address: config.TriggerMetadata["address"],
group: config.TriggerMetadata["group"],
groupVersion: groupVersion,
lagThreshold: lagThreshold,
activationLagThreshold: activationLagThreshold,
triggerIndex: config.TriggerIndex,
}, nil
meta.triggerIndex = config.TriggerIndex
return meta, nil
wozniakjan marked this conversation as resolved.
Show resolved Hide resolved
}
169 changes: 112 additions & 57 deletions pkg/scalers/liiklus_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package scalers

import (
"context"
"errors"
"strconv"
"fmt"
"testing"

"github.com/go-logr/logr"
Expand All @@ -15,12 +14,10 @@ import (
)

type parseLiiklusMetadataTestData struct {
metadata map[string]string
err error
liiklusAddress string
group string
topic string
threshold int64
name string
metadata map[string]string
ExpectedErr error
ExpectedMetatada *liiklusMetadata
}

type liiklusMetricIdentifier struct {
Expand All @@ -30,12 +27,64 @@ type liiklusMetricIdentifier struct {
}

var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{
{map[string]string{}, ErrLiiklusNoTopic, "", "", "", 0},
{map[string]string{"topic": "foo"}, ErrLiiklusNoAddress, "", "", "", 0},
{map[string]string{"topic": "foo", "address": "bar:6565"}, ErrLiiklusNoGroup, "", "", "", 0},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup"}, nil, "bar:6565", "mygroup", "foo", 10},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "activationLagThreshold": "aa"}, strconv.ErrSyntax, "bar:6565", "mygroup", "foo", 10},
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "lagThreshold": "15"}, nil, "bar:6565", "mygroup", "foo", 15},
{
name: "Empty metadata",
metadata: map[string]string{},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"address\" in [triggerMetadata]\n" +
"missing required parameter \"topic\" in [triggerMetadata]\n" +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Empty address",
metadata: map[string]string{"topic": "foo"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"address\" in [triggerMetadata]\n" +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Empty group",
metadata: map[string]string{"topic": "foo", "address": "using-mock"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " +
"missing required parameter \"group\" in [triggerMetadata]"),
ExpectedMetatada: nil,
},
{
name: "Valid",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"},
ExpectedErr: nil,
ExpectedMetatada: &liiklusMetadata{
LagThreshold: 10,
ActivationLagThreshold: 0,
Address: "using-mock",
Topic: "foo",
Group: "mygroup",
GroupVersion: 0,
triggerIndex: 0,
},
},
{
name: "Invalid activationLagThreshold",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "activationLagThreshold": "invalid"},
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: unable to set param \"activationLagThreshold\" value \"invalid\": unable to unmarshal to field type int64: invalid character 'i' looking for beginning of value"),
ExpectedMetatada: nil,
},
{
name: "Custom lagThreshold",
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "lagThreshold": "20"},
ExpectedErr: nil,
ExpectedMetatada: &liiklusMetadata{
LagThreshold: 20,
ActivationLagThreshold: 0,
Address: "using-mock",
Topic: "foo",
Group: "mygroup",
GroupVersion: 0,
triggerIndex: 0,
},
},
}

var liiklusMetricIdentifiers = []liiklusMetricIdentifier{
Expand All @@ -45,38 +94,44 @@ var liiklusMetricIdentifiers = []liiklusMetricIdentifier{

func TestLiiklusParseMetadata(t *testing.T) {
for _, testData := range parseLiiklusMetadataTestDataset {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && testData.err == nil {
t.Error("Expected success but got error", err)
continue
}
if testData.err != nil && err == nil {
t.Error("Expected error but got success")
continue
}
if testData.err != nil && err != nil && !errors.Is(err, testData.err) {
t.Errorf("Expected error %v but got %v", testData.err, err)
continue
}
if err != nil {
continue
}
if testData.liiklusAddress != meta.address {
t.Errorf("Expected address %q but got %q\n", testData.liiklusAddress, meta.address)
continue
}
if meta.group != testData.group {
t.Errorf("Expected group %q but got %q\n", testData.group, meta.group)
continue
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %q but got %q\n", testData.topic, meta.topic)
continue
}
if meta.lagThreshold != testData.threshold {
t.Errorf("Expected threshold %d but got %d\n", testData.threshold, meta.lagThreshold)
continue
}
t.Run(testData.name, func(t *testing.T) {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata})

// error cases
if testData.ExpectedErr != nil {
if err == nil {
t.Errorf("Expected error %v but got success", testData.ExpectedErr)
} else if err.Error() != testData.ExpectedErr.Error() {
t.Errorf("Expected error %v but got %v", testData.ExpectedErr, err)
}
return // Skip the rest of the checks for error cases
}

// success cases
if err != nil {
t.Errorf("Expected success but got error %v", err)
}
if testData.ExpectedMetatada != nil {
if testData.ExpectedMetatada.Address != meta.Address {
t.Errorf("Expected address %q but got %q", testData.ExpectedMetatada.Address, meta.Address)
}
if meta.Group != testData.ExpectedMetatada.Group {
t.Errorf("Expected group %q but got %q", testData.ExpectedMetatada.Group, meta.Group)
}
if meta.Topic != testData.ExpectedMetatada.Topic {
t.Errorf("Expected topic %q but got %q", testData.ExpectedMetatada.Topic, meta.Topic)
}
if meta.LagThreshold != testData.ExpectedMetatada.LagThreshold {
t.Errorf("Expected threshold %d but got %d", testData.ExpectedMetatada.LagThreshold, meta.LagThreshold)
}
if meta.ActivationLagThreshold != testData.ExpectedMetatada.ActivationLagThreshold {
t.Errorf("Expected activation threshold %d but got %d", testData.ExpectedMetatada.ActivationLagThreshold, meta.ActivationLagThreshold)
}
if meta.GroupVersion != testData.ExpectedMetatada.GroupVersion {
t.Errorf("Expected group version %d but got %d", testData.ExpectedMetatada.GroupVersion, meta.GroupVersion)
}
}
})
}
}

Expand Down Expand Up @@ -172,16 +227,16 @@ func TestLiiklusScalerGetMetricsBehavior(t *testing.T) {

func TestLiiklusGetMetricSpecForScaling(t *testing.T) {
for _, testData := range liiklusMetricIdentifiers {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()}

metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
t.Run(testData.name, func(t *testing.T) {
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()}
metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background())
if metricSpec[0].External.Metric.Name != testData.name {
t.Errorf("Wrong External metric source name: %s", metricSpec[0].External.Metric.Name)
}
})
}
}
Loading