From c43af597fc94c8ffa8f444557d2647fc2eb1c318 Mon Sep 17 00:00:00 2001 From: Viet Nguyen Duc Date: Wed, 4 Dec 2024 08:11:43 +0000 Subject: [PATCH 1/4] fix: Selenium Grid scaler exposes sum of pending and ongoing sessions to KDEA (#6368) --- CHANGELOG.md | 1 + pkg/scalers/selenium_grid_scaler.go | 95 +++++++------ pkg/scalers/selenium_grid_scaler_test.go | 168 +++++++++++++---------- 3 files changed, 140 insertions(+), 124 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52538a16f71..70fabdf5152 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ Here is an overview of all new **experimental** features: - **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970)) - **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321)) - **General**: ScaledJobs ready status set to true when recoverred problem ([#6329](https://github.com/kedacore/keda/pull/6329)) +- **Selenium Grid Scaler**: Exposes sum of pending and ongoing sessions to KDEA ([#6368](https://github.com/kedacore/keda/pull/6368)) ### Deprecations diff --git a/pkg/scalers/selenium_grid_scaler.go b/pkg/scalers/selenium_grid_scaler.go index 057181c87e7..bb18d7c9762 100644 --- a/pkg/scalers/selenium_grid_scaler.go +++ b/pkg/scalers/selenium_grid_scaler.go @@ -36,10 +36,10 @@ type seleniumGridScalerMetadata struct { BrowserName string `keda:"name=browserName, order=triggerMetadata"` SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"` ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"` - BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, optional, default=latest"` - UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"` - PlatformName string `keda:"name=platformName, order=triggerMetadata, 
optional, default=linux"` - NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, optional, default=1"` + BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, default=latest"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` + PlatformName string `keda:"name=platformName, order=triggerMetadata, default=linux"` + NodeMaxSessions int64 `keda:"name=nodeMaxSessions, order=triggerMetadata, default=1"` TargetValue int64 } @@ -55,9 +55,9 @@ type Data struct { } type Grid struct { - SessionCount int `json:"sessionCount"` - MaxSession int `json:"maxSession"` - TotalSlots int `json:"totalSlots"` + SessionCount int64 `json:"sessionCount"` + MaxSession int64 `json:"maxSession"` + TotalSlots int64 `json:"totalSlots"` } type NodesInfo struct { @@ -71,17 +71,17 @@ type SessionsInfo struct { type Nodes []struct { ID string `json:"id"` Status string `json:"status"` - SessionCount int `json:"sessionCount"` - MaxSession int `json:"maxSession"` - SlotCount int `json:"slotCount"` + SessionCount int64 `json:"sessionCount"` + MaxSession int64 `json:"maxSession"` + SlotCount int64 `json:"slotCount"` Stereotypes string `json:"stereotypes"` Sessions Sessions `json:"sessions"` } type ReservedNodes struct { ID string `json:"id"` - MaxSession int `json:"maxSession"` - SlotCount int `json:"slotCount"` + MaxSession int64 `json:"maxSession"` + SlotCount int64 `json:"slotCount"` } type Sessions []struct { @@ -102,7 +102,7 @@ type Capability struct { } type Stereotypes []struct { - Slots int `json:"slots"` + Slots int64 `json:"slots"` Stereotype Capability `json:"stereotype"` } @@ -148,6 +148,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen if meta.SessionBrowserName == "" { meta.SessionBrowserName = meta.BrowserName } + return meta, nil } @@ -160,18 +161,18 @@ func (s *seleniumGridScaler) Close(context.Context) error { } func (s *seleniumGridScaler) GetMetricsAndActivity(ctx context.Context, 
metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - sessions, err := s.getSessionsCount(ctx, s.logger) + newRequestNodes, onGoingSessions, err := s.getSessionsQueueLength(ctx, s.logger) if err != nil { return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error requesting selenium grid endpoint: %w", err) } - metric := GenerateMetricInMili(metricName, float64(sessions)) + metric := GenerateMetricInMili(metricName, float64(newRequestNodes+onGoingSessions)) - return []external_metrics.ExternalMetricValue{metric}, sessions > s.metadata.ActivationThreshold, nil + return []external_metrics.ExternalMetricValue{metric}, (newRequestNodes + onGoingSessions) > s.metadata.ActivationThreshold, nil } func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { - metricName := kedautil.NormalizeString(fmt.Sprintf("seleniumgrid-%s", s.metadata.BrowserName)) + metricName := kedautil.NormalizeString(fmt.Sprintf("selenium-grid-%s-%s-%s", s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.PlatformName)) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), @@ -184,18 +185,18 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri return []v2.MetricSpec{metricSpec} } -func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) { +func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger logr.Logger) (int64, int64, error) { body, err := json.Marshal(map[string]string{ "query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }", }) if err != nil { - return -1, err + return -1, -1, err } req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.URL, 
bytes.NewBuffer(body)) if err != nil { - return -1, err + return -1, -1, err } if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" { @@ -206,28 +207,28 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L res, err := s.httpClient.Do(req) if err != nil { - return -1, err + return -1, -1, err } if res.StatusCode != http.StatusOK { msg := fmt.Sprintf("selenium grid returned %d", res.StatusCode) - return -1, errors.New(msg) + return -1, -1, errors.New(msg) } defer res.Body.Close() b, err := io.ReadAll(res.Body) if err != nil { - return -1, err + return -1, -1, err } - v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger) + newRequestNodes, onGoingSession, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger) if err != nil { - return -1, err + return -1, -1, err } - return v, nil + return newRequestNodes, onGoingSession, nil } -func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int { - var matchingSlots int +func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int64 { + var matchingSlots int64 for _, stereotype := range stereotypes { if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) { matchingSlots += stereotype.Slots @@ -236,8 +237,8 @@ func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, return matchingSlots } -func countMatchingSessions(sessions Sessions, request 
Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int { - var matchingSessions int +func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int64 { + var matchingSessions int64 for _, session := range sessions { var capability = Capability{} if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil { @@ -274,7 +275,7 @@ func checkCapabilitiesMatch(capability Capability, requestCapability Capability, return browserNameMatches && browserVersionMatches && platformNameMatches } -func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int { +func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int64) int64 { for _, reservedNode := range reservedNodes { if strings.EqualFold(reservedNode.ID, nodeID) { return reservedNode.SlotCount @@ -283,7 +284,7 @@ func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availa return availableSlots } -func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes { +func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int64, maxSession int64) []ReservedNodes { for i, reservedNode := range reservedNodes { if strings.EqualFold(reservedNode.ID, nodeID) { // Update remaining available slots for the reserved node @@ -295,17 +296,15 @@ func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotC return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession}) } -func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) { - // The returned count of the number of new 
Nodes will be scaled up - var count int64 +func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int64, logger logr.Logger) (int64, int64, error) { // Track number of available slots of existing Nodes in the Grid can be reserved for the matched requests - var availableSlots int + var availableSlots int64 // Track number of matched requests in the sessions queue will be served by this scaler - var queueSlots int + var queueSlots int64 var seleniumResponse = SeleniumResponse{} if err := json.Unmarshal(b, &seleniumResponse); err != nil { - return 0, err + return 0, 0, err } var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests @@ -314,6 +313,7 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s var reservedNodes []ReservedNodes // Track list of new Nodes will be scaled up with number of available slots following scaler parameter `nodeMaxSessions` var newRequestNodes []ReservedNodes + var onGoingSessions int64 for requestIndex, sessionQueueRequest := range sessionQueueRequests { var isRequestMatched bool var requestCapability = Capability{} @@ -332,20 +332,22 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s } var isRequestReserved bool + var sumOfCurrentSessionsMatch int64 // Check if the matched request can be assigned to available slots of existing Nodes in the Grid for _, node := range nodes { + // Count ongoing sessions that match the request capability and scaler metadata + var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger) + sumOfCurrentSessionsMatch += currentSessionsMatch // Check if node is UP and has available slots (maxSession > sessionCount) if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 
0 { var stereotypes = Stereotypes{} - var availableSlotsMatch int + var availableSlotsMatch int64 if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil { // Count available slots that match the request capability and scaler metadata availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName) } else { logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err)) } - // Count ongoing sessions that match the request capability and scaler metadata - var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger) // Count remaining available slots can be reserved for this request var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) // Reserve one available slot for the request if available slots match is greater than current sessions match @@ -357,6 +359,9 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s } } } + if sumOfCurrentSessionsMatch > onGoingSessions { + onGoingSessions = sumOfCurrentSessionsMatch + } // Check if the matched request can be assigned to available slots of new Nodes will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1 if !isRequestReserved { for _, newRequestNode := range newRequestNodes { @@ -373,11 +378,5 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s } } - if queueSlots > availableSlots { - count = int64(len(newRequestNodes)) - } else { - count = 0 - } - - return count, nil + return int64(len(newRequestNodes)), onGoingSessions, nil } diff --git a/pkg/scalers/selenium_grid_scaler_test.go b/pkg/scalers/selenium_grid_scaler_test.go index b92936cbe7f..6613be242ca 100644 --- a/pkg/scalers/selenium_grid_scaler_test.go +++ b/pkg/scalers/selenium_grid_scaler_test.go @@ 
-16,13 +16,14 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { sessionBrowserName string browserVersion string platformName string - nodeMaxSessions int + nodeMaxSessions int64 } tests := []struct { - name string - args args - want int64 - wantErr bool + name string + args args + wantNewRequestNodes int64 + wantOnGoingSessions int64 + wantErr bool }{ { name: "nil response body should throw error", @@ -61,8 +62,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { `), browserName: "", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "12 sessionQueueRequests with 4 requests matching browserName chrome should return count as 4", @@ -101,8 +102,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 4, - wantErr: false, + wantNewRequestNodes: 4, + wantErr: false, }, { name: "2 sessionQueueRequests and 1 available nodeStereotypes with matching browserName firefox should return count as 1", @@ -276,8 +277,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 4, + wantErr: false, }, { name: "1 sessionQueueRequests and 1 available nodeStereotypes with matching browserName chrome should return count as 0", @@ -325,8 +327,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "1 sessionQueueRequests Linux and 1 available nodeStereotypes Windows with matching browserName chrome should return count as 1", @@ -374,8 +376,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantErr: false, }, { name: "scaler browserVersion is latest, 2 sessionQueueRequests wihtout 
browserVersion, 2 available nodeStereotypes with different versions and platforms, should return count as 1", @@ -422,8 +424,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantErr: false, }, { name: "scaler browserVersion is latest, 5 sessionQueueRequests wihtout browserVersion also 1 different platformName, 1 available nodeStereotypes with 3 slots Linux and 1 node Windows, should return count as 1", @@ -473,8 +475,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantErr: false, }, { name: "queue request with browserName browserVersion and browserVersion but no available nodes should return count as 1", @@ -516,8 +518,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantErr: false, }, { name: "1 queue request with browserName browserVersion and browserVersion but 2 nodes without available slots should return count as 1", @@ -573,8 +575,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "2 session queue with matching browsername and browserversion of 2 available slots should return count as 0", @@ -621,8 +624,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "2 queue requests with browserName browserVersion and platformName matching 2 available slots on 2 different nodes should return count as 0", @@ -679,8 +682,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", 
platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "1 queue request with browserName browserVersion and platformName matching 1 available slot on node has 3 max sessions should return count as 0", @@ -726,8 +730,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "3 queue requests with browserName browserVersion and platformName but 2 running nodes are busy should return count as 3", @@ -785,8 +790,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 3, - wantErr: false, + wantNewRequestNodes: 3, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "3 queue requests with browserName browserVersion and platformName but 2 running nodes are busy with different versions should return count as 3", @@ -844,8 +850,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 3, - wantErr: false, + wantNewRequestNodes: 3, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "3 queue requests with browserName and platformName but 2 running nodes are busy with different versions should return count as 3", @@ -903,8 +910,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 3, - wantErr: false, + wantNewRequestNodes: 3, + wantOnGoingSessions: 2, + wantErr: false, }, { name: "1 active session with matching browsername and version should return count as 2", @@ -947,8 +955,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantOnGoingSessions: 1, + wantErr: false, }, { name: "1 request without browserName and browserVersion 
stable can be match any available node should return count as 0", @@ -985,8 +994,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "1 request without browserName and browserVersion stable should return count as 1", @@ -1028,8 +1037,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 1, + wantErr: false, }, { name: "2 queue requests with browserName in string match node stereotype and scaler metadata browserVersion should return count as 1", @@ -1072,8 +1082,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "dev", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 1, + wantErr: false, }, { name: "2 queue requests with matching browsername/sessionBrowserName but 1 node is busy should return count as 2", @@ -1116,8 +1127,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantOnGoingSessions: 1, + wantErr: false, }, { name: "2 queue requests with matching browsername/sessionBrowserName and 1 node is is available should return count as 1", @@ -1155,8 +1167,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "linux", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantErr: false, }, { name: "2 queue requests with platformName and without platformName and node with 1 slot available should return count as 1", args: args{ @@ -1198,8 +1210,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "Windows 11", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 1, + wantErr: 
false, }, { name: "1 active msedge session while asking for 2 chrome sessions should return a count of 2", @@ -1242,8 +1255,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantErr: false, }, { name: "3 queue requests browserName chrome platformName linux but 1 node has maxSessions=3 with browserName msedge should return a count of 3", @@ -1287,8 +1300,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 3, - wantErr: false, + wantNewRequestNodes: 3, + wantErr: false, }, { name: "session request with matching browsername and no specific platformName should return count as 2", @@ -1316,8 +1329,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "", }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantErr: false, }, { name: "2 queue requests with 1 matching browsername and platformName and 1 existing slot is available should return count as 0", @@ -1355,8 +1368,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "Windows 11", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "2 queue requests with 1 request matching browserName and platformName but 1 existing node is busy should return count as 1", @@ -1403,8 +1416,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0", platformName: "Windows 11", }, - want: 1, - wantErr: false, + wantNewRequestNodes: 1, + wantOnGoingSessions: 1, + wantErr: false, }, { name: "5 queue requests with scaler parameter nodeMaxSessions is 2 should return count as 3", @@ -1437,8 +1451,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { platformName: "linux", nodeMaxSessions: 2, }, - want: 3, - wantErr: false, + wantNewRequestNodes: 3, + wantErr: false, }, { name: "5 queue requests 
with scaler parameter nodeMaxSessions is 3 should return count as 2", @@ -1471,8 +1485,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { platformName: "linux", nodeMaxSessions: 3, }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantErr: false, }, { name: "5 queue requests with request matching browserName and platformName and scaler param nodeMaxSessions is 3 and existing node with 1 available slot should return count as 2", @@ -1523,8 +1537,9 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { platformName: "linux", nodeMaxSessions: 3, }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantOnGoingSessions: 2, + wantErr: false, }, // Tests from PR: https://github.com/kedacore/keda/pull/6055 { @@ -1563,8 +1578,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 0, - wantErr: false, + wantNewRequestNodes: 0, + wantErr: false, }, { name: "4 sessions requests with matching browsername and platformName when setSessionsFromHub turned on and node with 2 slots matches should return count as 2", @@ -1605,8 +1620,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "latest", platformName: "linux", }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantErr: false, }, { name: "4 sessions requests with matching browsername and platformName when setSessionsFromHub turned on, no nodes and sessionsPerNode=2 matches should return count as 2", @@ -1637,8 +1652,8 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { platformName: "linux", nodeMaxSessions: 2, }, - want: 2, - wantErr: false, + wantNewRequestNodes: 2, + wantErr: false, }, { name: "sessions requests and active sessions with 1 matching browsername, platformName and sessionBrowserVersion should return count as 1", @@ -1687,19 +1702,20 @@ func Test_getCountFromSeleniumResponse(t *testing.T) { browserVersion: "91.0.4472.114", platformName: "linux", }, - want: 1, - wantErr: false, + 
wantNewRequestNodes: 1, + wantOnGoingSessions: 2, + wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := getCountFromSeleniumResponse(tt.args.b, tt.args.browserName, tt.args.browserVersion, tt.args.sessionBrowserName, tt.args.platformName, tt.args.nodeMaxSessions, logr.Discard()) + newRequestNodes, onGoingSessions, err := getCountFromSeleniumResponse(tt.args.b, tt.args.browserName, tt.args.browserVersion, tt.args.sessionBrowserName, tt.args.platformName, tt.args.nodeMaxSessions, logr.Discard()) if (err != nil) != tt.wantErr { t.Errorf("getCountFromSeleniumResponse() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("getCountFromSeleniumResponse() = %v, want %v", got, tt.want) + if !reflect.DeepEqual(newRequestNodes, tt.wantNewRequestNodes) || !reflect.DeepEqual(onGoingSessions, tt.wantOnGoingSessions) { + t.Errorf("getCountFromSeleniumResponse() = [%v, %v], want [%v, %v]", newRequestNodes, onGoingSessions, tt.wantNewRequestNodes, tt.wantOnGoingSessions) } }) } From 1eaa34c724b1d1c85d28971c7bfc76b7d6bd5591 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maksymilian=20Bogu=C5=84?= Date: Wed, 4 Dec 2024 09:44:22 +0100 Subject: [PATCH 2/4] Add AWS region to the AWS Config Cache key (#6134) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Introduce aws region into the AWS config cache Signed-off-by: Maksymilian Boguń * add CHANGELOG entry Signed-off-by: Maksymilian Boguń * embedded AWS region into Authorization metadata Signed-off-by: Maksymilian Boguń * move the fix to Unreleased version Signed-off-by: Maksymilian Boguń * Fix indentation Signed-off-by: Maksymilian Boguń --------- Signed-off-by: Maksymilian Boguń Signed-off-by: Jan Wozniak Co-authored-by: Jorge Turrado Ferrero Co-authored-by: Jan Wozniak --- CHANGELOG.md | 1 + pkg/scalers/apache_kafka_scaler.go | 2 +- pkg/scalers/aws/aws_authorization.go | 2 + pkg/scalers/aws/aws_common.go 
| 31 ++++---- pkg/scalers/aws/aws_config_cache.go | 10 +-- pkg/scalers/aws/aws_config_cache_test.go | 77 +++++++++++-------- pkg/scalers/aws/aws_sigv4.go | 18 ++--- pkg/scalers/aws_cloudwatch_scaler.go | 2 +- pkg/scalers/aws_dynamodb_scaler.go | 2 +- pkg/scalers/aws_dynamodb_streams_scaler.go | 2 +- pkg/scalers/aws_kinesis_stream_scaler.go | 2 +- pkg/scalers/aws_sqs_queue_scaler.go | 2 +- pkg/scalers/kafka_scaler.go | 2 +- .../resolver/aws_secretmanager_handler.go | 3 +- 14 files changed, 79 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70fabdf5152..266aea0115b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ Here is an overview of all new **experimental** features: - **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970)) - **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321)) - **General**: ScaledJobs ready status set to true when recoverred problem ([#6329](https://github.com/kedacore/keda/pull/6329)) +- **AWS Scalers**: Add AWS region to the AWS Config Cache key ([#6128](https://github.com/kedacore/keda/issues/6128)) - **Selenium Grid Scaler**: Exposes sum of pending and ongoing sessions to KDEA ([#6368](https://github.com/kedacore/keda/pull/6368)) ### Deprecations diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index 8704531e28a..95ca5faf155 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -220,7 +220,7 @@ func getApacheKafkaClient(ctx context.Context, metadata apacheKafkaMetadata, log case KafkaSASLTypeOAuthbearer: return nil, errors.New("SASL/OAUTHBEARER is not implemented yet") case KafkaSASLTypeMskIam: - cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSRegion, metadata.AWSAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.AWSAuthorization) 
if err != nil { return nil, err } diff --git a/pkg/scalers/aws/aws_authorization.go b/pkg/scalers/aws/aws_authorization.go index fd49c2f8995..8fed034553c 100644 --- a/pkg/scalers/aws/aws_authorization.go +++ b/pkg/scalers/aws/aws_authorization.go @@ -23,6 +23,8 @@ type AuthorizationMetadata struct { AwsSecretAccessKey string AwsSessionToken string + AwsRegion string + // Deprecated PodIdentityOwner bool // Pod identity owner is confusing and it'll be removed when we get diff --git a/pkg/scalers/aws/aws_common.go b/pkg/scalers/aws/aws_common.go index ed98e2b219d..30f25e98a4a 100644 --- a/pkg/scalers/aws/aws_common.go +++ b/pkg/scalers/aws/aws_common.go @@ -39,43 +39,33 @@ import ( // ErrAwsNoAccessKey is returned when awsAccessKeyID is missing. var ErrAwsNoAccessKey = errors.New("awsAccessKeyID not found") -type awsConfigMetadata struct { - awsRegion string - awsAuthorization AuthorizationMetadata -} - var awsSharedCredentialsCache = newSharedConfigsCache() // GetAwsConfig returns an *aws.Config for a given AuthorizationMetadata // If AuthorizationMetadata uses static credentials or `aws` auth, // we recover the *aws.Config from the shared cache. 
If not, we generate // a new entry on each request -func GetAwsConfig(ctx context.Context, awsRegion string, awsAuthorization AuthorizationMetadata) (*aws.Config, error) { - metadata := &awsConfigMetadata{ - awsRegion: awsRegion, - awsAuthorization: awsAuthorization, - } - - if metadata.awsAuthorization.UsingPodIdentity || - (metadata.awsAuthorization.AwsAccessKeyID != "" && metadata.awsAuthorization.AwsSecretAccessKey != "") { - return awsSharedCredentialsCache.GetCredentials(ctx, metadata.awsRegion, metadata.awsAuthorization) +func GetAwsConfig(ctx context.Context, awsAuthorization AuthorizationMetadata) (*aws.Config, error) { + if awsAuthorization.UsingPodIdentity || + (awsAuthorization.AwsAccessKeyID != "" && awsAuthorization.AwsSecretAccessKey != "") { + return awsSharedCredentialsCache.GetCredentials(ctx, awsAuthorization) } // TODO, remove when aws-eks are removed configOptions := make([]func(*config.LoadOptions) error, 0) - configOptions = append(configOptions, config.WithRegion(metadata.awsRegion)) + configOptions = append(configOptions, config.WithRegion(awsAuthorization.AwsRegion)) cfg, err := config.LoadDefaultConfig(ctx, configOptions...) 
if err != nil { return nil, err } - if !metadata.awsAuthorization.PodIdentityOwner { + if !awsAuthorization.PodIdentityOwner { return &cfg, nil } - if metadata.awsAuthorization.AwsRoleArn != "" { + if awsAuthorization.AwsRoleArn != "" { stsSvc := sts.NewFromConfig(cfg) - stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, metadata.awsAuthorization.AwsRoleArn, func(_ *stscreds.AssumeRoleOptions) {}) + stsCredentialProvider := stscreds.NewAssumeRoleProvider(stsSvc, awsAuthorization.AwsRoleArn, func(_ *stscreds.AssumeRoleOptions) {}) cfg.Credentials = aws.NewCredentialsCache(stsCredentialProvider) } return &cfg, err @@ -88,6 +78,10 @@ func GetAwsAuthorization(uniqueKey string, podIdentity kedav1alpha1.AuthPodIdent TriggerUniqueKey: uniqueKey, } + if val, ok := authParams["awsRegion"]; ok && val != "" { + meta.AwsRegion = val + } + if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAws { meta.UsingPodIdentity = true if val, ok := authParams["awsRoleArn"]; ok && val != "" { @@ -95,6 +89,7 @@ func GetAwsAuthorization(uniqueKey string, podIdentity kedav1alpha1.AuthPodIdent } return meta, nil } + // TODO, remove all the logic below and just keep the logic for // parsing awsAccessKeyID, awsSecretAccessKey and awsSessionToken // when aws-eks are removed diff --git a/pkg/scalers/aws/aws_config_cache.go b/pkg/scalers/aws/aws_config_cache.go index 684e45c743b..b43056b423b 100644 --- a/pkg/scalers/aws/aws_config_cache.go +++ b/pkg/scalers/aws/aws_config_cache.go @@ -69,11 +69,11 @@ func newSharedConfigsCache() sharedConfigCache { // getCacheKey returns a unique key based on given AuthorizationMetadata. 
// As it can contain sensitive data, the key is hashed to not expose secrets func (a *sharedConfigCache) getCacheKey(awsAuthorization AuthorizationMetadata) string { - key := "keda" + key := "keda-" + awsAuthorization.AwsRegion if awsAuthorization.AwsAccessKeyID != "" { - key = fmt.Sprintf("%s-%s-%s", awsAuthorization.AwsAccessKeyID, awsAuthorization.AwsSecretAccessKey, awsAuthorization.AwsSessionToken) + key = fmt.Sprintf("%s-%s-%s-%s", awsAuthorization.AwsAccessKeyID, awsAuthorization.AwsSecretAccessKey, awsAuthorization.AwsSessionToken, awsAuthorization.AwsRegion) } else if awsAuthorization.AwsRoleArn != "" { - key = awsAuthorization.AwsRoleArn + key = fmt.Sprintf("%s-%s", awsAuthorization.AwsRoleArn, awsAuthorization.AwsRegion) } // to avoid sensitive data as key and to use a constant key size, // we hash the key with sha3 @@ -86,7 +86,7 @@ func (a *sharedConfigCache) getCacheKey(awsAuthorization AuthorizationMetadata) // sharing it between all the requests. To track if the *aws.Config is used by whom, // every time when an scaler requests *aws.Config we register it inside // the cached item. -func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsRegion string, awsAuthorization AuthorizationMetadata) (*aws.Config, error) { +func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsAuthorization AuthorizationMetadata) (*aws.Config, error) { a.Lock() defer a.Unlock() key := a.getCacheKey(awsAuthorization) @@ -97,7 +97,7 @@ func (a *sharedConfigCache) GetCredentials(ctx context.Context, awsRegion string } configOptions := make([]func(*config.LoadOptions) error, 0) - configOptions = append(configOptions, config.WithRegion(awsRegion)) + configOptions = append(configOptions, config.WithRegion(awsAuthorization.AwsRegion)) cfg, err := config.LoadDefaultConfig(ctx, configOptions...) 
if err != nil { return nil, err diff --git a/pkg/scalers/aws/aws_config_cache_test.go b/pkg/scalers/aws/aws_config_cache_test.go index d94247a6fee..81abbdca6c7 100644 --- a/pkg/scalers/aws/aws_config_cache_test.go +++ b/pkg/scalers/aws/aws_config_cache_test.go @@ -28,84 +28,95 @@ import ( func TestGetCredentialsReturnNewItemAndStoreItIfNotExist(t *testing.T) { cache := newSharedConfigsCache() cache.logger = logr.Discard() - config := awsConfigMetadata{ - awsRegion: "test-region", - awsAuthorization: AuthorizationMetadata{ - TriggerUniqueKey: "test-key", - }, + awsAuthorization := AuthorizationMetadata{ + TriggerUniqueKey: "test-key", + AwsRegion: "test-region", } - cacheKey := cache.getCacheKey(config.awsAuthorization) - _, err := cache.GetCredentials(context.Background(), config.awsRegion, config.awsAuthorization) + cacheKey := cache.getCacheKey(awsAuthorization) + _, err := cache.GetCredentials(context.Background(), awsAuthorization) assert.NoError(t, err) assert.Contains(t, cache.items, cacheKey) - assert.Contains(t, cache.items[cacheKey].usages, config.awsAuthorization.TriggerUniqueKey) + assert.Contains(t, cache.items[cacheKey].usages, awsAuthorization.TriggerUniqueKey) } func TestGetCredentialsReturnCachedItemIfExist(t *testing.T) { cache := newSharedConfigsCache() cache.logger = logr.Discard() - config := awsConfigMetadata{ - awsRegion: "test1-region", - awsAuthorization: AuthorizationMetadata{ - TriggerUniqueKey: "test1-key", - }, + awsAuthorization := AuthorizationMetadata{ + TriggerUniqueKey: "test1-key", + AwsRegion: "test1-region", } cfg := aws.Config{} cfg.AppID = "test1-app" - cacheKey := cache.getCacheKey(config.awsAuthorization) + cacheKey := cache.getCacheKey(awsAuthorization) cache.items[cacheKey] = cacheEntry{ config: &cfg, usages: map[string]bool{ "other-usage": true, }, } - configFromCache, err := cache.GetCredentials(context.Background(), config.awsRegion, config.awsAuthorization) + configFromCache, err := 
cache.GetCredentials(context.Background(), awsAuthorization) assert.NoError(t, err) assert.Equal(t, &cfg, configFromCache) - assert.Contains(t, cache.items[cacheKey].usages, config.awsAuthorization.TriggerUniqueKey) + assert.Contains(t, cache.items[cacheKey].usages, awsAuthorization.TriggerUniqueKey) } func TestRemoveCachedEntryRemovesCachedItemIfNotUsages(t *testing.T) { cache := newSharedConfigsCache() cache.logger = logr.Discard() - config := awsConfigMetadata{ - awsRegion: "test2-region", - awsAuthorization: AuthorizationMetadata{ - TriggerUniqueKey: "test2-key", - }, + awsAuthorization := AuthorizationMetadata{ + TriggerUniqueKey: "test2-key", + AwsRegion: "test2-region", } cfg := aws.Config{} cfg.AppID = "test2-app" - cacheKey := cache.getCacheKey(config.awsAuthorization) + cacheKey := cache.getCacheKey(awsAuthorization) cache.items[cacheKey] = cacheEntry{ config: &cfg, usages: map[string]bool{ - config.awsAuthorization.TriggerUniqueKey: true, + awsAuthorization.TriggerUniqueKey: true, }, } - cache.RemoveCachedEntry(config.awsAuthorization) + cache.RemoveCachedEntry(awsAuthorization) assert.NotContains(t, cache.items, cacheKey) } func TestRemoveCachedEntryNotRemoveCachedItemIfUsages(t *testing.T) { cache := newSharedConfigsCache() cache.logger = logr.Discard() - config := awsConfigMetadata{ - awsRegion: "test3-region", - awsAuthorization: AuthorizationMetadata{ - TriggerUniqueKey: "test3-key", - }, + awsAuthorization := AuthorizationMetadata{ + TriggerUniqueKey: "test3-key", + AwsRegion: "test3-region", } cfg := aws.Config{} cfg.AppID = "test3-app" - cacheKey := cache.getCacheKey(config.awsAuthorization) + cacheKey := cache.getCacheKey(awsAuthorization) cache.items[cacheKey] = cacheEntry{ config: &cfg, usages: map[string]bool{ - config.awsAuthorization.TriggerUniqueKey: true, - "other-usage": true, + awsAuthorization.TriggerUniqueKey: true, + "other-usage": true, }, } - cache.RemoveCachedEntry(config.awsAuthorization) + 
cache.RemoveCachedEntry(awsAuthorization) assert.Contains(t, cache.items, cacheKey) } + +func TestCredentialsShouldBeCachedPerRegion(t *testing.T) { + cache := newSharedConfigsCache() + cache.logger = logr.Discard() + awsAuthorization1 := AuthorizationMetadata{ + TriggerUniqueKey: "test4-key", + AwsRegion: "test4-region1", + } + awsAuthorization2 := AuthorizationMetadata{ + TriggerUniqueKey: "test4-key", + AwsRegion: "test4-region2", + } + cred1, err1 := cache.GetCredentials(context.Background(), awsAuthorization1) + cred2, err2 := cache.GetCredentials(context.Background(), awsAuthorization2) + + assert.NoError(t, err1) + assert.NoError(t, err2) + assert.NotEqual(t, cred1, cred2, "Credentials should be stored per region") +} diff --git a/pkg/scalers/aws/aws_sigv4.go b/pkg/scalers/aws/aws_sigv4.go index 2abde772f1c..0be031938b7 100644 --- a/pkg/scalers/aws/aws_sigv4.go +++ b/pkg/scalers/aws/aws_sigv4.go @@ -71,20 +71,12 @@ func (rt *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { } // parseAwsAMPMetadata parses the data to get the AWS sepcific auth info and metadata -func parseAwsAMPMetadata(config *scalersconfig.ScalerConfig) (*awsConfigMetadata, error) { - meta := awsConfigMetadata{} - - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { - meta.awsRegion = val - } - +func parseAwsAMPMetadata(config *scalersconfig.ScalerConfig) (*AuthorizationMetadata, error) { auth, err := GetAwsAuthorization(config.TriggerUniqueKey, config.PodIdentity, config.TriggerMetadata, config.AuthParams, config.ResolvedEnv) if err != nil { return nil, err } - - meta.awsAuthorization = auth - return &meta, nil + return &auth, nil } // NewSigV4RoundTripper returns a new http.RoundTripper that will sign requests @@ -100,11 +92,11 @@ func NewSigV4RoundTripper(config *scalersconfig.ScalerConfig) (http.RoundTripper // which is probably the reason to create a SigV4RoundTripper. 
// To prevent failures we check if the metadata is nil // (missing AWS info) and we hide the error - metadata, _ := parseAwsAMPMetadata(config) - if metadata == nil { + awsAuthorization, _ := parseAwsAMPMetadata(config) + if awsAuthorization == nil { return nil, nil } - awsCfg, err := GetAwsConfig(context.Background(), metadata.awsRegion, metadata.awsAuthorization) + awsCfg, err := GetAwsConfig(context.Background(), *awsAuthorization) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index 9eacc785714..ee2483e898f 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -115,7 +115,7 @@ func NewAwsCloudwatchScaler(ctx context.Context, config *scalersconfig.ScalerCon } func createCloudwatchClient(ctx context.Context, metadata *awsCloudwatchMetadata) (*cloudwatch.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, err diff --git a/pkg/scalers/aws_dynamodb_scaler.go b/pkg/scalers/aws_dynamodb_scaler.go index f3c9bcac1f6..9668ff07ea7 100644 --- a/pkg/scalers/aws_dynamodb_scaler.go +++ b/pkg/scalers/aws_dynamodb_scaler.go @@ -125,7 +125,7 @@ func parseAwsDynamoDBMetadata(config *scalersconfig.ScalerConfig) (*awsDynamoDBM } func createDynamoDBClient(ctx context.Context, metadata *awsDynamoDBMetadata) (*dynamodb.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_dynamodb_streams_scaler.go b/pkg/scalers/aws_dynamodb_streams_scaler.go index cdcd8548320..57e49821eff 100644 --- a/pkg/scalers/aws_dynamodb_streams_scaler.go +++ b/pkg/scalers/aws_dynamodb_streams_scaler.go @@ -92,7 +92,7 @@ func parseAwsDynamoDBStreamsMetadata(config 
*scalersconfig.ScalerConfig) (*awsDy } func createClientsForDynamoDBStreamsScaler(ctx context.Context, metadata *awsDynamoDBStreamsMetadata) (*dynamodb.Client, *dynamodbstreams.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, nil, err } diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go index 65fb90a4e23..efbb2d4cb32 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler.go +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -131,7 +131,7 @@ func parseAwsKinesisStreamMetadata(config *scalersconfig.ScalerConfig, logger lo } func createKinesisClient(ctx context.Context, metadata *awsKinesisStreamMetadata) (*kinesis.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, err } diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 6f1b6b5d0ee..457be4522ad 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -122,7 +122,7 @@ func parseAwsSqsQueueMetadata(config *scalersconfig.ScalerConfig) (*awsSqsQueueM } func createSqsClient(ctx context.Context, metadata *awsSqsQueueMetadata) (*sqs.Client, error) { - cfg, err := awsutils.GetAwsConfig(ctx, metadata.AwsRegion, metadata.awsAuthorization) + cfg, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, err } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index b353c1313b4..5b1adaf77ad 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -669,7 +669,7 @@ func getKafkaClientConfig(ctx context.Context, metadata kafkaMetadata) (*sarama. 
case KafkaSASLOAuthTokenProviderBearer: config.Net.SASL.TokenProvider = kafka.OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions) case KafkaSASLOAuthTokenProviderAWSMSKIAM: - awsAuth, err := awsutils.GetAwsConfig(ctx, metadata.awsRegion, metadata.awsAuthorization) + awsAuth, err := awsutils.GetAwsConfig(ctx, metadata.awsAuthorization) if err != nil { return nil, fmt.Errorf("error getting AWS config: %w", err) } diff --git a/pkg/scaling/resolver/aws_secretmanager_handler.go b/pkg/scaling/resolver/aws_secretmanager_handler.go index 6c07281262a..595ffb4c6f1 100644 --- a/pkg/scaling/resolver/aws_secretmanager_handler.go +++ b/pkg/scaling/resolver/aws_secretmanager_handler.go @@ -73,6 +73,7 @@ func (ash *AwsSecretManagerHandler) Initialize(ctx context.Context, client clien if ash.secretManager.Region != "" { awsRegion = ash.secretManager.Region } + ash.awsMetadata.AwsRegion = awsRegion podIdentity := ash.secretManager.PodIdentity if podIdentity == nil { podIdentity = &kedav1alpha1.AuthPodIdentity{} @@ -100,7 +101,7 @@ func (ash *AwsSecretManagerHandler) Initialize(ctx context.Context, client clien return fmt.Errorf("pod identity provider %s not supported", podIdentity.Provider) } - config, err := awsutils.GetAwsConfig(ctx, awsRegion, ash.awsMetadata) + config, err := awsutils.GetAwsConfig(ctx, ash.awsMetadata) if err != nil { logger.Error(err, "Error getting credentials") return err From c8be1c7cd0a3bf08d7fc4c70f57df80be8cb7f94 Mon Sep 17 00:00:00 2001 From: Eng Zer Jun Date: Wed, 4 Dec 2024 18:34:21 +0800 Subject: [PATCH 3/4] refactor: replace experimental `maps` and `slices` with stdlib (#6372) Signed-off-by: Eng Zer Jun Signed-off-by: Jan Wozniak Co-authored-by: Jan Wozniak --- CHANGELOG.md | 1 + apis/eventing/v1alpha1/cloudeventsource_webhook.go | 2 +- go.mod | 6 ++---- pkg/eventemitter/eventfilter.go | 2 +- pkg/scalers/scalersconfig/typed_config.go | 11 ++++------- 
.../opentelemetry_metrics_test.go | 2 +- 6 files changed, 10 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 266aea0115b..f28f04d14e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,6 +94,7 @@ New deprecation(s): ### Other - **General**: Bump newrelic-client-go deps to 2.51.2 (latest) ([#6325](https://github.com/kedacore/keda/pull/6325)) +- **General**: refactor: replace experimental `maps` and `slices` with stdlib ([#6372](https://github.com/kedacore/keda/pull/6372)) ## v2.16.0 diff --git a/apis/eventing/v1alpha1/cloudeventsource_webhook.go b/apis/eventing/v1alpha1/cloudeventsource_webhook.go index af7e69d7fb4..2dc577665de 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_webhook.go +++ b/apis/eventing/v1alpha1/cloudeventsource_webhook.go @@ -19,8 +19,8 @@ package v1alpha1 import ( "encoding/json" "fmt" + "slices" - "golang.org/x/exp/slices" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" diff --git a/go.mod b/go.mod index 1d90fed97f9..ab53e4324fa 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/kedacore/keda/v2 -go 1.22.1 - -toolchain go1.23.3 +go 1.23.3 require ( cloud.google.com/go/compute/metadata v0.5.2 @@ -347,7 +345,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.28.0 - golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa + golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/sys v0.26.0 // indirect diff --git a/pkg/eventemitter/eventfilter.go b/pkg/eventemitter/eventfilter.go index 11af3af1ac8..b6b0dc32b0e 100644 --- a/pkg/eventemitter/eventfilter.go +++ b/pkg/eventemitter/eventfilter.go @@ -17,7 +17,7 @@ limitations under the License. 
package eventemitter import ( - "golang.org/x/exp/slices" + "slices" eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" ) diff --git a/pkg/scalers/scalersconfig/typed_config.go b/pkg/scalers/scalersconfig/typed_config.go index 028028c8de0..62e5a71b9af 100644 --- a/pkg/scalers/scalersconfig/typed_config.go +++ b/pkg/scalers/scalersconfig/typed_config.go @@ -20,14 +20,13 @@ import ( "encoding/json" "errors" "fmt" + "maps" "net/url" "reflect" "runtime/debug" + "slices" "strconv" "strings" - - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" ) // CustomValidator is an interface that can be implemented to validate the configuration of the typed config @@ -204,8 +203,7 @@ func (sc *ScalerConfig) setValue(field reflect.Value, params Params) error { } if !exists && !(params.Optional || params.IsDeprecated()) { if len(params.Order) == 0 { - apo := maps.Keys(allowedParsingOrderMap) - slices.Sort(apo) + apo := slices.Sorted(maps.Keys(allowedParsingOrderMap)) return fmt.Errorf("missing required parameter %q, no 'order' tag, provide any from %v", params.Name(), apo) } return fmt.Errorf("missing required parameter %q in %v", params.Name(), params.Order) @@ -463,8 +461,7 @@ func paramsFromTag(tag string, field reflect.StructField) (Params, error) { for _, po := range order { poTyped := ParsingOrder(strings.TrimSpace(po)) if !allowedParsingOrderMap[poTyped] { - apo := maps.Keys(allowedParsingOrderMap) - slices.Sort(apo) + apo := slices.Sorted(maps.Keys(allowedParsingOrderMap)) return params, fmt.Errorf("unknown parsing order value %s, has to be one of %s", po, apo) } params.Order = append(params.Order, poTyped) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index be9fafdc46e..98e287e2d08 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -9,6 +9,7 
@@ import ( "fmt" "maps" "os/exec" + "slices" "strings" "testing" "time" @@ -17,7 +18,6 @@ import ( "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" From c2e19c19dc566984277c165e5d0bf81c45bb8e67 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Wed, 4 Dec 2024 15:56:08 +0100 Subject: [PATCH 4/4] Add KEDAScalersInfo to display important information (#6330) Signed-off-by: rickbrouwer --- CHANGELOG.md | 2 +- pkg/eventreason/eventreason.go | 3 ++ pkg/scalers/cpu_memory_scaler.go | 13 ++--- pkg/scalers/cpu_memory_scaler_test.go | 4 +- pkg/scalers/ibmmq_scaler.go | 9 ++-- pkg/scalers/scalersconfig/scalersconfig.go | 11 +++++ pkg/scalers/scalersconfig/typed_config.go | 40 ++++++++++++---- .../scalersconfig/typed_config_test.go | 48 +++++++++++++++++++ pkg/scaling/scalers_builder.go | 3 ++ 9 files changed, 110 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f28f04d14e5..4e85235b4e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -94,9 +94,9 @@ New deprecation(s): ### Other - **General**: Bump newrelic-client-go deps to 2.51.2 (latest) ([#6325](https://github.com/kedacore/keda/pull/6325)) +- **General**: New eventreason KEDAScalersInfo to display important information ([#6328](https://github.com/kedacore/keda/issues/6328)) - **General**: refactor: replace experimental `maps` and `slices` with stdlib ([#6372](https://github.com/kedacore/keda/pull/6372)) - ## v2.16.0 ### New diff --git a/pkg/eventreason/eventreason.go b/pkg/eventreason/eventreason.go index dd41cb6639a..45b162fb667 100644 --- a/pkg/eventreason/eventreason.go +++ b/pkg/eventreason/eventreason.go @@ -41,6 +41,9 @@ const ( // ScaledJobDeleted is for event when ScaledJob is deleted ScaledJobDeleted = "ScaledJobDeleted" + // KEDAScalersInfo is for event when Scaler has additional info + 
KEDAScalersInfo = "KEDAScalerInfo" + // KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob KEDAScalersStarted = "KEDAScalersStarted" diff --git a/pkg/scalers/cpu_memory_scaler.go b/pkg/scalers/cpu_memory_scaler.go index 8da440ab77e..bcdd1b9490e 100644 --- a/pkg/scalers/cpu_memory_scaler.go +++ b/pkg/scalers/cpu_memory_scaler.go @@ -21,7 +21,7 @@ type cpuMemoryScaler struct { } type cpuMemoryMetadata struct { - Type string `keda:"name=type, order=triggerMetadata, enum=Utilization;AverageValue, optional"` + Type string `keda:"name=type, order=triggerMetadata, enum=Utilization;AverageValue, optional, deprecatedAnnounce=The 'type' setting is DEPRECATED and will be removed in v2.18 - Use 'metricType' instead."` Value string `keda:"name=value, order=triggerMetadata"` ContainerName string `keda:"name=containerName, order=triggerMetadata, optional"` AverageValue *resource.Quantity @@ -33,19 +33,21 @@ type cpuMemoryMetadata struct { func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig) (Scaler, error) { logger := InitializeLogger(config, "cpu_memory_scaler") - meta, err := parseResourceMetadata(config, logger) + meta, err := parseResourceMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing %s metadata: %w", resourceName, err) } - return &cpuMemoryScaler{ + scaler := &cpuMemoryScaler{ metadata: meta, resourceName: resourceName, logger: logger, - }, nil + } + + return scaler, nil } -func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (cpuMemoryMetadata, error) { +func parseResourceMetadata(config *scalersconfig.ScalerConfig) (cpuMemoryMetadata, error) { meta := cpuMemoryMetadata{} err := config.TypedConfig(&meta) if err != nil { @@ -58,7 +60,6 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge // This is deprecated and can be removed later if meta.Type != "" { - logger.Info("The 'type' setting is DEPRECATED and will be 
removed in v2.18 - Use 'metricType' instead.") switch meta.Type { case "AverageValue": meta.MetricType = v2.AverageValueMetricType diff --git a/pkg/scalers/cpu_memory_scaler_test.go b/pkg/scalers/cpu_memory_scaler_test.go index 78f662de247..e6b63220737 100644 --- a/pkg/scalers/cpu_memory_scaler_test.go +++ b/pkg/scalers/cpu_memory_scaler_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/go-logr/logr" "github.com/stretchr/testify/assert" v2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" @@ -43,13 +42,12 @@ var testCPUMemoryMetadata = []parseCPUMemoryMetadataTestData{ } func TestCPUMemoryParseMetadata(t *testing.T) { - logger := logr.Discard() for i, testData := range testCPUMemoryMetadata { config := &scalersconfig.ScalerConfig{ TriggerMetadata: testData.metadata, MetricType: testData.metricType, } - _, err := parseResourceMetadata(config, logger) + _, err := parseResourceMetadata(config) if err != nil && !testData.isError { t.Errorf("Test case %d: Expected success but got error: %v", i, err) } diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index f2d95099241..5963ca1b222 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -34,7 +34,7 @@ type ibmmqMetadata struct { Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` - TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead + TLS bool `keda:"name=tls, order=triggerMetadata, default=false, deprecatedAnnounce=The 'tls' setting is DEPRECATED and will be removed in v2.18 - Use 'unsafeSsl' instead"` CA string `keda:"name=ca, order=authParams, optional"` Cert string `keda:"name=cert, order=authParams, optional"` Key string `keda:"name=key, order=authParams, optional"` @@ -92,7 +92,6 @@ func NewIBMMQScaler(config 
*scalersconfig.ScalerConfig) (Scaler, error) { // TODO: DEPRECATED to be removed in v2.18 if meta.TLS { - logger.Info("The 'tls' setting is DEPRECATED and will be removed in v2.18 - Use 'unsafeSsl' instead") meta.UnsafeSsl = meta.TLS } @@ -106,12 +105,14 @@ func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { httpClient.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig) } - return &ibmmqScaler{ + scaler := &ibmmqScaler{ metricType: metricType, metadata: meta, httpClient: httpClient, logger: logger, - }, nil + } + + return scaler, nil } func (s *ibmmqScaler) Close(context.Context) error { diff --git a/pkg/scalers/scalersconfig/scalersconfig.go b/pkg/scalers/scalersconfig/scalersconfig.go index 67ed86cd7a0..fab341744df 100644 --- a/pkg/scalers/scalersconfig/scalersconfig.go +++ b/pkg/scalers/scalersconfig/scalersconfig.go @@ -20,6 +20,8 @@ import ( "time" v2 "k8s.io/api/autoscaling/v2" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) @@ -41,6 +43,9 @@ type ScalerConfig struct { // Name of the trigger TriggerName string + // Trigger type (name of the trigger, also the scaler name) + TriggerType string + // Marks whether we should query metrics only during the polling interval // Any requests for metrics in between are read from the cache TriggerUseCachedMetrics bool @@ -68,4 +73,10 @@ type ScalerConfig struct { // When we use the scaler for composite scaler, we shouldn't require the value because it'll be ignored AsMetricSource bool + + // For events + Recorder record.EventRecorder + + // ScaledObject + ScaledObject runtime.Object } diff --git a/pkg/scalers/scalersconfig/typed_config.go b/pkg/scalers/scalersconfig/typed_config.go index 62e5a71b9af..ac485fe91f2 100644 --- a/pkg/scalers/scalersconfig/typed_config.go +++ b/pkg/scalers/scalersconfig/typed_config.go @@ -27,6 +27,10 @@ import ( "slices" "strconv" "strings" + + corev1 "k8s.io/api/core/v1" + 
+ "github.com/kedacore/keda/v2/pkg/eventreason" ) // CustomValidator is an interface that can be implemented to validate the configuration of the typed config @@ -66,15 +70,16 @@ const ( // field tag parameters const ( - optionalTag = "optional" - deprecatedTag = "deprecated" - defaultTag = "default" - orderTag = "order" - nameTag = "name" - enumTag = "enum" - exclusiveSetTag = "exclusiveSet" - rangeTag = "range" - separatorTag = "separator" + optionalTag = "optional" + deprecatedTag = "deprecated" + deprecatedAnnounceTag = "deprecatedAnnounce" + defaultTag = "default" + orderTag = "order" + nameTag = "name" + enumTag = "enum" + exclusiveSetTag = "exclusiveSet" + rangeTag = "range" + separatorTag = "separator" ) // Params is a struct that represents the parameter list that can be used in the keda tag @@ -100,6 +105,10 @@ type Params struct { // as an error and the DeprecatedMessage should be returned to the user Deprecated string + // DeprecatedAnnounce is the 'deprecatedAnnounce' tag parameter, if set this will trigger + // an info event with the deprecation message + DeprecatedAnnounce string + // Enum is the 'enum' tag parameter defining the list of possible values for the parameter Enum []string @@ -194,6 +203,13 @@ func (sc *ScalerConfig) setValue(field reflect.Value, params Params) error { if exists && params.IsDeprecated() { return fmt.Errorf("parameter %q is deprecated%v", params.Name(), params.DeprecatedMessage()) } + if exists && params.DeprecatedAnnounce != "" { + if sc.Recorder != nil { + message := fmt.Sprintf("Scaler %s info: %s", sc.TriggerType, params.DeprecatedAnnounce) + fmt.Print(message) + sc.Recorder.Event(sc.ScaledObject, corev1.EventTypeNormal, eventreason.KEDAScalersInfo, message) + } + } if !exists && params.Default != "" { exists = true valFromConfig = params.Default @@ -477,6 +493,12 @@ func paramsFromTag(tag string, field reflect.StructField) (Params, error) { } else { params.Deprecated = strings.TrimSpace(tsplit[1]) } + case 
deprecatedAnnounceTag: + if len(tsplit) == 1 { + params.DeprecatedAnnounce = deprecatedAnnounceTag + } else { + params.DeprecatedAnnounce = strings.TrimSpace(tsplit[1]) + } case defaultTag: if len(tsplit) > 1 { params.Default = strings.TrimSpace(tsplit[1]) diff --git a/pkg/scalers/scalersconfig/typed_config_test.go b/pkg/scalers/scalersconfig/typed_config_test.go index 866311f574a..c17952d9908 100644 --- a/pkg/scalers/scalersconfig/typed_config_test.go +++ b/pkg/scalers/scalersconfig/typed_config_test.go @@ -21,6 +21,7 @@ import ( "testing" . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/runtime" ) // TestBasicTypedConfig tests the basic types for typed config @@ -583,3 +584,50 @@ func TestMultiName(t *testing.T) { Expect(err).To(BeNil()) Expect(ts.Property).To(Equal("bbb")) } + +// TestDeprecatedAnnounce tests the deprecatedAnnounce tag +func TestDeprecatedAnnounce(t *testing.T) { + RegisterTestingT(t) + + // Create a mock recorder to capture the event + mockRecorder := &MockEventRecorder{} + + sc := &ScalerConfig{ + TriggerMetadata: map[string]string{ + "oldParam": "value1", + }, + Recorder: mockRecorder, + } + + type testStruct struct { + OldParam string `keda:"name=oldParam, order=triggerMetadata, deprecatedAnnounce=This parameter is deprecated. Use newParam instead"` + } + + ts := testStruct{} + err := sc.TypedConfig(&ts) + Expect(err).To(BeNil()) + Expect(ts.OldParam).To(Equal("value1")) + + // Verify that the deprecation event was recorded + Expect(mockRecorder.EventCalled).To(BeTrue()) + Expect(mockRecorder.Message).To(Equal("Scaler info: This parameter is deprecated. 
Use newParam instead")) +} + +// MockEventRecorder is a mock implementation of record.EventRecorder +type MockEventRecorder struct { + EventCalled bool + Message string +} + +func (m *MockEventRecorder) Event(object runtime.Object, eventtype, reason, message string) { + m.EventCalled = true + m.Message = message +} + +func (m *MockEventRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + // Not needed +} + +func (m *MockEventRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { + // Not needed +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 9c44ac5004e..11d294bbec3 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -59,6 +59,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp ScalableObjectType: withTriggers.Kind, TriggerName: trigger.Name, TriggerMetadata: trigger.Metadata, + TriggerType: trigger.Type, TriggerUseCachedMetrics: trigger.UseCachedMetrics, ResolvedEnv: resolvedEnv, AuthParams: make(map[string]string), @@ -66,6 +67,8 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp TriggerIndex: triggerIndex, MetricType: trigger.MetricType, AsMetricSource: asMetricSource, + ScaledObject: withTriggers, + Recorder: h.recorder, TriggerUniqueKey: fmt.Sprintf("%s-%s-%s-%d", withTriggers.Kind, withTriggers.Namespace, withTriggers.Name, triggerIndex), }