Skip to content

Commit

Permalink
Merge pull request kosmos-io#46 from wangyizhi1/main
Browse files Browse the repository at this point in the history
feat: elevate adapter common configuration
  • Loading branch information
kosmos-robot authored Sep 15, 2023
2 parents 6a2b29a + 3307164 commit 1122911
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 75 deletions.
22 changes: 22 additions & 0 deletions pkg/knode-manager/adapters/k8s/adapter_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package k8sadapter

import (
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/knode-manager/utils/manager"
)

type AdapterConfig struct {
Client kubernetes.Interface
Master kubernetes.Interface

PodInformer v1.PodInformer
NamespaceInformer v1.NamespaceInformer
NodeInformer v1.NodeInformer
ConfigmapInformer v1.ConfigMapInformer
SecretInformer v1.SecretInformer
ServiceInformer v1.ServiceInformer

ResourceManager *manager.ResourceManager
}
32 changes: 11 additions & 21 deletions pkg/knode-manager/adapters/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand All @@ -21,8 +20,8 @@ import (
)

type NodeAdapter struct {
client kubernetes.Interface
mClient kubernetes.Interface
client kubernetes.Interface
master kubernetes.Interface

cfg *config.Opts
knode *kosmosv1alpha1.Knode
Expand All @@ -35,20 +34,16 @@ type NodeAdapter struct {
stopCh <-chan struct{}
}

func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient kubernetes.Interface, mClient kubernetes.Interface, c *config.Opts) (*NodeAdapter, error) {
func NewNodeAdapter(ctx context.Context, cr *kosmosv1alpha1.Knode, ac *AdapterConfig, c *config.Opts) (*NodeAdapter, error) {
adapter := &NodeAdapter{
client: wClient,
mClient: mClient,
cfg: c,
knode: knode,
stopCh: ctx.Done(),
client: ac.Client,
master: ac.Master,
cfg: c,
knode: cr,
stopCh: ctx.Done(),
}

informerFactory := informers.NewSharedInformerFactory(wClient, 0)
nodesInformer := informerFactory.Core().V1().Nodes().Informer()
podInformer := informerFactory.Core().V1().Pods().Informer()

_, err := nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := ac.NodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: adapter.addNode,
UpdateFunc: adapter.updateNode,
DeleteFunc: adapter.deleteNode,
Expand All @@ -57,7 +52,7 @@ func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient ku
return nil, err
}

_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = ac.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: adapter.addPod,
UpdateFunc: adapter.updatePod,
DeleteFunc: adapter.deletePod,
Expand All @@ -66,16 +61,11 @@ func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient ku
return nil, err
}

informerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), nodesInformer.HasSynced, podInformer.HasSynced) {
return nil, fmt.Errorf("nodesInformer waitForCacheSync failed")
}

return adapter, nil
}

func (n *NodeAdapter) Probe(_ context.Context) error {
_, err := n.mClient.Discovery().ServerVersion()
_, err := n.master.Discovery().ServerVersion()
if err != nil {
klog.Error("Failed ping")
return fmt.Errorf("could not list master apiserver statuses: %v", err)
Expand Down
106 changes: 65 additions & 41 deletions pkg/knode-manager/adapters/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/knode-manager/utils"
Expand Down Expand Up @@ -65,57 +64,82 @@ type PodAdapter struct {
stopCh <-chan struct{}
}

func NewPodAdapter(cfg PodAdapterConfig, ignoreLabelsStr string, cc *ClientConfig, enableServiceAccount bool) (*PodAdapter, error) {
func NewPodAdapter(ctx context.Context, ac *AdapterConfig, ignoreLabelsStr string, cc *ClientConfig, enableServiceAccount bool) (*PodAdapter, error) {
ignoreLabels := strings.Split(ignoreLabelsStr, ",")
if len(cc.ClientKubeConfig) == 0 {
panic("client kubeconfig path can not be empty")
}
// client config
// var clientConfig *rest.Config
client, err := utils.NewClientFromByte(cc.ClientKubeConfig, func(config *rest.Config) {
config.QPS = float32(cc.KubeClientQPS)
config.Burst = cc.KubeClientBurst
// Set config for clientConfig
// clientConfig = config
})
if err != nil {
return nil, fmt.Errorf("could not build clientset for cluster: %v", err)

adapter := &PodAdapter{
master: ac.Master,
client: ac.Client,
ignoreLabels: ignoreLabels,
enableServiceAccount: enableServiceAccount,
clientCache: clientCache{
podLister: ac.PodInformer.Lister(),
nsLister: ac.NamespaceInformer.Lister(),
cmLister: ac.ConfigmapInformer.Lister(),
secretLister: ac.SecretInformer.Lister(),
nodeLister: ac.NodeInformer.Lister(),
},
rm: ac.ResourceManager,
updatedPod: make(chan *corev1.Pod, 100000),
stopCh: ctx.Done(),
}

// master config, maybe a real node or a pod
master, err := utils.NewClient(cfg.ConfigPath, func(config *rest.Config) {
// config.QPS = float32(opts.KubeAPIQPS)
// config.Burst = int(opts.KubeAPIBurst)
_, err := ac.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: adapter.addPod,
UpdateFunc: adapter.updatePod,
DeleteFunc: adapter.deletePod,
})
if err != nil {
return nil, fmt.Errorf("could not build clientset for cluster: %v", err)
return nil, err
}

informer := kubeinformers.NewSharedInformerFactory(client, 0)
podInformer := informer.Core().V1().Pods()
nsInformer := informer.Core().V1().Namespaces()
nodeInformer := informer.Core().V1().Nodes()
cmInformer := informer.Core().V1().ConfigMaps()
secretInformer := informer.Core().V1().Secrets()
return adapter, nil
}

ctx := context.TODO()
func (p *PodAdapter) addPod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
podCopy := pod.DeepCopy()
utils.TrimObjectMeta(&podCopy.ObjectMeta)
if utils.IsVirtualPod(podCopy) {
p.updatedPod <- podCopy
}
}

return &PodAdapter{
master: master,
client: client,
ignoreLabels: ignoreLabels,
enableServiceAccount: enableServiceAccount,
clientCache: clientCache{
podLister: podInformer.Lister(),
nsLister: nsInformer.Lister(),
cmLister: cmInformer.Lister(),
secretLister: secretInformer.Lister(),
nodeLister: nodeInformer.Lister(),
},
rm: cfg.ResourceManager,
updatedPod: make(chan *corev1.Pod, 100000),
stopCh: ctx.Done(),
}, nil
func (p *PodAdapter) updatePod(oldObj, newObj interface{}) {
oldPod, ok1 := oldObj.(*corev1.Pod)
newPod, ok2 := newObj.(*corev1.Pod)
oldCopy := oldPod.DeepCopy()
newCopy := newPod.DeepCopy()
if !ok1 || !ok2 {
return
}

if newCopy.DeletionTimestamp != nil {
newCopy.DeletionGracePeriodSeconds = nil
}

if utils.IsVirtualPod(newCopy) && (!reflect.DeepEqual(oldCopy.Status, newCopy.Status) || newCopy.DeletionTimestamp != nil) {
utils.TrimObjectMeta(&newCopy.ObjectMeta)
p.updatedPod <- newCopy
}
}

func (p *PodAdapter) deletePod(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
podCopy := pod.DeepCopy()
utils.TrimObjectMeta(&podCopy.ObjectMeta)
if utils.IsVirtualPod(podCopy) {
p.updatedPod <- podCopy
}
}

func (p *PodAdapter) createConfigMaps(ctx context.Context, configmaps []string, ns string) error {
Expand Down
84 changes: 71 additions & 13 deletions pkg/knode-manager/knode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"fmt"

"github.com/pkg/errors"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
klogv2 "k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/cmd/knode-manager/app/config"
Expand All @@ -14,51 +17,82 @@ import (
k8sadapter "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters/k8s"
"github.com/kosmos.io/kosmos/pkg/knode-manager/controllers"
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils"
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils/manager"
)

type Knode struct {
client kubernetes.Interface
master kubernetes.Interface

podController *controllers.PodController
nodeController *controllers.NodeController

informerFactory kubeinformers.SharedInformerFactory

ac *k8sadapter.AdapterConfig
}

func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts) (*Knode, error) {
func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *config.Opts) (*Knode, error) {
if len(knode.Spec.Kubeconfig) == 0 {
return nil, fmt.Errorf("kubeconfig of knode %s is empty", knode.Name)
}

mClient, err := utils.NewClientFromConfigPath(c.KubeConfigPath, func(config *rest.Config) {
config.QPS = c.KubeAPIQPS
config.Burst = c.KubeAPIBurst
master, err := utils.NewClientFromConfigPath(cmdConfig.KubeConfigPath, func(config *rest.Config) {
config.QPS = cmdConfig.KubeAPIQPS
config.Burst = cmdConfig.KubeAPIBurst
})
if err != nil {
return nil, fmt.Errorf("could not build clientset for master cluster: %v", err)
}

wClient, err := utils.NewClientFromBytes(knode.Spec.Kubeconfig, func(config *rest.Config) {
client, err := utils.NewClientFromBytes(knode.Spec.Kubeconfig, func(config *rest.Config) {
config.QPS = knode.Spec.KubeAPIQPS
config.Burst = knode.Spec.KubeAPIBurst
})
if err != nil {
return nil, fmt.Errorf("could not build clientset for worker cluster %s: %v", knode.Name, err)
}

informer := kubeinformers.NewSharedInformerFactory(client, 0)
podInformer := informer.Core().V1().Pods()
nsInformer := informer.Core().V1().Namespaces()
nodeInformer := informer.Core().V1().Nodes()
cmInformer := informer.Core().V1().ConfigMaps()
secretInformer := informer.Core().V1().Secrets()
serviceInformer := informer.Core().V1().Services()

rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), cmInformer.Lister(), serviceInformer.Lister())
if err != nil {
return nil, errors.Wrap(err, "could not create resource manager")
}

ac := &k8sadapter.AdapterConfig{
Client: client,
Master: master,
PodInformer: podInformer,
NamespaceInformer: nsInformer,
NodeInformer: nodeInformer,
ConfigmapInformer: cmInformer,
SecretInformer: secretInformer,
ServiceInformer: serviceInformer,
ResourceManager: rm,
}

var podAdapter adapters.PodHandler
var nodeAdapter adapters.NodeHandler
if knode.Spec.Type == kosmosv1alpha1.K8sAdapter {
initConfig := k8sadapter.PodAdapterConfig{}
podAdapter, err = k8sadapter.NewPodAdapter(initConfig, "", &k8sadapter.ClientConfig{}, true)
podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", &k8sadapter.ClientConfig{}, true)
if err != nil {
return nil, err
}

nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, wClient, mClient, c)
nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, ac, cmdConfig)
if err != nil {
return nil, err
}
}

dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter)
nc, err := controllers.NewNodeController(nodeAdapter, mClient, dummyNode)
nc, err := controllers.NewNodeController(nodeAdapter, master, dummyNode)
if err != nil {
return nil, err
}
Expand All @@ -71,15 +105,39 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts)
}

return &Knode{
podController: pc,
nodeController: nc,
client: client,
master: master,
informerFactory: informer,
ac: ac,
podController: pc,
nodeController: nc,
}, nil
}

func (kn *Knode) Run(ctx context.Context, c *config.Opts) {
kn.informerFactory.Start(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(),
kn.ac.NodeInformer.Informer().HasSynced,
kn.ac.PodInformer.Informer().HasSynced,
kn.ac.ConfigmapInformer.Informer().HasSynced,
kn.ac.NamespaceInformer.Informer().HasSynced,
kn.ac.SecretInformer.Informer().HasSynced,
) {
klogv2.Fatal("nodesInformer waitForCacheSync failed")
}

go func() {
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
klogv2.Fatal(err)
}
}()

go func() {
if err := kn.nodeController.Run(ctx); err != nil {
klogv2.Fatal(err)
}
}()

<-ctx.Done()
}

0 comments on commit 1122911

Please sign in to comment.