Skip to content

Commit 9a73d19

Browse files
authored
feat: add keda integration (#1058)
* refactor: move FetchLatestEvent inside util package Signed-off-by: DragonAlex98 <[email protected]> * feat: add Keda integration and ScaledObject analyzer Signed-off-by: DragonAlex98 <[email protected]> --------- Signed-off-by: DragonAlex98 <[email protected]>
1 parent 85a76a3 commit 9a73d19

File tree

10 files changed

+465
-53
lines changed

10 files changed

+465
-53
lines changed

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.21
55
require (
66
github.com/aquasecurity/trivy-operator v0.17.1
77
github.com/fatih/color v1.16.0
8+
github.com/kedacore/keda/v2 v2.11.2
89
github.com/magiconair/properties v1.8.7
910
github.com/mittwald/go-helm-client v0.12.5
1011
github.com/sashabaranov/go-openai v1.20.4
@@ -92,11 +93,13 @@ require (
9293
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
9394
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
9495
go.opentelemetry.io/otel/metric v1.24.0 // indirect
96+
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
9597
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
9698
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
9799
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
98100
gopkg.in/evanphx/json-patch.v5 v5.7.0 // indirect
99101
gopkg.in/yaml.v3 v3.0.1 // indirect
102+
knative.dev/pkg v0.0.0-20230616134650-eb63a40adfb0 // indirect
100103
)
101104

102105
require (

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -1799,6 +1799,8 @@ github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1
17991799
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
18001800
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
18011801
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
1802+
github.com/kedacore/keda/v2 v2.11.2 h1:UgPww0NREqUkM1PGERUz+eb5PlO5oU8V/sT9Hh+ZD60=
1803+
github.com/kedacore/keda/v2 v2.11.2/go.mod h1:eutYX+QXTi3QH90F7JvY3tYtV5Jq10o5f56Chk5IVF8=
18021804
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
18031805
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
18041806
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -3048,6 +3050,8 @@ k8s.io/kubectl v0.28.4 h1:gWpUXW/T7aFne+rchYeHkyB8eVDl5UZce8G4X//kjUQ=
30483050
k8s.io/kubectl v0.28.4/go.mod h1:CKOccVx3l+3MmDbkXtIUtibq93nN2hkDR99XDCn7c/c=
30493051
k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY=
30503052
k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
3053+
knative.dev/pkg v0.0.0-20230616134650-eb63a40adfb0 h1:weQWWxEEbNOPuL4qtGiBZuMSFhcjF/Cu163uktd/xFE=
3054+
knative.dev/pkg v0.0.0-20230616134650-eb63a40adfb0/go.mod h1:dqC6IrvyBE7E+oZocs5PkVhq1G59pDTA7r8U17EAKMk=
30513055
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
30523056
lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
30533057
modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI=

pkg/analyzer/events.go

-50
This file was deleted.

pkg/analyzer/pod.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func analyzeContainerStatusFailures(a common.Analyzer, statuses []v1.ContainerSt
9797
if containerStatus.State.Waiting.Reason == "ContainerCreating" && statusPhase == "Pending" {
9898
// This represents a container that is still being created or blocked due to conditions such as OOMKilled
9999
// parse the event log and append details
100-
evt, err := FetchLatestEvent(a.Context, a.Client, namespace, name)
100+
evt, err := util.FetchLatestEvent(a.Context, a.Client, namespace, name)
101101
if err != nil || evt == nil {
102102
continue
103103
}
@@ -123,7 +123,7 @@ func analyzeContainerStatusFailures(a common.Analyzer, statuses []v1.ContainerSt
123123
// when pod is Running but its ReadinessProbe fails
124124
if !containerStatus.Ready && statusPhase == "Running" {
125125
// parse the event log and append details
126-
evt, err := FetchLatestEvent(a.Context, a.Client, namespace, name)
126+
evt, err := util.FetchLatestEvent(a.Context, a.Client, namespace, name)
127127
if err != nil || evt == nil {
128128
continue
129129
}

pkg/analyzer/pvc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (PvcAnalyzer) Analyze(a common.Analyzer) ([]common.Result, error) {
4747
if pvc.Status.Phase == appsv1.ClaimPending {
4848

4949
// parse the event log and append details
50-
evt, err := FetchLatestEvent(a.Context, a.Client, pvc.Namespace, pvc.Name)
50+
evt, err := util.FetchLatestEvent(a.Context, a.Client, pvc.Namespace, pvc.Name)
5151
if err != nil || evt == nil {
5252
continue
5353
}

pkg/common/types.go

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
openapi_v2 "github.com/google/gnostic/openapiv2"
2121
"github.com/k8sgpt-ai/k8sgpt/pkg/ai"
2222
"github.com/k8sgpt-ai/k8sgpt/pkg/kubernetes"
23+
keda "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
2324
regv1 "k8s.io/api/admissionregistration/v1"
2425
appsv1 "k8s.io/api/apps/v1"
2526
autov1 "k8s.io/api/autoscaling/v1"
@@ -62,6 +63,7 @@ type PreAnalysis struct {
6263
Gateway gtwapi.Gateway
6364
HTTPRoute gtwapi.HTTPRoute
6465
// Integrations
66+
ScaledObject keda.ScaledObject
6567
TrivyVulnerabilityReport trivy.VulnerabilityReport
6668
TrivyConfigAuditReport trivy.ConfigAuditReport
6769
}

pkg/integration/integration.go

+3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ package integration
1616
import (
1717
"errors"
1818
"fmt"
19+
1920
"github.com/k8sgpt-ai/k8sgpt/pkg/integration/aws"
2021

2122
"github.com/k8sgpt-ai/k8sgpt/pkg/common"
23+
"github.com/k8sgpt-ai/k8sgpt/pkg/integration/keda"
2224
"github.com/k8sgpt-ai/k8sgpt/pkg/integration/prometheus"
2325
"github.com/k8sgpt-ai/k8sgpt/pkg/integration/trivy"
2426
"github.com/k8sgpt-ai/k8sgpt/pkg/util"
@@ -49,6 +51,7 @@ var integrations = map[string]IIntegration{
4951
"trivy": trivy.NewTrivy(),
5052
"prometheus": prometheus.NewPrometheus(),
5153
"aws": aws.NewAWS(),
54+
"keda": keda.NewKeda(),
5255
}
5356

5457
func NewIntegration() *Integration {

pkg/integration/keda/keda.go

+229
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package keda
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
8+
"google.golang.org/grpc/codes"
9+
"google.golang.org/grpc/status"
10+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
12+
"github.com/fatih/color"
13+
"github.com/k8sgpt-ai/k8sgpt/pkg/common"
14+
"github.com/k8sgpt-ai/k8sgpt/pkg/kubernetes"
15+
"github.com/kedacore/keda/v2/pkg/generated/clientset/versioned/typed/keda/v1alpha1"
16+
helmclient "github.com/mittwald/go-helm-client"
17+
"github.com/spf13/viper"
18+
"helm.sh/helm/v3/pkg/repo"
19+
)
20+
21+
var (
22+
Repo = getEnv("KEDA_REPO", "https://kedacore.github.io/charts")
23+
Version = getEnv("KEDA_VERSION", "2.11.2")
24+
ChartName = getEnv("KEDA_CHART_NAME", "keda")
25+
RepoShortName = getEnv("KEDA_REPO_SHORT_NAME", "keda")
26+
ReleaseName = getEnv("KEDA_RELEASE_NAME", "keda-k8sgpt")
27+
)
28+
29+
type Keda struct {
30+
helm helmclient.Client
31+
}
32+
33+
func getEnv(key, defaultValue string) string {
34+
value := os.Getenv(key)
35+
if value == "" {
36+
return defaultValue
37+
}
38+
return value
39+
}
40+
41+
func NewKeda() *Keda {
42+
helmClient, err := helmclient.New(&helmclient.Options{})
43+
if err != nil {
44+
panic(err)
45+
}
46+
return &Keda{
47+
helm: helmClient,
48+
}
49+
}
50+
51+
func (k *Keda) Deploy(namespace string) error {
52+
// Add the repository
53+
chartRepo := repo.Entry{
54+
Name: RepoShortName,
55+
URL: Repo,
56+
}
57+
// Add a chart-repository to the client.
58+
if err := k.helm.AddOrUpdateChartRepo(chartRepo); err != nil {
59+
panic(err)
60+
}
61+
62+
chartSpec := helmclient.ChartSpec{
63+
ReleaseName: ReleaseName,
64+
ChartName: fmt.Sprintf("%s/%s", RepoShortName, ChartName),
65+
Namespace: namespace,
66+
67+
//TODO: All of this should be configurable
68+
UpgradeCRDs: true,
69+
Wait: false,
70+
Timeout: 300,
71+
CreateNamespace: true,
72+
}
73+
74+
// Install a chart release.
75+
// Note that helmclient.Options.Namespace should ideally match the namespace in chartSpec.Namespace.
76+
if _, err := k.helm.InstallOrUpgradeChart(context.Background(), &chartSpec, nil); err != nil {
77+
return err
78+
}
79+
80+
return nil
81+
}
82+
83+
func (k *Keda) UnDeploy(namespace string) error {
84+
kubecontext := viper.GetString("kubecontext")
85+
kubeconfig := viper.GetString("kubeconfig")
86+
client, err := kubernetes.NewClient(kubecontext, kubeconfig)
87+
if err != nil {
88+
// TODO: better error handling
89+
color.Red("Error initialising kubernetes client: %v", err)
90+
os.Exit(1)
91+
}
92+
93+
kedaNamespace, _ := k.GetNamespace()
94+
color.Blue(fmt.Sprintf("Keda namespace: %s\n", kedaNamespace))
95+
96+
kClient, _ := v1alpha1.NewForConfig(client.Config)
97+
98+
scaledObjectList, _ := kClient.ScaledObjects("").List(context.Background(), v1.ListOptions{})
99+
scaledJobList, _ := kClient.ScaledJobs("").List(context.Background(), v1.ListOptions{})
100+
triggerAuthenticationList, _ := kClient.TriggerAuthentications("").List(context.Background(), v1.ListOptions{})
101+
clusterTriggerAuthenticationsList, _ := kClient.ClusterTriggerAuthentications().List(context.Background(), v1.ListOptions{})
102+
103+
// Before uninstalling the Helm chart, we need to delete Keda resources
104+
for _, scaledObject := range scaledObjectList.Items {
105+
err := kClient.ScaledObjects(scaledObject.Namespace).Delete(context.Background(), scaledObject.Name, v1.DeleteOptions{})
106+
if err != nil {
107+
fmt.Printf("Error deleting scaledObject %s: %v\n", scaledObject.Name, err)
108+
} else {
109+
fmt.Printf("Deleted scaledObject %s in namespace %s\n", scaledObject.Name, scaledObject.Namespace)
110+
}
111+
}
112+
113+
for _, scaledJob := range scaledJobList.Items {
114+
err := kClient.ScaledJobs(scaledJob.Namespace).Delete(context.Background(), scaledJob.Name, v1.DeleteOptions{})
115+
if err != nil {
116+
fmt.Printf("Error deleting scaledJob %s: %v\n", scaledJob.Name, err)
117+
} else {
118+
fmt.Printf("Deleted scaledJob %s in namespace %s\n", scaledJob.Name, scaledJob.Namespace)
119+
}
120+
}
121+
122+
for _, triggerAuthentication := range triggerAuthenticationList.Items {
123+
err := kClient.TriggerAuthentications(triggerAuthentication.Namespace).Delete(context.Background(), triggerAuthentication.Name, v1.DeleteOptions{})
124+
if err != nil {
125+
fmt.Printf("Error deleting triggerAuthentication %s: %v\n", triggerAuthentication.Name, err)
126+
} else {
127+
fmt.Printf("Deleted triggerAuthentication %s in namespace %s\n", triggerAuthentication.Name, triggerAuthentication.Namespace)
128+
}
129+
}
130+
131+
for _, clusterTriggerAuthentication := range clusterTriggerAuthenticationsList.Items {
132+
err := kClient.ClusterTriggerAuthentications().Delete(context.Background(), clusterTriggerAuthentication.Name, v1.DeleteOptions{})
133+
if err != nil {
134+
fmt.Printf("Error deleting clusterTriggerAuthentication %s: %v\n", clusterTriggerAuthentication.Name, err)
135+
} else {
136+
fmt.Printf("Deleted clusterTriggerAuthentication %s\n", clusterTriggerAuthentication.Name)
137+
}
138+
}
139+
140+
chartSpec := helmclient.ChartSpec{
141+
ReleaseName: ReleaseName,
142+
ChartName: fmt.Sprintf("%s/%s", RepoShortName, ChartName),
143+
Namespace: namespace,
144+
UpgradeCRDs: true,
145+
Wait: false,
146+
Timeout: 300,
147+
}
148+
// Uninstall the chart release.
149+
// Note that helmclient.Options.Namespace should ideally match the namespace in chartSpec.Namespace.
150+
if err := k.helm.UninstallRelease(&chartSpec); err != nil {
151+
return err
152+
}
153+
return nil
154+
}
155+
156+
func (k *Keda) AddAnalyzer(mergedMap *map[string]common.IAnalyzer) {
157+
(*mergedMap)["ScaledObject"] = &ScaledObjectAnalyzer{}
158+
}
159+
160+
func (k *Keda) GetAnalyzerName() []string {
161+
return []string{
162+
"ScaledObject",
163+
}
164+
}
165+
166+
func (k *Keda) GetNamespace() (string, error) {
167+
releases, err := k.helm.ListDeployedReleases()
168+
if err != nil {
169+
return "", err
170+
}
171+
for _, rel := range releases {
172+
if rel.Name == ReleaseName {
173+
return rel.Namespace, nil
174+
}
175+
}
176+
return "", status.Error(codes.NotFound, "keda release not found")
177+
}
178+
179+
func (k *Keda) OwnsAnalyzer(analyzer string) bool {
180+
for _, a := range k.GetAnalyzerName() {
181+
if analyzer == a {
182+
return true
183+
}
184+
}
185+
return false
186+
}
187+
188+
func (k *Keda) isFilterActive() bool {
189+
activeFilters := viper.GetStringSlice("active_filters")
190+
191+
for _, filter := range k.GetAnalyzerName() {
192+
for _, af := range activeFilters {
193+
if af == filter {
194+
return true
195+
}
196+
}
197+
}
198+
199+
return false
200+
}
201+
202+
func (k *Keda) isDeployed() bool {
203+
kubecontext := viper.GetString("kubecontext")
204+
kubeconfig := viper.GetString("kubeconfig")
205+
client, err := kubernetes.NewClient(kubecontext, kubeconfig)
206+
if err != nil {
207+
// TODO: better error handling
208+
color.Red("Error initialising kubernetes client: %v", err)
209+
os.Exit(1)
210+
}
211+
groups, _, err := client.Client.Discovery().ServerGroupsAndResources()
212+
if err != nil {
213+
// TODO: better error handling
214+
color.Red("Error initialising discovery client: %v", err)
215+
os.Exit(1)
216+
}
217+
218+
for _, group := range groups {
219+
if group.Name == "keda.sh" {
220+
return true
221+
}
222+
}
223+
224+
return false
225+
}
226+
227+
func (k *Keda) IsActivate() bool {
228+
return k.isFilterActive() && k.isDeployed()
229+
}

0 commit comments

Comments
 (0)