Skip to content

Commit

Permalink
Merge pull request #1883 from Prateeknandle/deadlock
Browse files Browse the repository at this point in the history
feat(core) : reducing locks coverage to avoid potential deadlocks
  • Loading branch information
daemon1024 authored Dec 6, 2024
2 parents f20ae6b + e95305e commit 67cde68
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 46 deletions.
8 changes: 8 additions & 0 deletions KubeArmor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

kc "github.com/kubearmor/KubeArmor/KubeArmor/config"
Expand Down Expand Up @@ -291,7 +292,11 @@ func GetCommandOutputWithoutErr(cmd string, args []string) string {
return ""
}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
defer func() {
if err = stdin.Close(); err != nil {
kg.Warnf("Error closing stdin %s\n", err)
Expand All @@ -300,6 +305,9 @@ func GetCommandOutputWithoutErr(cmd string, args []string) string {
_, _ = io.WriteString(stdin, "values written to stdin are passed to cmd's standard input")
}()

// Wait for the stdin writing to complete
wg.Wait()

out, err := res.CombinedOutput()
if err != nil {
return ""
Expand Down
117 changes: 71 additions & 46 deletions KubeArmor/core/kubeUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
// add the endpoint into the endpoint list
dm.EndPoints = append(dm.EndPoints, endpoints...)

dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
for _, endpoint := range endpoints {
Expand All @@ -360,19 +362,18 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
}

dm.EndPointsLock.Unlock()

} else if action == "MODIFIED" {
newEndPoint := tp.EndPoint{}
endpoints := []tp.EndPoint{}

dm.EndPointsLock.Lock()
dm.EndPointsLock.RLock()
for _, endPoint := range dm.EndPoints {
if pod.Metadata["namespaceName"] == endPoint.NamespaceName && pod.Metadata["podName"] == endPoint.EndPointName {
endpoints = append(endpoints, endPoint)
break
}
}
dm.EndPointsLock.Unlock()
dm.EndPointsLock.RUnlock()
if len(endpoints) == 0 {
// No endpoints were added as containers ID have been just added
// Same logic as ADDED
Expand Down Expand Up @@ -527,7 +528,7 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
idx++
}

dm.EndPointsLock.Unlock()
for _, endpoint := range endpoints {
if cfg.GlobalCfg.Policy {
// update security policies
Expand All @@ -543,8 +544,6 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
}
}

dm.EndPointsLock.Unlock()
}

} else { // DELETED
Expand Down Expand Up @@ -746,7 +745,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() {
if event.Type == "ADDED" || event.Type == "MODIFIED" {
exist := false

dm.K8sPodsLock.Lock()
dm.K8sPodsLock.RLock()
for _, k8spod := range dm.K8sPods {
if k8spod.Metadata["namespaceName"] == pod.Metadata["namespaceName"] && k8spod.Metadata["podName"] == pod.Metadata["podName"] {
if k8spod.Annotations["kubearmor-policy"] == "patched" {
Expand All @@ -755,7 +754,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() {
}
}
}
dm.K8sPodsLock.Unlock()
dm.K8sPodsLock.RUnlock()

if exist {
continue
Expand Down Expand Up @@ -1020,8 +1019,8 @@ func matchClusterSecurityPolicyRule(policy tp.SecurityPolicy) bool {

// GetSecurityPolicies Function
func (dm *KubeArmorDaemon) GetSecurityPolicies(identities []string, namespaceName string) []tp.SecurityPolicy {
dm.SecurityPoliciesLock.Lock()
defer dm.SecurityPoliciesLock.Unlock()
dm.SecurityPoliciesLock.RLock()
defer dm.SecurityPoliciesLock.RUnlock()

secPolicies := []tp.SecurityPolicy{}

Expand Down Expand Up @@ -1049,10 +1048,15 @@ func containsPolicy(endPointPolicies []tp.SecurityPolicy, secPolicy tp.SecurityP

// UpdateSecurityPolicy Function
func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType string, secPolicy tp.SecurityPolicy) {
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()
dm.EndPointsLock.RLock()
endPointsLength := len(dm.EndPoints)
dm.EndPointsLock.RUnlock()

for idx := 0; idx < endPointsLength; idx++ {
dm.EndPointsLock.RLock()
endPoint := dm.EndPoints[idx]
dm.EndPointsLock.RUnlock()

for idx, endPoint := range dm.EndPoints {
// update a security policy
if secPolicyType == KubeArmorPolicy {
if kl.MatchIdentities(secPolicy.Spec.Selector.Identities, endPoint.Identities) && (len(secPolicy.Spec.Selector.Containers) == 0 || kl.ContainsElement(secPolicy.Spec.Selector.Containers, endPoint.ContainerName)) {
Expand All @@ -1066,36 +1070,40 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
}
}
if new {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "MODIFIED" {
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy
endPoint.SecurityPolicies[idxP] = secPolicy
break
}
}
} else if action == "DELETED" {
// remove the given policy from the security policy list of this endpoint
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
break
}
}
}

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx])
dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint)

if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}
}
}
Expand All @@ -1115,7 +1123,7 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
}
}
if new {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "MODIFIED" {
// when policy is modified and new ns is added in secPolicy.Spec.Selector.MatchExpressions[i].Values
Expand All @@ -1125,39 +1133,43 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
if !kl.ContainsElement(secPolicy.Spec.Selector.NamespaceList, endPoint.NamespaceName) {
// when policy is modified and this endPoint's ns is removed from secPolicy.Spec.Selector.MatchExpressions[i].Values
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
addNewPolicy = false
break
}
dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy
endPoint.SecurityPolicies[idxP] = secPolicy
addNewPolicy = false
break
}
}
if addNewPolicy {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "DELETED" {
// remove the given policy from the security policy list of this endpoint
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
break
}
}
}

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx])
dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint)

if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}
}
}
Expand Down Expand Up @@ -1830,12 +1842,17 @@ func (dm *KubeArmorDaemon) WatchClusterSecurityPolicies(timeout time.Duration) c

// UpdateHostSecurityPolicies Function
func (dm *KubeArmorDaemon) UpdateHostSecurityPolicies() {
dm.HostSecurityPoliciesLock.Lock()
defer dm.HostSecurityPoliciesLock.Unlock()
dm.HostSecurityPoliciesLock.RLock()
hostSecurityPoliciesLength := len(dm.HostSecurityPolicies)
dm.HostSecurityPoliciesLock.RUnlock()

secPolicies := []tp.HostSecurityPolicy{}

for _, policy := range dm.HostSecurityPolicies {
for idx := 0; idx < hostSecurityPoliciesLength; idx++ {
dm.EndPointsLock.RLock()
policy := dm.HostSecurityPolicies[idx]
dm.EndPointsLock.RUnlock()

if kl.MatchIdentities(policy.Spec.NodeSelector.Identities, dm.Node.Identities) {
secPolicies = append(secPolicies, policy)
}
Expand Down Expand Up @@ -2465,9 +2482,6 @@ func validateDefaultPosture(key string, ns *corev1.Namespace, defaultPosture str

// UpdateDefaultPosture Function
func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string, defaultPosture tp.DefaultPosture, annotated bool) {
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()

dm.DefaultPosturesLock.Lock()
defer dm.DefaultPosturesLock.Unlock()

Expand All @@ -2485,25 +2499,36 @@ func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string,
}
dm.Logger.UpdateDefaultPosture(action, namespace, defaultPosture)

for idx, endPoint := range dm.EndPoints {
dm.EndPointsLock.RLock()
endPointsLen := len(dm.EndPoints)
dm.EndPointsLock.RUnlock()

for idx := 0; idx < endPointsLen; idx++ {
dm.EndPointsLock.RLock()
endPoint := dm.EndPoints[idx]
dm.EndPointsLock.RUnlock()
// update a security policy
if namespace == endPoint.NamespaceName {
if dm.EndPoints[idx].DefaultPosture == defaultPosture {
if endPoint.DefaultPosture == defaultPosture {
continue
}

dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, dm.EndPoints[idx].DefaultPosture, defaultPosture)
dm.EndPoints[idx].DefaultPosture = defaultPosture
dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, endPoint.DefaultPosture, defaultPosture)
endPoint.DefaultPosture = defaultPosture

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}

}
Expand Down
1 change: 1 addition & 0 deletions KubeArmor/core/unorchestratedUpdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func (dm *KubeArmorDaemon) ParseAndUpdateContainerSecurityPolicy(event tp.K8sKub
newPoint := tp.EndPoint{}
policyStatus := pb.PolicyStatus_Applied

// consider reducing coverage for this lock
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()
for idx, endPoint := range dm.EndPoints {
Expand Down

0 comments on commit 67cde68

Please sign in to comment.