Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operator): fetch cluster name from providers #1881

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions KubeArmor/core/kubeUpdate.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2761,6 +2761,10 @@ func (dm *KubeArmorDaemon) WatchConfigMap() cache.InformerSynced {
if cm, ok := obj.(*corev1.ConfigMap); ok && cm.Namespace == cmNS {
cfg.GlobalCfg.HostVisibility = cm.Data[cfg.ConfigHostVisibility]
cfg.GlobalCfg.Visibility = cm.Data[cfg.ConfigVisibility]
cfg.GlobalCfg.Cluster = cm.Data[cfg.ConfigCluster]
rootxrishabh marked this conversation as resolved.
Show resolved Hide resolved
dm.NodeLock.Lock()
dm.Node.ClusterName = cm.Data[cfg.ConfigCluster]
dm.NodeLock.Unlock()
if _, ok := cm.Data[cfg.ConfigDefaultPostureLogs]; ok {
cfg.GlobalCfg.DefaultPostureLogs = (cm.Data[cfg.ConfigDefaultPostureLogs] == "true")
}
Expand Down Expand Up @@ -2806,6 +2810,8 @@ func (dm *KubeArmorDaemon) WatchConfigMap() cache.InformerSynced {
if cm, ok := new.(*corev1.ConfigMap); ok && cm.Namespace == cmNS {
cfg.GlobalCfg.HostVisibility = cm.Data[cfg.ConfigHostVisibility]
cfg.GlobalCfg.Visibility = cm.Data[cfg.ConfigVisibility]
cfg.GlobalCfg.Cluster = cm.Data[cfg.ConfigCluster]
dm.Node.ClusterName = cm.Data[cfg.ConfigCluster]
if _, ok := cm.Data[cfg.ConfigDefaultPostureLogs]; ok {
cfg.GlobalCfg.DefaultPostureLogs = (cm.Data[cfg.ConfigDefaultPostureLogs] == "true")
}
Expand Down
6 changes: 3 additions & 3 deletions KubeArmor/feeder/feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (fd *Feeder) PushMessage(level, message string) {
pbMsg.UpdatedTime = updatedTime

//pbMsg.ClusterName = cfg.GlobalCfg.Cluster
pbMsg.ClusterName = fd.Node.ClusterName
pbMsg.ClusterName = cfg.GlobalCfg.Cluster

pbMsg.HostName = cfg.GlobalCfg.Host
pbMsg.HostIP = fd.Node.NodeIP
Expand Down Expand Up @@ -598,7 +598,7 @@ func (fd *Feeder) PushLog(log tp.Log) {
pbAlert.Timestamp = log.Timestamp
pbAlert.UpdatedTime = log.UpdatedTime

pbAlert.ClusterName = fd.Node.ClusterName
pbAlert.ClusterName = cfg.GlobalCfg.Cluster
pbAlert.HostName = fd.Node.NodeName

pbAlert.NamespaceName = log.NamespaceName
Expand Down Expand Up @@ -696,7 +696,7 @@ func (fd *Feeder) PushLog(log tp.Log) {
pbLog.Timestamp = log.Timestamp
pbLog.UpdatedTime = log.UpdatedTime

pbLog.ClusterName = fd.Node.ClusterName
pbLog.ClusterName = cfg.GlobalCfg.Cluster
pbLog.HostName = fd.Node.NodeName

pbLog.NamespaceName = log.NamespaceName
Expand Down
4 changes: 3 additions & 1 deletion deployments/helm/KubeArmorOperator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ spec:
{{- if or (eq $tag "latest") (and (hasPrefix "v" $tag) (semverCompare "^1.4.0" $tag)) }}
# initDeploy flag is only supported from v1.4.0
args:
- --initDeploy={{.Values.kubearmorOperator.initDeploy }}
{{- if .Values.kubearmorOperator.args -}}
{{- toYaml .Values.kubearmorOperator.args | trim | nindent 8 }}
{{- end }}
{{- end }}

serviceAccountName: {{ .Values.kubearmorOperator.name }}
Expand Down
3 changes: 2 additions & 1 deletion deployments/helm/KubeArmorOperator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ kubearmorOperator:
repository: kubearmor/kubearmor-operator
tag: ""
imagePullPolicy: IfNotPresent
initDeploy: true
args:
- "--initDeploy=true"

kubearmorConfig:
defaultCapabilitiesPosture: audit
Expand Down
5 changes: 4 additions & 1 deletion pkg/KubeArmorOperator/cmd/operator/main.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var Opv1Client *opv1client.Clientset
var Secv1Client *secv1client.Clientset
var InitDeploy bool
var LogLevel string
var ProviderHostname, ProviderEndpoint string

// Cmd represents the base command when called without any subcommands
var Cmd = &cobra.Command{
Expand All @@ -55,7 +56,7 @@ var Cmd = &cobra.Command{
return nil
},
Run: func(cmd *cobra.Command, args []string) {
nodeWatcher := controllers.NewClusterWatcher(K8sClient, Logger, ExtClient, Opv1Client, Secv1Client, PathPrefix, DeploymentName, InitDeploy)
nodeWatcher := controllers.NewClusterWatcher(K8sClient, Logger, ExtClient, Opv1Client, Secv1Client, PathPrefix, DeploymentName, ProviderHostname, ProviderEndpoint, InitDeploy)
go nodeWatcher.WatchConfigCrd()
nodeWatcher.WatchNodes()

Expand All @@ -81,6 +82,8 @@ func init() {
Cmd.PersistentFlags().StringVar(&LsmOrder, "lsm", "bpf,apparmor,selinux", "lsm preference order to use")
Cmd.PersistentFlags().StringVar(&PathPrefix, "pathprefix", "/rootfs/", "path prefix for runtime search")
Cmd.PersistentFlags().StringVar(&DeploymentName, "deploymentName", "kubearmor-operator", "operator deployment name")
Cmd.PersistentFlags().StringVar(&ProviderHostname, "providerHostname", "", "IMDS URL hostname for retrieving cluster name")
Cmd.PersistentFlags().StringVar(&ProviderEndpoint, "providerEndpoint", "", "IMDS URL endpoint for retrieving cluster name")
// TODO:- set initDeploy to false by default once this change is added to stable
Cmd.PersistentFlags().BoolVar(&InitDeploy, "initDeploy", true, "Init container deployment")
Cmd.PersistentFlags().StringVar(&LogLevel, "loglevel", "info", "log level, e.g., debug, info, warn, error")
Expand Down
6 changes: 5 additions & 1 deletion pkg/KubeArmorOperator/internal/controller/cluster.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var deployment_uuid types.UID
var deployment_name string = "kubearmor-operator"
var PathPrefix string
var initDeploy bool
var ProviderHostname, ProviderEndpoint string

type ClusterWatcher struct {
Nodes []Node
Expand All @@ -68,7 +69,7 @@ type Node struct {
Seccomp string
}

func NewClusterWatcher(client *kubernetes.Clientset, log *zap.SugaredLogger, extClient *apiextensionsclientset.Clientset, opv1Client *opv1client.Clientset, secv1Client *secv1client.Clientset, pathPrefix, deploy_name string, initdeploy bool) *ClusterWatcher {
func NewClusterWatcher(client *kubernetes.Clientset, log *zap.SugaredLogger, extClient *apiextensionsclientset.Clientset, opv1Client *opv1client.Clientset, secv1Client *secv1client.Clientset, pathPrefix, deploy_name, providerHostname, providerEndpoint string, initdeploy bool) *ClusterWatcher {
if informer == nil {
informer = informers.NewSharedInformerFactory(client, 0)
}
Expand All @@ -85,6 +86,9 @@ func NewClusterWatcher(client *kubernetes.Clientset, log *zap.SugaredLogger, ext
PathPrefix = pathPrefix
deployment_name = deploy_name
initDeploy = initdeploy
ProviderHostname = providerHostname
ProviderEndpoint = providerEndpoint

return &ClusterWatcher{
Nodes: []Node{},
Daemonsets: make(map[string]int),
Expand Down
158 changes: 158 additions & 0 deletions pkg/KubeArmorOperator/internal/controller/resources.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -470,6 +473,160 @@ func (clusterWatcher *ClusterWatcher) deployControllerDeployment(deployment *app
return nil
}

func (clusterWatcher *ClusterWatcher) getProvider(providerHostname, providerEndpoint string) (string, string, string) {
nodes, err := clusterWatcher.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
clusterWatcher.Log.Warnf("Error listing nodes: %s\n", err.Error())
}

for _, node := range nodes.Items {
for key, label := range node.Labels {
if strings.Contains(key, "gke") || strings.Contains(label, "gke") {
if providerHostname != "" && providerEndpoint == "" {
providerEndpoint = "/computeMetadata/v1/instance/attributes/cluster-name"
} else if providerHostname == "" && providerEndpoint != "" {
providerHostname = "http://metadata.google.internal"
} else if providerHostname == "" && providerEndpoint == "" {
providerHostname = "http://metadata.google.internal"
providerEndpoint = "/computeMetadata/v1/instance/attributes/cluster-name"
}
return "gke", providerHostname, providerEndpoint
} else if strings.Contains(key, "eks") || strings.Contains(label, "eks") {
if providerHostname != "" && providerEndpoint == "" {
providerEndpoint = "/latest/user-data"
} else if providerHostname == "" && providerEndpoint != "" {
providerHostname = "http://169.254.169.254"
} else if providerHostname == "" && providerEndpoint == "" {
providerHostname = "http://169.254.169.254"
providerEndpoint = "/latest/user-data"
}
return "eks", providerHostname, providerEndpoint
}
}
}
return "default", "", ""
}

func (clusterWatcher *ClusterWatcher) fetchClusterNameFromGKE(providerHostname, providerEndpoint string) (string, error) {
url := providerHostname + providerEndpoint
req, err := http.NewRequest("GET", url, nil)
if err != nil {
clusterWatcher.Log.Warnf("failed to create request: %w, check provider host name and endpoint", err)
return "", err
}

// Set the required header
req.Header.Set("Metadata-Flavor", "Google")

// Create an HTTP client and make the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
clusterWatcher.Log.Warnf("error making request: %w, check provider host name and endpoint", err)
return "", err
}
defer resp.Body.Close()

// Check for a successful response
if resp.StatusCode != http.StatusOK {
clusterWatcher.Log.Warnf("failed to fetch from metadata, status code: %d", resp.StatusCode)
return "", err
}

// Read the response body
body, err := io.ReadAll(resp.Body)
if err != nil {
clusterWatcher.Log.Warnf("error reading response body: %w", err)
return "", err
}

return string(body), nil
}

func (clusterWatcher *ClusterWatcher) fetchClusterNameFromAWS(providerHostname, providerEndpoint string) (string, error) {
var token []byte
client := &http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequest("PUT", providerHostname+"/latest/api/token", nil)
if err != nil {
clusterWatcher.Log.Warnf("failed to create request for fetching token: %w, check provider host name", err)
return "", err
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")

resp, err := client.Do(req)
if err != nil {
clusterWatcher.Log.Warnf("error making request: %w", err)
return "", err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
token, err = io.ReadAll(resp.Body)
if err != nil {
clusterWatcher.Log.Warnf("failed to read token: %d", err)
return "", err
}
}

// Fetch the EKS cluster name from user data
url := providerHostname + providerEndpoint
req, err = http.NewRequest("GET", url, nil)
client = &http.Client{Timeout: 2 * time.Second}
if err != nil {
clusterWatcher.Log.Warnf("failed to create request for fetching metadata: %w, check provider host name and endpoint", err)
return "", err
}
req.Header.Set("X-aws-ec2-metadata-token", string(token))

resp, err = client.Do(req)
if err != nil {
clusterWatcher.Log.Warnf("error making request: %w", err)
return "", err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
clusterWatcher.Log.Warnf("failed to fetch from metadata, status code: %d", resp.StatusCode)
return "", err
}

body, err := io.ReadAll(resp.Body)
if err != nil {
clusterWatcher.Log.Warnf("failed to read metadata: %d", err)
return "", err
}

// Extract EKS cluster name
re := regexp.MustCompile(`/etc/eks/bootstrap\.sh (\S+)`)
match := re.FindStringSubmatch(string(body))
if len(match) > 0 {
return match[1], nil
}

return "", err
}

func (clusterWatcher *ClusterWatcher) GetClusterName(providerHostname, providerEndpoint string) string {
provider, pHostname, pEndpoint := clusterWatcher.getProvider(ProviderHostname, providerEndpoint)
if provider == "gke" {
clusterWatcher.Log.Infof("Provider is GKE")
if clusterName, err := clusterWatcher.fetchClusterNameFromGKE(pHostname, pEndpoint); err != nil {
clusterWatcher.Log.Warnf("Cannot fetch cluster name for GKE %s", err.Error())
} else {
return clusterName
}
} else if provider == "eks" {
clusterWatcher.Log.Infof("Provider is EKS")
if clusterName, err := clusterWatcher.fetchClusterNameFromAWS(pHostname, pEndpoint); err != nil {
clusterWatcher.Log.Warnf("Cannot fetch cluster name for EKS %s", err.Error())
} else {
return clusterName
}
}

return "default"
}

func (clusterWatcher *ClusterWatcher) WatchRequiredResources() {
var caCert, tlsCrt, tlsKey *bytes.Buffer
var kGenErr, err, installErr error
Expand Down Expand Up @@ -647,6 +804,7 @@ func (clusterWatcher *ClusterWatcher) WatchRequiredResources() {
// kubearmor configmap
configmap := addOwnership(deployments.GetKubearmorConfigMap(common.Namespace, deployments.KubeArmorConfigMapName)).(*corev1.ConfigMap)
configmap.Data = common.ConfigMapData
configmap.Data["cluster"] = clusterWatcher.GetClusterName(ProviderHostname, ProviderEndpoint)

for {
caCert, tlsCrt, tlsKey, kGenErr = common.GeneratePki(common.Namespace, deployments.KubeArmorControllerWebhookServiceName)
Expand Down
Loading