Skip to content

Commit

Permalink
Merge pull request kosmos-io#67 from xyz2277/clusterrouter_zyx_1
Browse files Browse the repository at this point in the history
Make knode runnable
  • Loading branch information
kosmos-robot authored Sep 24, 2023
2 parents a9df56c + 6c71692 commit e19acbd
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 90 deletions.
32 changes: 19 additions & 13 deletions pkg/knode-manager/adapters/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

Expand All @@ -28,6 +30,9 @@ type NodeAdapter struct {

nodeResource resources.NodeResource

clientNodeLister v1.NodeLister
clientPodLister v1.PodLister

updatedNode chan *corev1.Node
updatedPod chan *corev1.Pod

Expand All @@ -36,11 +41,13 @@ type NodeAdapter struct {

func NewNodeAdapter(ctx context.Context, cr *kosmosv1alpha1.Knode, ac *AdapterConfig, c *config.Opts) (*NodeAdapter, error) {
adapter := &NodeAdapter{
client: ac.Client,
master: ac.Master,
cfg: c,
knode: cr,
stopCh: ctx.Done(),
client: ac.Client,
master: ac.Master,
cfg: c,
knode: cr,
stopCh: ctx.Done(),
clientNodeLister: ac.NodeInformer.Lister(),
clientPodLister: ac.PodInformer.Lister(),
}

_, err := ac.NodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -101,18 +108,18 @@ func (n *NodeAdapter) Configure(ctx context.Context, node *corev1.Node) {
return
}

nodes, err := n.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
nodes, err := n.clientNodeLister.List(labels.Everything())
if err != nil {
return
}

nodeResource := resources.NewResource()
for _, n := range nodes.Items {
for _, n := range nodes {
if n.Spec.Unschedulable {
continue
}
if !checkNodeStatusReady(node) {
klog.Infof("Node %v not ready", node.Name)
if !checkNodeStatusReady(n) {
klog.Infof("Node %v not ready", n.Name)
continue
}
nc := resources.ConvertToResource(n.Status.Capacity)
Expand Down Expand Up @@ -339,12 +346,11 @@ func (n *NodeAdapter) updatePodResources(old, new *corev1.Pod) {

func (n *NodeAdapter) getResourceFromPods(ctx context.Context) *resources.Resource {
podResource := resources.NewResource()
pods, err := n.client.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
pods, err := n.clientPodLister.List(labels.Everything())
if err != nil {
return podResource
}
for _, pod := range pods.Items {
pod := pod
for _, pod := range pods {
if pod.Status.Phase == corev1.PodPending && pod.Spec.NodeName != "" ||
pod.Status.Phase == corev1.PodRunning {
nodeName := pod.Spec.NodeName
Expand All @@ -356,7 +362,7 @@ func (n *NodeAdapter) getResourceFromPods(ctx context.Context) *resources.Resour
if node.Spec.Unschedulable || !checkNodeStatusReady(node) {
continue
}
res := GetRequestFromPod(&pod)
res := GetRequestFromPod(pod)
res.Pods = resource.MustParse("1")
podResource.Add(res)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/knode-manager/controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func NewNodeController(adapter adapters.NodeHandler, client kubernetes.Interface
}

func (n *NodeController) Run(ctx context.Context) error {
n.group.StartWithContext(ctx, n.nodeProbeController.Run)
err := n.applyNode(ctx)
if err != nil {
return err
Expand All @@ -109,7 +110,6 @@ func (n *NodeController) Run(ctx context.Context) error {
n.statusUpdateChan <- node
})

n.group.StartWithContext(ctx, n.nodeProbeController.Run)
n.group.StartWithContext(ctx, n.leaseController.Run)

return n.sync(ctx)
Expand Down
46 changes: 25 additions & 21 deletions pkg/knode-manager/controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,14 @@ func NewPodController(cfg PodConfig) (*PodController, error) {
}

func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
ctx, cancel := context.WithCancel(ctx)
pc.ctx = ctx
ctx, cancel := context.WithCancel(pc.ctx)
defer cancel()

pc.podHandler.Notify(ctx, func(pod *corev1.Pod) {
pc.enqueuePodStatusUpdate(ctx, pod.DeepCopy())
})

pc.asyncPodFromKubeWorker.Run(1, ctx.Done())

var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
AddFunc: func(pod interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
Expand All @@ -180,20 +179,22 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er

aPod := obj.(*attendPod)
aPod.Lock()
defer aPod.Unlock()
tmpPod := &corev1.Pod{
Status: aPod.lastPodStatusReceivedFromAdapter.Status,
ObjectMeta: metav1.ObjectMeta{
Annotations: aPod.lastPodStatusReceivedFromAdapter.Annotations,
Labels: aPod.lastPodStatusReceivedFromAdapter.Labels,
Finalizers: aPod.lastPodStatusReceivedFromAdapter.Finalizers,
},
}
if aPod.lastPodStatusUpdateSkipped && podutils.IsChange(newPod, tmpPod) {
pc.asyncPodFromKubeWorker.Add(key)
// TODO: Reset this to avoid re-adding it continuously
aPod.lastPodStatusUpdateSkipped = false
if aPod.lastPodStatusReceivedFromAdapter != nil {
tmpPod := &corev1.Pod{
Status: aPod.lastPodStatusReceivedFromAdapter.Status,
ObjectMeta: metav1.ObjectMeta{
Annotations: aPod.lastPodStatusReceivedFromAdapter.Annotations,
Labels: aPod.lastPodStatusReceivedFromAdapter.Labels,
Finalizers: aPod.lastPodStatusReceivedFromAdapter.Finalizers,
},
}
if aPod.lastPodStatusUpdateSkipped && podutils.IsChange(newPod, tmpPod) {
pc.asyncPodFromKubeWorker.Add(key)
// TODO: Reset this to avoid re-adding it continuously
aPod.lastPodStatusUpdateSkipped = false
}
}
aPod.Unlock()

if podutils.ShouldEnqueue(oldPod, newPod) {
pc.asyncPodFromKubeWorker.Add(key)
Expand Down Expand Up @@ -221,6 +222,10 @@ func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr er
return err
}

pc.asyncPodFromKubeWorker.Run(1, ctx.Done())
pc.deletePodFromKubeWorker.Run(1, ctx.Done())
pc.asyncPodStatusFromAdapterWorker.Run(1, ctx.Done())

<-ctx.Done()

return nil
Expand All @@ -234,7 +239,6 @@ func (pc *PodController) asyncPodFromKube(key utils.QueueKey) error {
}

pod, err := pc.podsLister.Pods(namespace).Get(name)

if err != nil {
if !apierrors.IsNotFound(err) {
// TODO: miss event
Expand Down Expand Up @@ -295,21 +299,21 @@ func (pc *PodController) createOrUpdatePod(ctx context.Context, pod *corev1.Pod)
klog.Infof("Pod %s exists, updating pod in adapter", podFromAdapter.Name)
if origErr := pc.podHandler.Update(ctx, podForAdapter); origErr != nil {
pc.handleAdapterError(ctx, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error())
//pc.recorder.Event(pod, corev1.EventTypeWarning, podEventUpdateFailed, origErr.Error())

return origErr
}
klog.Info("Updated pod in adapter")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in adapter successfully")
//pc.recorder.Event(pod, corev1.EventTypeNormal, podEventUpdateSuccess, "Update pod in adapter successfully")
}
} else {
if origErr := pc.podHandler.Create(ctx, podForAdapter); origErr != nil {
pc.handleAdapterError(ctx, origErr, pod)
pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error())
//pc.recorder.Event(pod, corev1.EventTypeWarning, podEventCreateFailed, origErr.Error())
return origErr
}
klog.Info("Created pod in adapter")
pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in adapter successfully")
//pc.recorder.Event(pod, corev1.EventTypeNormal, podEventCreateSuccess, "Create pod in adapter successfully")
}
return nil
}
Expand Down
103 changes: 48 additions & 55 deletions pkg/knode-manager/knode.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
klogv2 "k8s.io/klog/v2"
Expand All @@ -38,10 +37,12 @@ type Knode struct {

podController *controllers.PodController
nodeController *controllers.NodeController
pvcController *controllers.PVCController
pvController *controllers.PVController
/* pvcController *controllers.PVCController
pvController *controllers.PVController*/

informerFactory kubeinformers.SharedInformerFactory
clientInformerFactory kubeinformers.SharedInformerFactory
masterInformerFactory kubeinformers.SharedInformerFactory
podInformerFactory kubeinformers.SharedInformerFactory

ac *k8sadapter.AdapterConfig
}
Expand Down Expand Up @@ -111,8 +112,8 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi

var podAdapter adapters.PodHandler
var nodeAdapter adapters.NodeHandler
var pvcAdapter adapters.PVCHandler
var pvAdapter adapters.PVHandler
/* var pvcAdapter adapters.PVCHandler
var pvAdapter adapters.PVHandler*/
if knode.Spec.Type == kosmosv1alpha1.K8sAdapter {
podAdapter, err = k8sadapter.NewPodAdapter(ctx, ac, "", true)
if err != nil {
Expand All @@ -122,14 +123,14 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi
if err != nil {
return nil, err
}
pvcAdapter, err = k8sadapter.NewPVCAdapter(ctx, ac)
if err != nil {
return nil, err
}
pvAdapter, err = k8sadapter.NewPVAdapter(ctx, ac)
if err != nil {
return nil, err
}
/* pvcAdapter, err = k8sadapter.NewPVCAdapter(ctx, ac)
if err != nil {
return nil, err
}
pvAdapter, err = k8sadapter.NewPVAdapter(ctx, ac)
if err != nil {
return nil, err
}*/
}

dummyNode := controllers.BuildDummyNode(ctx, knode, nodeAdapter)
Expand Down Expand Up @@ -166,43 +167,31 @@ func NewKnode(ctx context.Context, knode *kosmosv1alpha1.Knode, cmdConfig *confi
return nil, err
}

pvcController, err := controllers.NewPVCController(pvcAdapter, master, knode.Name)
if err != nil {
return nil, err
}
/* pvcController, err := controllers.NewPVCController(pvcAdapter, master, knode.Name)
if err != nil {
return nil, err
}
pvController, err := controllers.NewPVController(pvAdapter, master, knode.Name)
if err != nil {
return nil, err
}
pvController, err := controllers.NewPVController(pvAdapter, master, knode.Name)
if err != nil {
return nil, err
}*/

return &Knode{
client: client,
master: master,
informerFactory: clientInformers.informer,
ac: ac,
podController: pc,
nodeController: nc,
pvcController: pvcController,
pvController: pvController,
client: client,
master: master,
clientInformerFactory: clientInformers.informer,
masterInformerFactory: masterInformers.informer,
podInformerFactory: podInformerForNode,
ac: ac,
podController: pc,
nodeController: nc,
/* pvcController: pvcController,
pvController: pvController,*/
}, nil
}

func (kn *Knode) Run(ctx context.Context, c *config.Opts) {
kn.informerFactory.Start(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(),
kn.ac.NodeInformer.Informer().HasSynced,
kn.ac.PodInformer.Informer().HasSynced,
kn.ac.ConfigmapInformer.Informer().HasSynced,
kn.ac.NamespaceInformer.Informer().HasSynced,
kn.ac.SecretInformer.Informer().HasSynced,
kn.ac.PersistentVolumeClaimInformer.Informer().HasSynced,
kn.ac.PersistentVolumeInformer.Informer().HasSynced,
) {
klogv2.Fatal("nodesInformer waitForCacheSync failed")
}

go func() {
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
klogv2.Fatal(err)
Expand All @@ -215,17 +204,21 @@ func (kn *Knode) Run(ctx context.Context, c *config.Opts) {
}
}()

go func() {
if err := kn.pvcController.Run(ctx); err != nil {
klogv2.Fatal(err)
}
}()

go func() {
if err := kn.pvController.Run(ctx); err != nil {
klogv2.Fatal(err)
}
}()
/* go func() {
if err := kn.pvcController.Run(ctx); err != nil {
klogv2.Fatal(err)
}
}()
go func() {
if err := kn.pvController.Run(ctx); err != nil {
klogv2.Fatal(err)
}
}()*/

kn.clientInformerFactory.Start(ctx.Done())
kn.masterInformerFactory.Start(ctx.Done())
kn.podInformerFactory.Start(ctx.Done())

<-ctx.Done()
}

0 comments on commit e19acbd

Please sign in to comment.