Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core) : reducing locks coverage to avoid potential deadlocks #1883

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions KubeArmor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"sort"
"strconv"
"strings"
"sync"
"time"

kc "github.com/kubearmor/KubeArmor/KubeArmor/config"
Expand Down Expand Up @@ -129,7 +130,7 @@
func ObjCommaExpandFirstDupOthers(objptr interface{}) {
if ObjCommaCanBeExpanded(objptr) {
old := reflect.ValueOf(objptr).Elem()
new := reflect.New(reflect.TypeOf(objptr).Elem()).Elem()

Check warning on line 133 in KubeArmor/common/common.go

View workflow job for this annotation

GitHub Actions / go-lint

redefinition of the built-in function new

for i := 0; i < old.Len(); i++ {
for _, f := range ObjCommaExpand(old.Index(i)) {
Expand Down Expand Up @@ -291,7 +292,11 @@
return ""
}

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
defer func() {
if err = stdin.Close(); err != nil {
kg.Warnf("Error closing stdin %s\n", err)
Expand All @@ -300,6 +305,9 @@
_, _ = io.WriteString(stdin, "values written to stdin are passed to cmd's standard input")
}()

// Wait for the stdin writing to complete
wg.Wait()

out, err := res.CombinedOutput()
if err != nil {
return ""
Expand Down
117 changes: 71 additions & 46 deletions KubeArmor/core/kubeUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
// add the endpoint into the endpoint list
dm.EndPoints = append(dm.EndPoints, endpoints...)

dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
for _, endpoint := range endpoints {
Expand All @@ -360,19 +362,18 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
}

dm.EndPointsLock.Unlock()

} else if action == "MODIFIED" {
newEndPoint := tp.EndPoint{}
endpoints := []tp.EndPoint{}

dm.EndPointsLock.Lock()
dm.EndPointsLock.RLock()
for _, endPoint := range dm.EndPoints {
if pod.Metadata["namespaceName"] == endPoint.NamespaceName && pod.Metadata["podName"] == endPoint.EndPointName {
endpoints = append(endpoints, endPoint)
break
}
}
dm.EndPointsLock.Unlock()
dm.EndPointsLock.RUnlock()
if len(endpoints) == 0 {
daemon1024 marked this conversation as resolved.
Show resolved Hide resolved
// No endpoints were added as containers ID have been just added
// Same logic as ADDED
Expand Down Expand Up @@ -527,7 +528,7 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
idx++
}

dm.EndPointsLock.Unlock()
for _, endpoint := range endpoints {
if cfg.GlobalCfg.Policy {
// update security policies
Expand All @@ -543,8 +544,6 @@ func (dm *KubeArmorDaemon) UpdateEndPointWithPod(action string, pod tp.K8sPod) {
}
}
}

dm.EndPointsLock.Unlock()
}

} else { // DELETED
Expand Down Expand Up @@ -746,7 +745,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() {
if event.Type == "ADDED" || event.Type == "MODIFIED" {
exist := false

dm.K8sPodsLock.Lock()
dm.K8sPodsLock.RLock()
for _, k8spod := range dm.K8sPods {
if k8spod.Metadata["namespaceName"] == pod.Metadata["namespaceName"] && k8spod.Metadata["podName"] == pod.Metadata["podName"] {
if k8spod.Annotations["kubearmor-policy"] == "patched" {
Expand All @@ -755,7 +754,7 @@ func (dm *KubeArmorDaemon) WatchK8sPods() {
}
}
}
dm.K8sPodsLock.Unlock()
dm.K8sPodsLock.RUnlock()

if exist {
continue
Expand Down Expand Up @@ -1020,8 +1019,8 @@ func matchClusterSecurityPolicyRule(policy tp.SecurityPolicy) bool {

// GetSecurityPolicies Function
func (dm *KubeArmorDaemon) GetSecurityPolicies(identities []string, namespaceName string) []tp.SecurityPolicy {
dm.SecurityPoliciesLock.Lock()
defer dm.SecurityPoliciesLock.Unlock()
dm.SecurityPoliciesLock.RLock()
defer dm.SecurityPoliciesLock.RUnlock()

secPolicies := []tp.SecurityPolicy{}

Expand Down Expand Up @@ -1049,10 +1048,15 @@ func containsPolicy(endPointPolicies []tp.SecurityPolicy, secPolicy tp.SecurityP

// UpdateSecurityPolicy Function
func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType string, secPolicy tp.SecurityPolicy) {
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()
dm.EndPointsLock.RLock()
endPointsLength := len(dm.EndPoints)
dm.EndPointsLock.RUnlock()

for idx := 0; idx < endPointsLength; idx++ {
dm.EndPointsLock.RLock()
endPoint := dm.EndPoints[idx]
dm.EndPointsLock.RUnlock()

for idx, endPoint := range dm.EndPoints {
// update a security policy
if secPolicyType == KubeArmorPolicy {
if kl.MatchIdentities(secPolicy.Spec.Selector.Identities, endPoint.Identities) && (len(secPolicy.Spec.Selector.Containers) == 0 || kl.ContainsElement(secPolicy.Spec.Selector.Containers, endPoint.ContainerName)) {
Expand All @@ -1066,36 +1070,40 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
}
}
if new {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "MODIFIED" {
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy
endPoint.SecurityPolicies[idxP] = secPolicy
break
}
}
} else if action == "DELETED" {
// remove the given policy from the security policy list of this endpoint
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["namespaceName"] == secPolicy.Metadata["namespaceName"] && policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
break
}
}
}

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx])
dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint)

if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}
}
}
Expand All @@ -1115,7 +1123,7 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
}
}
if new {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "MODIFIED" {
// when policy is modified and new ns is added in secPolicy.Spec.Selector.MatchExpressions[i].Values
Expand All @@ -1125,39 +1133,43 @@ func (dm *KubeArmorDaemon) UpdateSecurityPolicy(action string, secPolicyType str
if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
if !kl.ContainsElement(secPolicy.Spec.Selector.NamespaceList, endPoint.NamespaceName) {
// when policy is modified and this endPoint's ns is removed from secPolicy.Spec.Selector.MatchExpressions[i].Values
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
addNewPolicy = false
break
}
dm.EndPoints[idx].SecurityPolicies[idxP] = secPolicy
endPoint.SecurityPolicies[idxP] = secPolicy
addNewPolicy = false
break
}
}
if addNewPolicy {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies, secPolicy)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies, secPolicy)
}
} else if action == "DELETED" {
// remove the given policy from the security policy list of this endpoint
for idxP, policy := range endPoint.SecurityPolicies {
if policy.Metadata["policyName"] == secPolicy.Metadata["policyName"] {
dm.EndPoints[idx].SecurityPolicies = append(dm.EndPoints[idx].SecurityPolicies[:idxP], dm.EndPoints[idx].SecurityPolicies[idxP+1:]...)
endPoint.SecurityPolicies = append(endPoint.SecurityPolicies[:idxP], endPoint.SecurityPolicies[idxP+1:]...)
break
}
}
}

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
dm.Logger.UpdateSecurityPolicies("UPDATED", dm.EndPoints[idx])
dm.Logger.UpdateSecurityPolicies("UPDATED", endPoint)

if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}
}
}
Expand Down Expand Up @@ -1830,12 +1842,17 @@ func (dm *KubeArmorDaemon) WatchClusterSecurityPolicies(timeout time.Duration) c

// UpdateHostSecurityPolicies Function
func (dm *KubeArmorDaemon) UpdateHostSecurityPolicies() {
dm.HostSecurityPoliciesLock.Lock()
defer dm.HostSecurityPoliciesLock.Unlock()
dm.HostSecurityPoliciesLock.RLock()
hostSecurityPoliciesLength := len(dm.HostSecurityPolicies)
dm.HostSecurityPoliciesLock.RUnlock()

secPolicies := []tp.HostSecurityPolicy{}

for _, policy := range dm.HostSecurityPolicies {
for idx := 0; idx < hostSecurityPoliciesLength; idx++ {
dm.EndPointsLock.RLock()
policy := dm.HostSecurityPolicies[idx]
dm.EndPointsLock.RUnlock()

if kl.MatchIdentities(policy.Spec.NodeSelector.Identities, dm.Node.Identities) {
secPolicies = append(secPolicies, policy)
}
Expand Down Expand Up @@ -2465,9 +2482,6 @@ func validateDefaultPosture(key string, ns *corev1.Namespace, defaultPosture str

// UpdateDefaultPosture Function
func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string, defaultPosture tp.DefaultPosture, annotated bool) {
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()

dm.DefaultPosturesLock.Lock()
defer dm.DefaultPosturesLock.Unlock()

Expand All @@ -2485,25 +2499,36 @@ func (dm *KubeArmorDaemon) UpdateDefaultPosture(action string, namespace string,
}
dm.Logger.UpdateDefaultPosture(action, namespace, defaultPosture)

for idx, endPoint := range dm.EndPoints {
dm.EndPointsLock.RLock()
endPointsLen := len(dm.EndPoints)
dm.EndPointsLock.RUnlock()

for idx := 0; idx < endPointsLen; idx++ {
dm.EndPointsLock.RLock()
endPoint := dm.EndPoints[idx]
dm.EndPointsLock.RUnlock()
// update a security policy
if namespace == endPoint.NamespaceName {
if dm.EndPoints[idx].DefaultPosture == defaultPosture {
if endPoint.DefaultPosture == defaultPosture {
continue
}

dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, dm.EndPoints[idx].DefaultPosture, defaultPosture)
dm.EndPoints[idx].DefaultPosture = defaultPosture
dm.Logger.Printf("Updating default posture for %s with %v namespace default %v", endPoint.EndPointName, endPoint.DefaultPosture, defaultPosture)
endPoint.DefaultPosture = defaultPosture

dm.EndPointsLock.Lock()
dm.EndPoints[idx] = endPoint
dm.EndPointsLock.Unlock()

if cfg.GlobalCfg.Policy {
// update security policies
if dm.RuntimeEnforcer != nil {
if dm.EndPoints[idx].PolicyEnabled == tp.KubeArmorPolicyEnabled {
if endPoint.PolicyEnabled == tp.KubeArmorPolicyEnabled {
// enforce security policies
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, dm.EndPoints[idx].NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(dm.EndPoints[idx])
if !kl.ContainsElement(dm.SystemMonitor.UntrackedNamespaces, endPoint.NamespaceName) {
dm.RuntimeEnforcer.UpdateSecurityPolicies(endPoint)
} else {
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", dm.EndPoints[idx].NamespaceName)
dm.Logger.Warnf("Policy cannot be enforced in untracked namespace %s", endPoint.NamespaceName)
}

}
Expand Down
1 change: 1 addition & 0 deletions KubeArmor/core/unorchestratedUpdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func (dm *KubeArmorDaemon) ParseAndUpdateContainerSecurityPolicy(event tp.K8sKub
newPoint := tp.EndPoint{}
policyStatus := pb.PolicyStatus_Applied

// consider reducing coverage for this lock
dm.EndPointsLock.Lock()
defer dm.EndPointsLock.Unlock()
for idx, endPoint := range dm.EndPoints {
Expand Down
Loading