Skip to content

Commit

Permalink
Merge pull request #824 from duanmengkk/feature_etcd_watcher
Browse files Browse the repository at this point in the history
add etcd watcher for blockaffinity
  • Loading branch information
duanmengkk authored Jan 25, 2025
2 parents 0c599cc + e7068a9 commit c619741
Show file tree
Hide file tree
Showing 8 changed files with 819 additions and 103 deletions.
88 changes: 0 additions & 88 deletions go.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package blockwatchsyncer

import (
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
)

// NewBlockWatchSyncer creates a new BlockAffinity v1 Syncer.
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
},
}

return watchersyncer.New(client, resourceTypes, callbacks)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package blockwatchsyncer

import (
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"time"
)

// syncedPollPeriod controls how often you look at the status of your sync funcs
var syncedPollPeriod = 100 * time.Millisecond

type BlockEventHandler struct {
// Channel for getting updates and status updates from syncer.
syncerC chan interface{}

processor lifted.AsyncWorker
// Channel to indicate node status reporter routine is not needed anymore.
done chan struct{}

// Flag to show we are in-sync.
inSync bool
}

func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
return &BlockEventHandler{
processor: processor,
}
}

func (b *BlockEventHandler) Run(stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
case <-b.done:
return
case event := <-b.syncerC:
switch event := event.(type) {
case []api.Update:
b.onupdate(event)
case api.SyncStatus:
b.inSync = true
}
}
}
}

func (b *BlockEventHandler) Stop() {
b.done <- struct{}{}
}

func (b *BlockEventHandler) Done() <-chan struct{} {
return b.done
}

func (b *BlockEventHandler) InSync() bool {
return b.inSync
}

func (b *BlockEventHandler) OnStatusUpdated(status api.SyncStatus) {
if status == api.InSync {
b.syncerC <- status
}
}

func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
b.syncerC <- updates
}

// todo put etcd's event info AsyncWorker's queue
func (b *BlockEventHandler) onupdate(event []api.Update) {

}

func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
err := wait.PollImmediateUntil(syncedPollPeriod, func() (done bool, err error) {
if b.inSync {
return true, nil
}
return false, nil
}, stopCh)

if err != nil {
klog.V(2).Infof("stop requested")
return false
}

klog.V(4).Infof("caches populated")
return true
}
26 changes: 11 additions & 15 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package adaper

import (
"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"k8s.io/klog/v2"
)

type CalicoETCDAdapter struct {
sync bool
watchSyncer api.Syncer
etcdClient api.Client
clusterNodeLister clusterlister.ClusterNodeLister
processor lifted.AsyncWorker
Expand All @@ -25,7 +28,14 @@ func NewCalicoETCDAdapter(etcdClient api.Client,
}

func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
// todo use c.etcdClient to list and watch blockaffinity in etcd
blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor)
c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
c.watchSyncer.Start()
blockEventHandler.Run(stopCh)

blockEventHandler.WaitForCacheSync(stopCh)
c.sync = true
klog.Info("calico blockaffinities etcd watchsyncer started!")
return nil
}

Expand All @@ -39,17 +49,3 @@ func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error)
func (c *CalicoETCDAdapter) Synced() bool {
return c.sync
}

func (c *CalicoETCDAdapter) OnAdd(obj interface{}) {
// todo add event info to c.processor
}

// OnUpdate handles object update event and push the object to queue.
func (c *CalicoETCDAdapter) OnUpdate(_, newObj interface{}) {
// todo add event info to c.processor
}

// OnDelete handles object delete event and push the object to queue.
func (c *CalicoETCDAdapter) OnDelete(obj interface{}) {
// todo add event info to c.processor
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c619741

Please sign in to comment.