From 547d9c746cc4125631231af81f54de0907cb7350 Mon Sep 17 00:00:00 2001 From: Prateek Date: Tue, 29 Oct 2024 02:08:59 +0530 Subject: [PATCH 1/2] Enhancement : reducing locks coverage Signed-off-by: Prateek --- KubeArmor/core/kubeUpdate.go | 117 ++++++++++++++---------- KubeArmor/core/unorchestratedUpdates.go | 1 + 2 files changed, 72 insertions(+), 46 deletions(-) diff --git a/KubeArmor/core/kubeUpdate.go b/KubeArmor/core/kubeUpdate.go index 882b9b5b6..a191614b1 100644 --- a/KubeArmor/core/kubeUpdate.go +++ b/KubeArmor/core/kubeUpdate.go @@ -345,6 +345,8 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) { // add the endpoint into the endpoint list dm.EndPoints = append(dm.EndPoints, endpoints...) + dm.EndPointsLock.Unlock() + if cfg.GlobalCfg.Policy { // update security policies for _, endpoint := range endpoints { @@ -360,19 +362,18 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) { } } - dm.EndPointsLock.Unlock() - } else if action == "MODIFIED" { newEndPoint := tp.EndPoint{} endpoints := []tp.EndPoint{} - dm.EndPointsLock.Lock() + dm.EndPointsLock.RLock() for _, endPoint := range dm.EndPoints { if pod.Metadata["namespaceName"] == endPoint.NamespaceName && pod.Metadata["podName"] == endPoint.EndPointName { endpoints = append(endpoints, endPoint) + break } } - dm.EndPointsLock.Unlock() + dm.EndPointsLock.RUnlock() if len(endpoints) == 0 { // No endpoints were added as containers ID have been just added // Same logic as ADDED @@ -527,7 +528,7 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) { } idx++ } - + dm.EndPointsLock.Unlock() for _, endpoint := range endpoints { if cfg.GlobalCfg.Policy { // update security policies @@ -543,8 +544,6 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) { } } } - - dm.EndPointsLock.Unlock() } } else { // DELETED @@ -746,7 +745,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() { if event.Type == "ADDED" || event.Type == "MODIFIED" { exist := false - dm.K8sPodsLock.Lock() + dm.K8sPodsLock.RLock() for _, k8spod := range dm.K8sPods { if k8spod.Metadata["namespaceName"] == pod.Metadata["namespaceName"] && k8spod.Metadata["podName"] == pod.Metadata["podName"] { if k8spod.Annotations["kubearmor-policy"] == "patched" { @@ -755,7 +754,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() { } } } - dm.K8sPodsLock.Unlock() + dm.K8sPodsLock.RUnlock() if exist { continue @@ -1020,8 +1019,8 @@ func matchClusterSecurityPolicyRule(policy tp.SecurityPolicy) bool { // GetSecurityPolicies Function func (dm *KubeArmorDaemon) GetSecurityPolicies(identities []string, namespaceName string) []tp.SecurityPolicy { - dm.SecurityPoliciesLock.Lock() - defer dm.SecurityPoliciesLock.Unlock() + dm.SecurityPoliciesLock.RLock() + defer dm.SecurityPoliciesLock.RUnlock() secPolicies := []tp.SecurityPolicy{} @@ -1049,10 +1048,15 @@ func containsPolicy(endPointPolicies []tp.SecurityPolicy, secPolicy tp.SecurityP // UpdateSecurityPolicy Function func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType string, secPolicy tp.SecurityPolicy) { - dm.EndPointsLock.Lock() - defer dm.EndPointsLock.Unlock() + dm.EndPointsLock.RLock() + endPointsLength := len(dm.EndPoints) + dm.EndPointsLock.RUnlock() + + for idx := 0; idx < endPointsLength; idx++ { + dm.EndPointsLock.RLock() + endPoint := dm.EndPoints[idx] + dm.EndPointsLock.RUnlock() - for idx, endPoint := range dm.EndPoints { // update a security policy if secPolicyType == KubeArmorPolicy { if kl.MatchIdentities(secPolicy.Spec.Selector.Identities, endPoint.Identities) && (len(secPolicy.Spec.Selector.Containers) == 0 || kl.ContainsElement(secPolicy.Spec.Selector.Containers, endPoint.ContainerName)) { @@ -1066,12 +1070,12 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str } } if new { - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy) } } else if action == "MODIFIED" { for idxP, policy := range endPoint.SecurityPolicies { if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] { - dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy + endPoint.SecurityPolicies[idxP] = secPolicy break } } @@ -1079,23 +1083,27 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str // remove the given policy from the security policy list of this endpoint for idxP, policy := range endPoint.SecurityPolicies { if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] { - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...) break } } } + dm.EndPointsLock.Lock() + dm.EndPoints[idx] = endPoint + dm.EndPointsLock.Unlock() + if cfg.GlobalCfg.Policy { // update security policies - dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx]) + dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint) if dm.RuntimeEnforcer != nil { - if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled { + if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled { // enforce security policies - if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) { - dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx]) + if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) { + dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint) } else { - dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName) + dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName) } } } @@ -1115,7 +1123,7 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str } } if new { - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy) } } else if action == "MODIFIED" { // when policy is modified and new ns is added in secPolicy.Spec.Selector.MatchExpressions[i].Values @@ -1125,39 +1133,43 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] { if !kl.ContainsElement(secPolicy.Spec.Selector.NamespaceList, endPoint.NamespaceName) { // when policy is modified and this endPoint's ns is removed from secPolicy.Spec.Selector.MatchExpressions[i].Values - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...) addNewPolicy = false break } - dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy + endPoint.SecurityPolicies[idxP] = secPolicy addNewPolicy = false break } } if addNewPolicy { - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy) } } else if action == "DELETED" { // remove the given policy from the security policy list of this endpoint for idxP, policy := range endPoint.SecurityPolicies { if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] { - dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...) + endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...) break } } } + dm.EndPointsLock.Lock() + dm.EndPoints[idx] = endPoint + dm.EndPointsLock.Unlock() + if cfg.GlobalCfg.Policy { // update security policies - dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx]) + dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint) if dm.RuntimeEnforcer != nil { - if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled { + if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled { // enforce security policies - if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) { - dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx]) + if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) { + dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint) } else { - dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName) + dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName) } } } @@ -1830,12 +1842,17 @@ func (dm *KubeArmorDaemon) WatchClusterSecurityPolicies(timeout time.Duration) c // UpdateHostSecurityPolicies Function func (dm *KubeArmorDaemon) UpdateHostSecurityPolicies() { - dm.HostSecurityPoliciesLock.Lock() - defer dm.HostSecurityPoliciesLock.Unlock() + dm.HostSecurityPoliciesLock.RLock() + hostSecurityPoliciesLength := len(dm.HostSecurityPolicies) + dm.HostSecurityPoliciesLock.RUnlock() secPolicies := []tp.HostSecurityPolicy{} - for _, policy := range dm.HostSecurityPolicies { + for idx := 0; idx < hostSecurityPoliciesLength; idx++ { + dm.EndPointsLock.RLock() + policy := dm.HostSecurityPolicies[idx] + dm.EndPointsLock.RUnlock() + if kl.MatchIdentities(policy.Spec.NodeSelector.Identities, dm.Node.Identities) { secPolicies = append(secPolicies, policy) } @@ -2465,9 +2482,6 @@ func validateDefaultPosture(key string, ns *corev1.Namespace, defaultPosture str // UpdateDefaultPosture Function func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string, defaultPosture tp.DefaultPosture, annotated bool) { - dm.EndPointsLock.Lock() - defer dm.EndPointsLock.Unlock() - dm.DefaultPosturesLock.Lock() defer dm.DefaultPosturesLock.Unlock() @@ -2485,25 +2499,36 @@ func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string, } dm.Logger.UpdateDefaultPosture(action, namespace, defaultPosture) - for idx, endPoint := range dm.EndPoints { + dm.EndPointsLock.RLock() + endPointsLen := len(dm.EndPoints) + dm.EndPointsLock.RUnlock() + + for idx := 0; idx < endPointsLen; idx++ { + dm.EndPointsLock.RLock() + endPoint := dm.EndPoints[idx] + dm.EndPointsLock.RUnlock() // update a security policy if namespace == endPoint.NamespaceName { - if dm.EndPoints[idx].DefaultPosture == defaultPosture { + if endPoint.DefaultPosture == defaultPosture { continue } - dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, dm.EndPoints[idx].DefaultPosture, defaultPosture) - dm.EndPoints[idx].DefaultPosture = defaultPosture + dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, endPoint.DefaultPosture, defaultPosture) + endPoint.DefaultPosture = defaultPosture + + dm.EndPointsLock.Lock() + dm.EndPoints[idx] = endPoint + dm.EndPointsLock.Unlock() if cfg.GlobalCfg.Policy { // update security policies if dm.RuntimeEnforcer != nil { - if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled { + if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled { // enforce security policies - if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) { - dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx]) + if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) { + dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint) } else { - dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName) + dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName) } } diff --git a/KubeArmor/core/unorchestratedUpdates.go b/KubeArmor/core/unorchestratedUpdates.go index 209d3dfd7..3038bdb3b 100644 --- a/KubeArmor/core/unorchestratedUpdates.go +++ b/KubeArmor/core/unorchestratedUpdates.go @@ -587,6 +587,7 @@ func (dm *KubeArmorDaemon) ParseAndUpdateContainerSecurityPolicy(event tp.K8sKub newPoint := tp.EndPoint{} policyStatus := pb.PolicyStatus_Applied + // consider reducing coverage for this lock dm.EndPointsLock.Lock() defer dm.EndPointsLock.Unlock() for idx, endPoint := range dm.EndPoints { From e95305e2b84017ccd37d5432f9c6112fed4be481 Mon Sep 17 00:00:00 2001 From: Prateek Date: Fri, 15 Nov 2024 13:33:56 +0530 Subject: [PATCH 2/2] fix: race condition by ensuring goroutine completes writing to stdin before going further Signed-off-by: Prateek --- KubeArmor/common/common.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/KubeArmor/common/common.go b/KubeArmor/common/common.go index 284a54983..8f06cc70d 100644 --- a/KubeArmor/common/common.go +++ b/KubeArmor/common/common.go @@ -18,6 +18,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" kc "github.com/kubearmor/KubeArmor/KubeArmor/config" @@ -291,7 +292,11 @@ func GetCommandOutputWithoutErr(cmd string, args []string) string { return "" } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() defer func() { if err = stdin.Close(); err != nil { kg.Warnf("Error closing stdin %s\n", err) @@ -300,6 +305,9 @@ func GetCommandOutputWithoutErr(cmd string, args []string) string { _, _ = io.WriteString(stdin, "values written to stdin are passed to cmd's standard input") }() + // Wait for the stdin writing to complete + wg.Wait() + out, err := res.CombinedOutput() if err != nil { return ""