Skip to content

Commit 3307164

Browse files
committed
feat: elevate adapter common configuration
Signed-off-by: wangyizhi1 <[email protected]>
1 parent 40a76dc commit 3307164

File tree

4 files changed

+169
-75
lines changed

4 files changed

+169
-75
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package k8sadapter
2+
3+
import (
4+
v1 "k8s.io/client-go/informers/core/v1"
5+
"k8s.io/client-go/kubernetes"
6+
7+
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils/manager"
8+
)
9+
10+
type AdapterConfig struct {
11+
Client kubernetes.Interface
12+
Master kubernetes.Interface
13+
14+
PodInformer v1.PodInformer
15+
NamespaceInformer v1.NamespaceInformer
16+
NodeInformer v1.NodeInformer
17+
ConfigmapInformer v1.ConfigMapInformer
18+
SecretInformer v1.SecretInformer
19+
ServiceInformer v1.ServiceInformer
20+
21+
ResourceManager *manager.ResourceManager
22+
}

pkg/knode-manager/adapters/k8s/node.go

+11-21
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"k8s.io/apimachinery/pkg/api/resource"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/fields"
12-
"k8s.io/client-go/informers"
1312
"k8s.io/client-go/kubernetes"
1413
"k8s.io/client-go/tools/cache"
1514
"k8s.io/klog/v2"
@@ -21,8 +20,8 @@ import (
2120
)
2221

2322
type NodeAdapter struct {
24-
client kubernetes.Interface
25-
mClient kubernetes.Interface
23+
client kubernetes.Interface
24+
master kubernetes.Interface
2625

2726
cfg *config.Opts
2827
knode *kosmosv1alpha1.Knode
@@ -35,20 +34,16 @@ type NodeAdapter struct {
3534
stopCh <-chan struct{}
3635
}
3736

38-
func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient kubernetes.Interface, mClient kubernetes.Interface, c *config.Opts) (*NodeAdapter, error) {
37+
func NewNodeAdapter(ctx context.Context, cr *kosmosv1alpha1.Knode, ac *AdapterConfig, c *config.Opts) (*NodeAdapter, error) {
3938
adapter := &NodeAdapter{
40-
client: wClient,
41-
mClient: mClient,
42-
cfg: c,
43-
knode: knode,
44-
stopCh: ctx.Done(),
39+
client: ac.Client,
40+
master: ac.Master,
41+
cfg: c,
42+
knode: cr,
43+
stopCh: ctx.Done(),
4544
}
4645

47-
informerFactory := informers.NewSharedInformerFactory(wClient, 0)
48-
nodesInformer := informerFactory.Core().V1().Nodes().Informer()
49-
podInformer := informerFactory.Core().V1().Pods().Informer()
50-
51-
_, err := nodesInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
46+
_, err := ac.NodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
5247
AddFunc: adapter.addNode,
5348
UpdateFunc: adapter.updateNode,
5449
DeleteFunc: adapter.deleteNode,
@@ -57,7 +52,7 @@ func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient ku
5752
return nil, err
5853
}
5954

60-
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
55+
_, err = ac.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
6156
AddFunc: adapter.addPod,
6257
UpdateFunc: adapter.updatePod,
6358
DeleteFunc: adapter.deletePod,
@@ -66,16 +61,11 @@ func NewNodeAdapter(ctx context.Context, knode *kosmosv1alpha1.Knode, wClient ku
6661
return nil, err
6762
}
6863

69-
informerFactory.Start(ctx.Done())
70-
if !cache.WaitForCacheSync(ctx.Done(), nodesInformer.HasSynced, podInformer.HasSynced) {
71-
return nil, fmt.Errorf("nodesInformer waitForCacheSync failed")
72-
}
73-
7464
return adapter, nil
7565
}
7666

7767
func (n *NodeAdapter) Probe(_ context.Context) error {
78-
_, err := n.mClient.Discovery().ServerVersion()
68+
_, err := n.master.Discovery().ServerVersion()
7969
if err != nil {
8070
klog.Error("Failed ping")
8171
return fmt.Errorf("could not list master apiserver statuses: %v", err)

pkg/knode-manager/adapters/k8s/pod.go

+65-41
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@ import (
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1313
"k8s.io/apimachinery/pkg/labels"
1414
"k8s.io/apimachinery/pkg/util/wait"
15-
kubeinformers "k8s.io/client-go/informers"
1615
"k8s.io/client-go/kubernetes"
1716
v1 "k8s.io/client-go/listers/core/v1"
18-
"k8s.io/client-go/rest"
17+
"k8s.io/client-go/tools/cache"
1918
"k8s.io/klog/v2"
2019

2120
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils"
@@ -65,57 +64,82 @@ type PodAdapter struct {
6564
stopCh <-chan struct{}
6665
}
6766

68-
func NewPodAdapter(cfg PodAdapterConfig, ignoreLabelsStr string, cc *ClientConfig, enableServiceAccount bool) (*PodAdapter, error) {
67+
func NewPodAdapter(ctx context.Context, ac *AdapterConfig, ignoreLabelsStr string, cc *ClientConfig, enableServiceAccount bool) (*PodAdapter, error) {
6968
ignoreLabels := strings.Split(ignoreLabelsStr, ",")
7069
if len(cc.ClientKubeConfig) == 0 {
7170
panic("client kubeconfig path can not be empty")
7271
}
73-
// client config
74-
// var clientConfig *rest.Config
75-
client, err := utils.NewClientFromByte(cc.ClientKubeConfig, func(config *rest.Config) {
76-
config.QPS = float32(cc.KubeClientQPS)
77-
config.Burst = cc.KubeClientBurst
78-
// Set config for clientConfig
79-
// clientConfig = config
80-
})
81-
if err != nil {
82-
return nil, fmt.Errorf("could not build clientset for cluster: %v", err)
72+
73+
adapter := &PodAdapter{
74+
master: ac.Master,
75+
client: ac.Client,
76+
ignoreLabels: ignoreLabels,
77+
enableServiceAccount: enableServiceAccount,
78+
clientCache: clientCache{
79+
podLister: ac.PodInformer.Lister(),
80+
nsLister: ac.NamespaceInformer.Lister(),
81+
cmLister: ac.ConfigmapInformer.Lister(),
82+
secretLister: ac.SecretInformer.Lister(),
83+
nodeLister: ac.NodeInformer.Lister(),
84+
},
85+
rm: ac.ResourceManager,
86+
updatedPod: make(chan *corev1.Pod, 100000),
87+
stopCh: ctx.Done(),
8388
}
8489

85-
// master config, maybe a real node or a pod
86-
master, err := utils.NewClient(cfg.ConfigPath, func(config *rest.Config) {
87-
// config.QPS = float32(opts.KubeAPIQPS)
88-
// config.Burst = int(opts.KubeAPIBurst)
90+
_, err := ac.PodInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
91+
AddFunc: adapter.addPod,
92+
UpdateFunc: adapter.updatePod,
93+
DeleteFunc: adapter.deletePod,
8994
})
9095
if err != nil {
91-
return nil, fmt.Errorf("could not build clientset for cluster: %v", err)
96+
return nil, err
9297
}
9398

94-
informer := kubeinformers.NewSharedInformerFactory(client, 0)
95-
podInformer := informer.Core().V1().Pods()
96-
nsInformer := informer.Core().V1().Namespaces()
97-
nodeInformer := informer.Core().V1().Nodes()
98-
cmInformer := informer.Core().V1().ConfigMaps()
99-
secretInformer := informer.Core().V1().Secrets()
99+
return adapter, nil
100+
}
100101

101-
ctx := context.TODO()
102+
func (p *PodAdapter) addPod(obj interface{}) {
103+
pod, ok := obj.(*corev1.Pod)
104+
if !ok {
105+
return
106+
}
107+
podCopy := pod.DeepCopy()
108+
utils.TrimObjectMeta(&podCopy.ObjectMeta)
109+
if utils.IsVirtualPod(podCopy) {
110+
p.updatedPod <- podCopy
111+
}
112+
}
102113

103-
return &PodAdapter{
104-
master: master,
105-
client: client,
106-
ignoreLabels: ignoreLabels,
107-
enableServiceAccount: enableServiceAccount,
108-
clientCache: clientCache{
109-
podLister: podInformer.Lister(),
110-
nsLister: nsInformer.Lister(),
111-
cmLister: cmInformer.Lister(),
112-
secretLister: secretInformer.Lister(),
113-
nodeLister: nodeInformer.Lister(),
114-
},
115-
rm: cfg.ResourceManager,
116-
updatedPod: make(chan *corev1.Pod, 100000),
117-
stopCh: ctx.Done(),
118-
}, nil
114+
func (p *PodAdapter) updatePod(oldObj, newObj interface{}) {
115+
oldPod, ok1 := oldObj.(*corev1.Pod)
116+
newPod, ok2 := newObj.(*corev1.Pod)
117+
oldCopy := oldPod.DeepCopy()
118+
newCopy := newPod.DeepCopy()
119+
if !ok1 || !ok2 {
120+
return
121+
}
122+
123+
if newCopy.DeletionTimestamp != nil {
124+
newCopy.DeletionGracePeriodSeconds = nil
125+
}
126+
127+
if utils.IsVirtualPod(newCopy) && (!reflect.DeepEqual(oldCopy.Status, newCopy.Status) || newCopy.DeletionTimestamp != nil) {
128+
utils.TrimObjectMeta(&newCopy.ObjectMeta)
129+
p.updatedPod <- newCopy
130+
}
131+
}
132+
133+
func (p *PodAdapter) deletePod(obj interface{}) {
134+
pod, ok := obj.(*corev1.Pod)
135+
if !ok {
136+
return
137+
}
138+
podCopy := pod.DeepCopy()
139+
utils.TrimObjectMeta(&podCopy.ObjectMeta)
140+
if utils.IsVirtualPod(podCopy) {
141+
p.updatedPod <- podCopy
142+
}
119143
}
120144

121145
func (p *PodAdapter) createConfigMaps(ctx context.Context, configmaps []string, ns string) error {

pkg/knode-manager/knode.go

+71-13
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import (
55
"fmt"
66

77
"github.com/pkg/errors"
8+
kubeinformers "k8s.io/client-go/informers"
9+
"k8s.io/client-go/kubernetes"
810
"k8s.io/client-go/rest"
11+
"k8s.io/client-go/tools/cache"
912
klogv2 "k8s.io/klog/v2"
1013

1114
"github.com/kosmos.io/kosmos/cmd/knode-manager/app/config"
@@ -14,51 +17,82 @@ import (
1417
k8sadapter "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters/k8s"
1518
"github.com/kosmos.io/kosmos/pkg/knode-manager/controllers"
1619
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils"
20+
"github.com/kosmos.io/kosmos/pkg/knode-manager/utils/manager"
1721
)
1822

1923
type Knode struct {
24+
client kubernetes.Interface
25+
master kubernetes.Interface
26+
2027
podController *controllers.PodController
2128
nodeController *controllers.NodeController
29+
30+
informerFactory kubeinformers.SharedInformerFactory
31+
32+
ac *k8sadapter.AdapterConfig
2233
}
2334

24-
func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts) (*Knode, error) {
35+
func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *config.Opts) (*Knode, error) {
2536
if len(knode.Spec.Kubeconfig) == 0 {
2637
return nil, fmt.Errorf("kubeconfig of knode %s is empty", knode.Name)
2738
}
2839

29-
mClient, err := utils.NewClientFromConfigPath(c.KubeConfigPath, func(config *rest.Config) {
30-
config.QPS = c.KubeAPIQPS
31-
config.Burst = c.KubeAPIBurst
40+
master, err := utils.NewClientFromConfigPath(cmdConfig.KubeConfigPath, func(config *rest.Config) {
41+
config.QPS = cmdConfig.KubeAPIQPS
42+
config.Burst = cmdConfig.KubeAPIBurst
3243
})
3344
if err != nil {
3445
return nil, fmt.Errorf("could not build clientset for master cluster: %v", err)
3546
}
3647

37-
wClient, err := utils.NewClientFromBytes(knode.Spec.Kubeconfig, func(config *rest.Config) {
48+
client, err := utils.NewClientFromBytes(knode.Spec.Kubeconfig, func(config *rest.Config) {
3849
config.QPS = knode.Spec.KubeAPIQPS
3950
config.Burst = knode.Spec.KubeAPIBurst
4051
})
4152
if err != nil {
4253
return nil, fmt.Errorf("could not build clientset for worker cluster %s: %v", knode.Name, err)
4354
}
4455

56+
informer := kubeinformers.NewSharedInformerFactory(client, 0)
57+
podInformer := informer.Core().V1().Pods()
58+
nsInformer := informer.Core().V1().Namespaces()
59+
nodeInformer := informer.Core().V1().Nodes()
60+
cmInformer := informer.Core().V1().ConfigMaps()
61+
secretInformer := informer.Core().V1().Secrets()
62+
serviceInformer := informer.Core().V1().Services()
63+
64+
rm, err := manager.NewResourceManager(podInformer.Lister(), secretInformer.Lister(), cmInformer.Lister(), serviceInformer.Lister())
65+
if err != nil {
66+
return nil, errors.Wrap(err, "could not create resource manager")
67+
}
68+
69+
ac := &k8sadapter.AdapterConfig{
70+
Client: client,
71+
Master: master,
72+
PodInformer: podInformer,
73+
NamespaceInformer: nsInformer,
74+
NodeInformer: nodeInformer,
75+
ConfigmapInformer: cmInformer,
76+
SecretInformer: secretInformer,
77+
ServiceInformer: serviceInformer,
78+
ResourceManager: rm,
79+
}
80+
4581
var podAdapter adapters.PodHandler
4682
var nodeAdapter adapters.NodeHandler
4783
if knode.Spec.Type == kosmosv1alpha1.K8sAdapter {
48-
initConfig := k8sadapter.PodAdapterConfig{}
49-
podAdapter, err = k8sadapter.NewPodAdapter(initConfig, "", &k8sadapter.ClientConfig{}, true)
84+
podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", &k8sadapter.ClientConfig{}, true)
5085
if err != nil {
5186
return nil, err
5287
}
53-
54-
nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, wClient, mClient, c)
88+
nodeAdapter, err = k8sadapter.NewNodeAdapter(ctx, knode, ac, cmdConfig)
5589
if err != nil {
5690
return nil, err
5791
}
5892
}
5993

6094
dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter)
61-
nc, err := controllers.NewNodeController(nodeAdapter, mClient, dummyNode)
95+
nc, err := controllers.NewNodeController(nodeAdapter, master, dummyNode)
6296
if err != nil {
6397
return nil, err
6498
}
@@ -71,15 +105,39 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts)
71105
}
72106

73107
return &Knode{
74-
podController: pc,
75-
nodeController: nc,
108+
client: client,
109+
master: master,
110+
informerFactory: informer,
111+
ac: ac,
112+
podController: pc,
113+
nodeController: nc,
76114
}, nil
77115
}
78116

79117
func (kn *Knode) Run(ctx context.Context, c *config.Opts) {
118+
kn.informerFactory.Start(ctx.Done())
119+
120+
if !cache.WaitForCacheSync(ctx.Done(),
121+
kn.ac.NodeInformer.Informer().HasSynced,
122+
kn.ac.PodInformer.Informer().HasSynced,
123+
kn.ac.ConfigmapInformer.Informer().HasSynced,
124+
kn.ac.NamespaceInformer.Informer().HasSynced,
125+
kn.ac.SecretInformer.Informer().HasSynced,
126+
) {
127+
klogv2.Fatal("nodesInformer waitForCacheSync failed")
128+
}
129+
80130
go func() {
81-
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
131+
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
82132
klogv2.Fatal(err)
83133
}
84134
}()
135+
136+
go func() {
137+
if err := kn.nodeController.Run(ctx); err != nil {
138+
klogv2.Fatal(err)
139+
}
140+
}()
141+
142+
<-ctx.Done()
85143
}

0 commit comments

Comments
 (0)