Skip to content

Commit

Permalink
RabbitMQ scaler changes
Browse files Browse the repository at this point in the history
Signed-off-by: Youn Jae Kim <[email protected]>
  • Loading branch information
aagusuab committed Dec 6, 2024
1 parent 3212330 commit 03aecf1
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 875 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
cloud.google.com/go/secretmanager v1.14.2
cloud.google.com/go/storage v1.43.0
dario.cat/mergo v1.0.1
github.com/Azure/azure-amqp-common-go/v4 v4.2.0
github.com/Azure/azure-kusto-go v0.16.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0
Expand All @@ -20,7 +19,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0
github.com/Azure/go-autorest/autorest v0.11.29
github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.3
github.com/DataDog/datadog-api-client-go v1.16.0
github.com/Huawei/gophercloud v1.0.21
Expand Down Expand Up @@ -118,6 +116,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.5.0
)

require github.com/Azure/go-autorest/autorest/adal v0.9.23 // indirect

replace (
// pin k8s.io to v0.31.2 & sigs.k8s.io/controller-runtime to v0.19.1
github.com/google/cel-go => github.com/google/cel-go v0.20.1
Expand Down Expand Up @@ -204,7 +204,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down
5 changes: 3 additions & 2 deletions pkg/scalers/azure/azure_aad_workload_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ package azure
import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
"os"
"strconv"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
)

// Azure AD Workload Identity Webhook will inject the following environment variables.
Expand Down
33 changes: 15 additions & 18 deletions pkg/scalers/azure/azure_app_insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ func toISO8601(time string) (string, error) {
return fmt.Sprintf("PT%02dH%02dM", hours, minutes), nil
}

//func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) auth.AuthorizerConfig {
// switch podIdentity.Provider {
// case "", kedav1alpha1.PodIdentityProviderNone:
// config := auth.NewClientCredentialsConfig(info.ClientID, info.ClientPassword, info.TenantID)
// config.Resource = info.AppInsightsResourceURL
// config.AADEndpoint = info.ActiveDirectoryEndpoint
// return config
// case kedav1alpha1.PodIdentityProviderAzureWorkload:
// return NewAzureADWorkloadIdentityConfig(ctx, podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID(), podIdentity.GetIdentityAuthorityHost(), info.AppInsightsResourceURL)
// }
// return nil
//}

func extractAppInsightValue(info AppInsightsInfo, metric ApplicationInsightsMetric) (float64, error) {
if _, ok := metric.Value[info.MetricID]; !ok {
return -1, fmt.Errorf("metric named %s not found in app insights response", info.MetricID)
Expand Down Expand Up @@ -108,13 +95,23 @@ func queryParamsForAppInsightsRequest(info AppInsightsInfo) (map[string]interfac
return queryParams, nil
}

// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity, ignoreNullValues bool) (float64, error) {
func getAuthConfig(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity) (AADToken, error) {
token := AADToken{}
var err error

switch podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
token, err = GetAzureADWorkloadIdentityToken(ctx, info.ClientID, info.TenantID, info.ActiveDirectoryEndpoint, info.AppInsightsResourceURL)
case kedav1alpha1.PodIdentityProviderAzureWorkload:
token, err = GetAzureADWorkloadIdentityToken(ctx, podIdentity.GetIdentityID(), podIdentity.GetIdentityTenantID(), podIdentity.GetIdentityAuthorityHost(), info.AppInsightsResourceURL)
}

//config := getAuthConfig(ctx, info, podIdentity)
return token, err
}

token, err := GetAzureADWorkloadIdentityToken(ctx, info.ClientID, info.TenantID, "", info.AppInsightsResourceURL)
//MSAL get Token here instead of the config
// GetAzureAppInsightsMetricValue returns the value of an Azure App Insights metric, rounded to the nearest int
func GetAzureAppInsightsMetricValue(ctx context.Context, info AppInsightsInfo, podIdentity kedav1alpha1.AuthPodIdentity, ignoreNullValues bool) (float64, error) {
token, err := getAuthConfig(ctx, info, podIdentity)
if err != nil {
return -1, err
}
Expand Down
36 changes: 0 additions & 36 deletions pkg/scalers/azure/azure_app_insights_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package azure

import (
"context"
"testing"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

type testExtractAzAppInsightsTestData struct {
Expand Down Expand Up @@ -67,39 +64,6 @@ func TestAzGetAzureAppInsightsMetricValue(t *testing.T) {
}
}

type testAppInsightsAuthConfigTestData struct {
testName string
config string
info AppInsightsInfo
podIdentity kedav1alpha1.PodIdentityProvider
}

const (
msiConfig = "msiConfig"
clientCredentialsConfig = "clientCredentialsConfig"
workloadIdentityConfig = "workloadIdentityConfig"
)

var testAppInsightsAuthConfigData = []testAppInsightsAuthConfigTestData{
{"client credentials", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, ""},
{"client credentials - pod id none", clientCredentialsConfig, AppInsightsInfo{ClientID: "1234", ClientPassword: "pw", TenantID: "5678"}, kedav1alpha1.PodIdentityProviderNone},
{"azure workload identity", workloadIdentityConfig, AppInsightsInfo{}, kedav1alpha1.PodIdentityProviderAzureWorkload},
}

func TestAzAppInfoGetToken(t *testing.T) {
for _, testData := range testAppInsightsAuthConfigData {
authToken, err := GetAzureADWorkloadIdentityToken(context.TODO(), testData.info.ClientID, testData.info.TenantID, "", testData.info.AppInsightsResourceURL)

if err != nil {
t.Errorf("Test %v; Expected success but got error: %v", testData.testName, err)
}
if authToken.AccessToken == "" {
t.Errorf("Test %v; Expected token but got empty token: %v", testData.testName, authToken)
}
t.Logf("Test %v; data: %v, token: %v", testData.testName, testData.info, authToken)
}
}

type toISO8601TestData struct {
testName string
isError bool
Expand Down
29 changes: 23 additions & 6 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"strings"
"time"

"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
"github.com/go-logr/logr"
amqp "github.com/rabbitmq/amqp091-go"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ type rabbitMQScaler struct {
connection *amqp.Connection
channel *amqp.Channel
httpClient *http.Client
azureOAuth *azure.ADWorkloadIdentityTokenProvider
azureOAuth *confidential.Client
logger logr.Logger
}

Expand Down Expand Up @@ -409,15 +409,32 @@ func getJSON(ctx context.Context, s *rabbitMQScaler, url string) (queueInfo, err

if s.metadata.WorkloadIdentityResource != "" {
if s.azureOAuth == nil {
s.azureOAuth = azure.NewAzureADWorkloadIdentityTokenProvider(ctx, s.metadata.workloadIdentityClientID, s.metadata.workloadIdentityTenantID, s.metadata.workloadIdentityAuthorityHost, s.metadata.WorkloadIdentityResource)
cred, err := confidential.NewCredFromSecret(s.metadata.workloadIdentityClientID)
if err != nil {
return result, err
}

client, err := confidential.New(
fmt.Sprintf("%s%s/oauth2/token", s.metadata.workloadIdentityAuthorityHost, s.metadata.workloadIdentityTenantID),
s.metadata.workloadIdentityClientID,
cred,
)
if err != nil {
return result, err
}

s.azureOAuth = &client
}

err = s.azureOAuth.Refresh()
token, err := s.azureOAuth.AcquireTokenSilent(ctx, []string{s.metadata.WorkloadIdentityResource})
if err != nil {
return result, err
token, err = s.azureOAuth.AcquireTokenByCredential(ctx, []string{s.metadata.WorkloadIdentityResource})
if err != nil {
return result, err
}
}

request.Header.Set("Authorization", "Bearer "+s.azureOAuth.OAuthToken())
request.Header.Set("Authorization", "Bearer "+token.AccessToken)
}

r, err := s.httpClient.Do(request)
Expand Down
58 changes: 0 additions & 58 deletions vendor/github.com/Azure/azure-amqp-common-go/v4/auth/token.go

This file was deleted.

Loading

0 comments on commit 03aecf1

Please sign in to comment.