Skip to content

Commit

Permalink
feat: auto respect rbac for discovery/sync (#532)
Browse files Browse the repository at this point in the history
* feat: respect rbac for resource exclusions

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* feat: use list call to check for permissions

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* feat: updated implementation to handle different levels of rbac check

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* feat: fixed linter error

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

* feat: resolve review comments

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>

---------

Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
  • Loading branch information
gdsoumya authored Aug 11, 2023
1 parent ed7c77a commit 187312f
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 27 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/go-logr/logr v1.2.2
github.com/golang/mock v1.6.0
github.com/google/gnostic v0.5.7-v3refs
github.com/spf13/cobra v1.5.0
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/protobuf v1.27.1
k8s.io/api v0.24.2
k8s.io/apiextensions-apiserver v0.24.2
k8s.io/apimachinery v0.24.2
Expand Down Expand Up @@ -42,7 +44,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
Expand Down Expand Up @@ -78,7 +79,6 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
161 changes: 136 additions & 25 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/go-logr/logr"
"golang.org/x/sync/semaphore"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -22,6 +24,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
authType1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/pager"
Expand Down Expand Up @@ -55,6 +59,15 @@ const (
defaultListSemaphoreWeight = 50
)

const (
// RespectRbacDisabled default value for respectRbac
RespectRbacDisabled = iota
// RespectRbacNormal checks only api response for forbidden/unauthorized errors
RespectRbacNormal
// RespectRbacStrict checks both api response for forbidden/unauthorized errors and SelfSubjectAccessReview
RespectRbacStrict
)

type apiMeta struct {
namespaced bool
watchCancel context.CancelFunc
Expand Down Expand Up @@ -208,6 +221,8 @@ type clusterCache struct {
eventHandlers map[uint64]OnEventHandler
openAPISchema openapi.Resources
gvkParser *managedfields.GvkParser

respectRBAC int
}

type clusterCacheSync struct {
Expand Down Expand Up @@ -462,6 +477,10 @@ func (c *clusterCache) startMissingWatches() error {
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(c.config)
if err != nil {
return err
}
namespacedResources := make(map[schema.GroupKind]bool)
for i := range apis {
api := apis[i]
Expand All @@ -470,8 +489,25 @@ func (c *clusterCache) startMissingWatches() error {
ctx, cancel := context.WithCancel(context.Background())
c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

err = c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
go c.watchEvents(ctx, api, resClient, ns, "")
err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns)
if err != nil && c.isRestrictedResource(err) {
keep := false
if c.respectRBAC == RespectRbacStrict {
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
if permErr != nil {
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
}
keep = k
}
// if we are not allowed to list the resource, remove it from the watch list
if !keep {
delete(c.apisMeta, api.GroupKind)
delete(namespacedResources, api.GroupKind)
return nil
}
}
go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
return nil
})
if err != nil {
Expand Down Expand Up @@ -530,6 +566,29 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
return resourceVersion, callback(listPager)
}

func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string) (string, error) {
return c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
var items []*Resource
err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
items = append(items, c.newResource(un))
}
return nil
})

if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

return runSynced(&c.lock, func() error {
c.replaceResourceCache(api.GroupKind, items, ns)
return nil
})
})
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
Expand All @@ -540,30 +599,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

// load API initial state if no resource version provided
if resourceVersion == "" {
var items []*Resource
resourceVersion, err = c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
err := listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
items = append(items, c.newResource(un))
}
return nil
})

if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err)
}
return nil
})

resourceVersion, err = c.loadInitialState(ctx, api, resClient, ns)
if err != nil {
return err
}

c.lock.Lock()
c.replaceResourceCache(api.GroupKind, items, ns)
c.lock.Unlock()
}

w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
Expand Down Expand Up @@ -687,7 +726,7 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
resClient := client.Resource(api.GroupVersionResource)
switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || !api.Meta.Namespaced && c.clusterResources:
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
return callback(resClient, "")
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
Expand All @@ -702,6 +741,56 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource
return nil
}

// isRestrictedResource checks if the kube api call is unauthorized or forbidden
func (c *clusterCache) isRestrictedResource(err error) bool {
return c.respectRBAC != RespectRbacDisabled && (k8sErrors.IsForbidden(err) || k8sErrors.IsUnauthorized(err))
}

// checkPermission runs a self subject access review to check if the controller has permissions to list the resource
func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface authType1.SelfSubjectAccessReviewInterface, api kube.APIResourceInfo) (keep bool, err error) {
sar := &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Namespace: "*",
Verb: "list", // uses list verb to check for permissions
Resource: api.GroupVersionResource.Resource,
},
},
}

switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
if resp != nil && resp.Status.Allowed {
return true, nil
}
// unsupported, remove from watch list
return false, nil
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
for _, ns := range c.namespaces {
sar.Spec.ResourceAttributes.Namespace = ns
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
if resp != nil && resp.Status.Allowed {
return true, nil
} else {
// unsupported, remove from watch list
return false, nil
}
}
}
// checkPermission follows the same logic of determining namespace/cluster resource as the processApi function
// so if neither of the cases match it means the controller will not watch for it so it is safe to return true.
return true, nil
}

func (c *clusterCache) sync() error {
c.log.Info("Start syncing cluster")

Expand Down Expand Up @@ -748,6 +837,10 @@ func (c *clusterCache) sync() error {
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
api := apis[i]
Expand All @@ -773,7 +866,25 @@ func (c *clusterCache) sync() error {
})
})
if err != nil {
return fmt.Errorf("failed to load initial state of resource %s: %v", api.GroupKind.String(), err)
if c.isRestrictedResource(err) {
keep := false
if c.respectRBAC == RespectRbacStrict {
k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
if permErr != nil {
return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
}
keep = k
}
// if we are not allowed to list the resource, remove it from the watch list
if !keep {
lock.Lock()
delete(c.apisMeta, api.GroupKind)
delete(c.namespacedResources, api.GroupKind)
lock.Unlock()
return nil
}
}
return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}

go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
Expand Down
12 changes: 12 additions & 0 deletions pkg/cache/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,15 @@ func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc)
cache.listRetryFunc = retryFunc
}
}

// SetRespectRBAC allows to set whether to respect the controller rbac in list/watches
func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc {
return func(cache *clusterCache) {
// if invalid value is provided disable respect rbac
if respectRBAC < RespectRbacDisabled || respectRBAC > RespectRbacStrict {
cache.respectRBAC = RespectRbacDisabled
} else {
cache.respectRBAC = respectRBAC
}
}
}

0 comments on commit 187312f

Please sign in to comment.