Skip to content

Commit

Permalink
admission/webhooks: cache whole webhook and cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
  • Loading branch information
sttts committed Sep 27, 2024
1 parent e8bc945 commit d27211a
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 106 deletions.
191 changes: 137 additions & 54 deletions pkg/admission/mutatingwebhook/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ import (
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
"github.com/kcp-dev/logicalcluster/v3"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/configuration"
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"

kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers"
"github.com/kcp-dev/kcp/pkg/admission/validatingwebhook"
"github.com/kcp-dev/kcp/pkg/indexers"
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
)

Expand All @@ -56,10 +60,16 @@ type Plugin struct {
localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory

getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error)
getAPIBinding func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error)
getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error)

managerLock sync.Mutex
managersCache map[logicalcluster.Name]generic.Source
lock sync.RWMutex
cache map[logicalcluster.Name]map[logicalcluster.Name]clusterCache // by request and hook source cluster.
}

type clusterCache struct {
source generic.Source
plugin *mutating.Plugin
}

var (
Expand All @@ -72,9 +82,8 @@ var (

func NewMutatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {
p := &Plugin{
managerLock: sync.Mutex{},
managersCache: make(map[logicalcluster.Name]generic.Source),
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
cache: make(map[logicalcluster.Name]map[logicalcluster.Name]clusterCache),
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
}
if configFile != nil {
config, err := io.ReadAll(configFile)
Expand All @@ -100,30 +109,11 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi
}
clusterName := cluster.Name

var config io.Reader
if len(p.config) > 0 {
config = bytes.NewReader(p.config)
}

hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource())
plugin, err := p.getPlugin(clusterName, attr)
if err != nil {
return err
}

plugin, err := mutating.NewMutatingWebhook(config)
if err != nil {
return fmt.Errorf("error creating mutating admission webhook: %w", err)
}

plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path()))
plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))
plugin.SetHookSource(hookSource)
plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))

if err := plugin.ValidateInitialization(); err != nil {
return fmt.Errorf("error validating MutatingWebhook initialization: %w", err)
}

// Add cluster annotation on create
if attr.GetOperation() == admission.Create {
u, ok := attr.GetObject().(metav1.Object)
Expand All @@ -138,40 +128,73 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi
return plugin.Admit(ctx, attr, o)
}

func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) {
clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource)
if err != nil {
return nil, err
func (p *Plugin) getPlugin(clusterName logicalcluster.Name, attr admission.Attributes) (*mutating.Plugin, error) {
var config io.Reader
if len(p.config) > 0 {
config = bytes.NewReader(p.config)
}

p.managerLock.Lock()
defer p.managerLock.Unlock()
if _, ok := p.managersCache[clusterNameForGroupResource]; !ok {
p.managersCache[clusterNameForGroupResource] = configuration.NewMutatingWebhookConfigurationManagerForInformer(
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(clusterNameForGroupResource),
)
// get the APIBinding for the resource, or nil for local resources
gr := attr.GetResource().GroupResource()
binding, err := p.getAPIBinding(clusterName, gr)
if err != nil && !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting APIBinding for %q: %w", gr, err)
}
sourceClusterName := clusterName
if binding != nil {
sourceClusterName = logicalcluster.Name(binding.Status.APIExportClusterName)
}

return p.managersCache[clusterNameForGroupResource], nil
}
// fast path
p.lock.RLock()
c, ok := p.cache[clusterName][sourceClusterName]
p.lock.RUnlock()
if ok {
return c.plugin, nil
}

func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) {
objs, err := p.getAPIBindings(clusterName)
// slow path
p.lock.Lock()
defer p.lock.Unlock()
c, ok = p.cache[clusterName][sourceClusterName]
if ok {
return c.plugin, nil
}

// double check that the logical cluster is still alive
if _, err := p.getLogicalCluster(clusterName); err != nil {
return nil, fmt.Errorf("error getting LogicalCluster %q: %w", clusterName, err)
}

// create new plugin for this logical cluster and source cluster
source := configuration.NewMutatingWebhookConfigurationManagerForInformer(
// TODO(sttts): fix supporting local admission webhooks for bound resources as well
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(sourceClusterName),
)
plugin, err := mutating.NewMutatingWebhook(config)
if err != nil {
return "", err
return nil, fmt.Errorf("error creating mutaing admission webhook: %w", err)
}
plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path()))
plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))
plugin.SetHookSource(source)
plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))

for _, apiBinding := range objs {
for _, br := range apiBinding.Status.BoundResources {
if br.Group == groupResource.Group && br.Resource == groupResource.Resource {
// GroupResource comes from an APIBinding/APIExport
return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil
}
}
if err := plugin.ValidateInitialization(); err != nil {
return nil, fmt.Errorf("error mutaing MutatingAdmissionWebhook initialization: %w", err)
}

// store in cache
c = clusterCache{
source: source,
plugin: plugin,
}
if _, ok := p.cache[clusterName]; !ok {
p.cache[clusterName] = map[logicalcluster.Name]clusterCache{}
}
p.cache[clusterName][sourceClusterName] = c

// GroupResource is local to this cluster
return clusterName, nil
return c.plugin, nil
}

func (p *Plugin) ValidateInitialization() error {
Expand All @@ -196,8 +219,68 @@ func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInf
p.globalKubeSharedInformerFactory = global
}

func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) {
p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) {
return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything())
func (p *Plugin) SetKcpInformers(local, _ kcpinformers.SharedInformerFactory) {
// watch APIBindings
_ = local.Apis().V1alpha1().APIBindings().Informer().AddIndexers(cache.Indexers{
indexers.APIBindingByBoundResources: indexers.IndexAPIBindingByBoundResources,
indexers.APIBindingsByAPIExportCluster: indexers.IndexAPIBindingsByAPIExportCluster,
}) // ignore conflict
p.getAPIBinding = func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) {
key := indexers.APIBindingBoundResourceValue(clusterName, gr.Resource, gr.Group)
objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingByBoundResources, key)
if err != nil {
return nil, fmt.Errorf("error getting APIBindings by bound resources: %w", err)
}
switch len(objs) {
case 0:
return nil, kerrors.NewNotFound(apisv1alpha1.Resource("APIBinding"), key)
case 1:
return objs[0].(*apisv1alpha1.APIBinding), nil
default:
// should never happen
return nil, fmt.Errorf("found multiple APIBindings for bound resources %q", key)
}
}
_, err := local.Apis().V1alpha1().APIBindings().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
p.lock.Lock()
defer p.lock.Unlock()

// delete if there is no other binding from the same logical cluster
binding := obj.(*apisv1alpha1.APIBinding)
key := indexers.APIBindingByBoundResourceValue(logicalcluster.From(binding), logicalcluster.Name(binding.Status.APIExportClusterName))
objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingsByAPIExportCluster, key)
if err != nil {
runtime.HandleError(fmt.Errorf("error getting APIBindings by APIExportCluster: %w", err))
return
}
foundOther := false
for _, obj := range objs {
otherBinding := obj.(*apisv1alpha1.APIBinding)
if otherBinding.Name != binding.Name {
foundOther = true
break
}
}
if !foundOther {
delete(p.cache[logicalcluster.From(binding)], logicalcluster.Name(binding.Status.APIExportClusterName))
}
},
})

// watch logical clusters
p.getLogicalCluster = func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return local.Core().V1alpha1().LogicalClusters().Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName)
}
_, err = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
clusterName := logicalcluster.From(obj.(*corev1alpha1.LogicalCluster))
p.lock.Lock()
defer p.lock.Unlock()
delete(p.cache, clusterName)
},
})
if err != nil {
runtime.HandleError(fmt.Errorf("error adding LogicalCluster delete event handler: %w", err))
}
}
Loading

0 comments on commit d27211a

Please sign in to comment.