From 6c71692b5542232c36675918ec4dd28890a5ea1c Mon Sep 17 00:00:00 2001 From: zhangyongxi Date: Fri, 22 Sep 2023 19:14:44 +0800 Subject: [PATCH] Make knode be runnable Signed-off-by: zhangyongxi --- pkg/knode-manager/adapters/k8s/node.go | 32 ++++---- pkg/knode-manager/controllers/node.go | 2 +- pkg/knode-manager/controllers/pod.go | 46 ++++++----- pkg/knode-manager/knode.go | 103 ++++++++++++------------- 4 files changed, 93 insertions(+), 90 deletions(-) diff --git a/pkg/knode-manager/adapters/k8s/node.go b/pkg/knode-manager/adapters/k8s/node.go index 64453910b..89d0788c1 100644 --- a/pkg/knode-manager/adapters/k8s/node.go +++ b/pkg/knode-manager/adapters/k8s/node.go @@ -9,7 +9,9 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -28,6 +30,9 @@ type NodeAdapter struct { nodeResource resources.NodeResource + clientNodeLister v1.NodeLister + clientPodLister v1.PodLister + updatedNode chan *corev1.Node updatedPod chan *corev1.Pod @@ -36,11 +41,13 @@ type NodeAdapter struct { func NewNodeAdapter(ctx context.Context, cr *kosmosv1alpha1.Knode, ac *AdapterConfig, c *config.Opts) (*NodeAdapter, error) { adapter := &NodeAdapter{ - client: ac.Client, - master: ac.Master, - cfg: c, - knode: cr, - stopCh: ctx.Done(), + client: ac.Client, + master: ac.Master, + cfg: c, + knode: cr, + stopCh: ctx.Done(), + clientNodeLister: ac.NodeInformer.Lister(), + clientPodLister: ac.PodInformer.Lister(), } _, err := ac.NodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -101,18 +108,18 @@ func (n *NodeAdapter) Configure(ctx context.Context, node *corev1.Node) { return } - nodes, err := n.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodes, err := n.clientNodeLister.List(labels.Everything()) if err != nil { return } nodeResource := resources.NewResource() - for _, n := range nodes.Items { + for _, n := range nodes { if n.Spec.Unschedulable { continue } - if !checkNodeStatusReady(node) { - klog.Infof("Node %v not ready", node.Name) + if !checkNodeStatusReady(n) { + klog.Infof("Node %v not ready", n.Name) continue } nc := resources.ConvertToResource(n.Status.Capacity) @@ -339,12 +346,11 @@ func (n *NodeAdapter) updatePodResources(old, new *corev1.Pod) { func (n *NodeAdapter) getResourceFromPods(ctx context.Context) *resources.Resource { podResource := resources.NewResource() - pods, err := n.client.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) + pods, err := n.clientPodLister.List(labels.Everything()) if err != nil { return podResource } - for _, pod := range pods.Items { - pod := pod + for _, pod := range pods { if pod.Status.Phase == corev1.PodPending && pod.Spec.NodeName != "" || pod.Status.Phase == corev1.PodRunning { nodeName := pod.Spec.NodeName @@ -356,7 +362,7 @@ func (n *NodeAdapter) getResourceFromPods(ctx context.Context) *resources.Resour if node.Spec.Unschedulable || !checkNodeStatusReady(node) { continue } - res := GetRequestFromPod(&pod) + res := GetRequestFromPod(pod) res.Pods = resource.MustParse("1") podResource.Add(res) } diff --git a/pkg/knode-manager/controllers/node.go b/pkg/knode-manager/controllers/node.go index 49ff8e70f..643a0dfc8 100644 --- a/pkg/knode-manager/controllers/node.go +++ b/pkg/knode-manager/controllers/node.go @@ -99,6 +99,7 @@ func NewNodeController(adapter adapters.NodeHandler, client kubernetes.Interface } func (n *NodeController) Run(ctx context.Context) error { + n.group.StartWithContext(ctx, n.nodeProbeController.Run) err := n.applyNode(ctx) if err != nil { return err @@ -109,7 +110,6 @@ func (n *NodeController) Run(ctx context.Context) error { n.statusUpdateChan <- node }) - n.group.StartWithContext(ctx, n.nodeProbeController.Run) n.group.StartWithContext(ctx, n.leaseController.Run) return n.sync(ctx) diff --git a/pkg/knode-manager/controllers/pod.go b/pkg/knode-manager/controllers/pod.go index 0d32103fa..007a1aa46 100644 --- a/pkg/knode-manager/controllers/pod.go +++ b/pkg/knode-manager/controllers/pod.go @@ -148,15 +148,14 @@ func NewPodController(cfg PodConfig) (*PodController, error) { } func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) { - ctx, cancel := context.WithCancel(ctx) + pc.ctx = ctx + ctx, cancel := context.WithCancel(pc.ctx) defer cancel() pc.podHandler.Notify(ctx, func(pod *corev1.Pod) { pc.enqueuePodStatusUpdate(ctx, pod.DeepCopy()) }) - pc.asyncPodFromKubeWorker.Run(1, ctx.Done()) - var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{ AddFunc: func(pod interface{}) { if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil { @@ -180,20 +179,22 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er aPod := obj.(*attendPod) aPod.Lock() - defer aPod.Unlock() - tmpPod := &corev1.Pod{ - Status: aPod.lastPodStatusReceivedFromAdapter.Status, - ObjectMeta: metav1.ObjectMeta{ - Annotations: aPod.lastPodStatusReceivedFromAdapter.Annotations, - Labels: aPod.lastPodStatusReceivedFromAdapter.Labels, - Finalizers: aPod.lastPodStatusReceivedFromAdapter.Finalizers, - }, - } - if aPod.lastPodStatusUpdateSkipped && podutils.IsChange(newPod, tmpPod) { - pc.asyncPodFromKubeWorker.Add(key) - // TODO: Reset this to avoid re-adding it continuously - aPod.lastPodStatusUpdateSkipped = false + if aPod.lastPodStatusReceivedFromAdapter != nil { + tmpPod := &corev1.Pod{ + Status: aPod.lastPodStatusReceivedFromAdapter.Status, + ObjectMeta: metav1.ObjectMeta{ + Annotations: aPod.lastPodStatusReceivedFromAdapter.Annotations, + Labels: aPod.lastPodStatusReceivedFromAdapter.Labels, + Finalizers: aPod.lastPodStatusReceivedFromAdapter.Finalizers, + }, + } + if aPod.lastPodStatusUpdateSkipped && podutils.IsChange(newPod, tmpPod) { + pc.asyncPodFromKubeWorker.Add(key) + // TODO: Reset this to avoid re-adding it continuously + aPod.lastPodStatusUpdateSkipped = false + } } + aPod.Unlock() if podutils.ShouldEnqueue(oldPod, newPod) { pc.asyncPodFromKubeWorker.Add(key) @@ -221,6 +222,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er return err } + pc.asyncPodFromKubeWorker.Run(1, ctx.Done()) + pc.deletePodFromKubeWorker.Run(1, ctx.Done()) + pc.asyncPodStatusFromAdapterWorker.Run(1, ctx.Done()) + <-ctx.Done() return nil @@ -234,7 +239,6 @@ func (pc *PodController) asyncPodFromKube(key utils.QueueKey) error { } pod, err := pc.podsLister.Pods(namespace).Get(name) - if err != nil { if !apierrors.IsNotFound(err) { // TODO: miss event @@ -295,21 +299,21 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod) klog.Infof("Pod %s exists, updating pod in adapter", podFromAdapter.Name) if origErr := pc.podHandler.Update(ctx, podForAdapter); origErr != nil { pc.handleAdapterError(ctx, origErr, pod) - pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error()) + //pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error()) return origErr } klog.Info("Updated pod in adapter") - pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in adapter successfully") + //pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in adapter successfully") } } else { if origErr := pc.podHandler.Create(ctx, podForAdapter); origErr != nil { pc.handleAdapterError(ctx, origErr, pod) - pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error()) + //pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error()) return origErr } klog.Info("Created pod in adapter") - pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in adapter successfully") + //pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in adapter successfully") } return nil } diff --git a/pkg/knode-manager/knode.go b/pkg/knode-manager/knode.go index 85438e96d..2f0038aeb 100644 --- a/pkg/knode-manager/knode.go +++ b/pkg/knode-manager/knode.go @@ -16,7 +16,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog" klogv2 "k8s.io/klog/v2" @@ -38,10 +37,12 @@ type Knode struct { podController *controllers.PodController nodeController *controllers.NodeController - pvcController *controllers.PVCController - pvController *controllers.PVController + /* pvcController *controllers.PVCController + pvController *controllers.PVController*/ - informerFactory kubeinformers.SharedInformerFactory + clientInformerFactory kubeinformers.SharedInformerFactory + masterInformerFactory kubeinformers.SharedInformerFactory + podInformerFactory kubeinformers.SharedInformerFactory ac *k8sadapter.AdapterConfig } @@ -111,8 +112,8 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi var podAdapter adapters.PodHandler var nodeAdapter adapters.NodeHandler - var pvcAdapter adapters.PVCHandler - var pvAdapter adapters.PVHandler + /* var pvcAdapter adapters.PVCHandler + var pvAdapter adapters.PVHandler*/ if knode.Spec.Type == kosmosv1alpha1.K8sAdapter { podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", true) if err != nil { @@ -122,14 +123,14 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi if err != nil { return nil, err } - pvcAdapter, err = k8sadapter.NewPVCAdapter(ctx, ac) - if err != nil { - return nil, err - } - pvAdapter, err = k8sadapter.NewPVAdapter(ctx, ac) - if err != nil { - return nil, err - } + /* pvcAdapter, err = k8sadapter.NewPVCAdapter(ctx, ac) + if err != nil { + return nil, err + } + pvAdapter, err = k8sadapter.NewPVAdapter(ctx, ac) + if err != nil { + return nil, err + }*/ } dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter) @@ -166,43 +167,31 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi return nil, err } - pvcController, err := controllers.NewPVCController(pvcAdapter, master, knode.Name) - if err != nil { - return nil, err - } + /* pvcController, err := controllers.NewPVCController(pvcAdapter, master, knode.Name) + if err != nil { + return nil, err + } - pvController, err := controllers.NewPVController(pvAdapter, master, knode.Name) - if err != nil { - return nil, err - } + pvController, err := controllers.NewPVController(pvAdapter, master, knode.Name) + if err != nil { + return nil, err + }*/ return &Knode{ - client: client, - master: master, - informerFactory: clientInformers.informer, - ac: ac, - podController: pc, - nodeController: nc, - pvcController: pvcController, - pvController: pvController, + client: client, + master: master, + clientInformerFactory: clientInformers.informer, + masterInformerFactory: masterInformers.informer, + podInformerFactory: podInformerForNode, + ac: ac, + podController: pc, + nodeController: nc, + /* pvcController: pvcController, + pvController: pvController,*/ }, nil } func (kn *Knode) Run(ctx context.Context, c *config.Opts) { - kn.informerFactory.Start(ctx.Done()) - - if !cache.WaitForCacheSync(ctx.Done(), - kn.ac.NodeInformer.Informer().HasSynced, - kn.ac.PodInformer.Informer().HasSynced, - kn.ac.ConfigmapInformer.Informer().HasSynced, - kn.ac.NamespaceInformer.Informer().HasSynced, - kn.ac.SecretInformer.Informer().HasSynced, - kn.ac.PersistentVolumeClaimInformer.Informer().HasSynced, - kn.ac.PersistentVolumeInformer.Informer().HasSynced, - ) { - klogv2.Fatal("nodesInformer waitForCacheSync failed") - } - go func() { if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) { klogv2.Fatal(err) @@ -215,17 +204,21 @@ func (kn *Knode) Run(ctx context.Context, c *config.Opts) { } }() - go func() { - if err := kn.pvcController.Run(ctx); err != nil { - klogv2.Fatal(err) - } - }() - - go func() { - if err := kn.pvController.Run(ctx); err != nil { - klogv2.Fatal(err) - } - }() + /* go func() { + if err := kn.pvcController.Run(ctx); err != nil { + klogv2.Fatal(err) + } + }() + + go func() { + if err := kn.pvController.Run(ctx); err != nil { + klogv2.Fatal(err) + } + }()*/ + + kn.clientInformerFactory.Start(ctx.Done()) + kn.masterInformerFactory.Start(ctx.Done()) + kn.podInformerFactory.Start(ctx.Done()) <-ctx.Done() }