Skip to content

Commit

Permalink
add a control switch and fix watch bug
Browse files Browse the repository at this point in the history
Co-authored-by: zhangyongxi <[email protected]>
Co-authored-by: wuyingjun <[email protected]>
Co-authored-by: zhouhao <[email protected]>
Signed-off-by: baoyinghai <[email protected]>
  • Loading branch information
4 people committed Dec 18, 2023
1 parent 21b15f7 commit 05da669
Show file tree
Hide file tree
Showing 20 changed files with 161 additions and 125 deletions.
15 changes: 9 additions & 6 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

Expand All @@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct {

Storage *storageoptions.StorageOptions

Subscriber *watchoptions.MiddlerwareOptions
Subscriber *watchoptions.MiddlewareOptions
}

func NewServerOptions() *ClusterPediaServerOptions {
Expand Down Expand Up @@ -129,11 +130,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
StorageFactory: storage,
}

err = watcher.NewSubscriber(o.Subscriber)
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)

if err != nil {
return nil, err
middleware.SubscriberEnabled = o.Subscriber.Enabled
if middleware.SubscriberEnabled {
err = watcher.NewSubscriber(o.Subscriber)
if err != nil {
return nil, err
}
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
}

return config, nil
Expand Down
12 changes: 8 additions & 4 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

Expand All @@ -50,7 +51,7 @@ type Options struct {
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
ShardingName string
Publisher *watchoptions.MiddlerwareOptions
Publisher *watchoptions.MiddlewareOptions
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -137,9 +138,12 @@ func (o *Options) Config() (*config.Config, error) {
return nil, err
}

err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
middleware.PublisherEnabled = o.Publisher.Enabled
if middleware.PublisherEnabled {
err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
}
}

kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
Expand Down
18 changes: 18 additions & 0 deletions cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

func init() {
Expand Down Expand Up @@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
}

if !c.LeaderElection.LeaderElect {
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return err
}
}
synchromanager.Run(c.WorkerNumber, ctx.Done())
return nil
}
Expand Down Expand Up @@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
defer close(done)

stopCh := ctx.Done()
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return
}
}
synchromanager.Run(c.WorkerNumber, stopCh)
},
OnStoppedLeading: func() {
klog.Info("leaderelection lost")
if done != nil {
<-done
}
if middleware.PublisherEnabled {
middleware.GlobalPublisher.StopPublisher()
components.EC.CloseChannels()
}
},
},
})
Expand Down
10 changes: 6 additions & 4 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {

// init event cache pool
eventStop := make(chan struct{})
watchcomponents.InitEventCachePool(eventStop)
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
if err != nil {
return nil, err
if middleware.SubscriberEnabled {
watchcomponents.InitEventCachePool(eventStop)
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
if err != nil {
return nil, err
}
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
Expand Down
6 changes: 6 additions & 0 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 31 additions & 10 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,40 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
return obj, nil
}

func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
condition := map[string]interface{}{
"namespace": namespace,
"name": name,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
}

if cluster != "" {
condition["cluster"] = cluster
}
return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
var objects [][]byte
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
var resource Resource
if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil {
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
}

obj, _, err := s.codec.Decode(objects[0], nil, into)
obj, _, err := s.codec.Decode(resource.Object, nil, into)
if err != nil {
return err
}
if obj != into {
return fmt.Errorf("Failed to decode resource, into is %T", into)
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return err
}
metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion))
return nil
}

Expand All @@ -291,14 +312,13 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

var condition map[string]interface{}
condition := map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
}
if !isAll {
condition = map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
}
condition["deleted"] = false
}

query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition)
Expand All @@ -313,6 +333,7 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"deleted": false,
}
query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition)
_, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/internalstorage/resource_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
"",
"",
expected{
`SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
"SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
`SELECT cluster_resource_version, object FROM "resources" WHERE "deleted" = false AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
"SELECT cluster_resource_version, object FROM `resources` WHERE `deleted` = false AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
"",
},
},
Expand All @@ -212,8 +212,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
"ns-1",
"resource-1",
expected{
`SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
"SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
`SELECT cluster_resource_version, object FROM "resources" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
"SELECT cluster_resource_version, object FROM `resources` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
"",
},
},
Expand Down
22 changes: 16 additions & 6 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
Expand Down Expand Up @@ -43,8 +44,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced),
}

// initEventCache is true when Apiserver starts, false when clustersynchro-manager starts
if initEventCache {
// SubscriberEnabled is true when Apiserver starts and middleware enabled
if middleware.SubscriberEnabled {
var cache *watchcomponents.EventCache
buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr)
cachePool := watchcomponents.GetInitEventCachePool()
Expand Down Expand Up @@ -72,7 +73,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi

resourceStorage.buffer = buffer
resourceStorage.eventCache = cache
} else {
} else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled
err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec)
if err != nil {
return nil, err
Expand All @@ -99,8 +100,11 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes

func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
var resources []Resource
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
Where(map[string]interface{}{"cluster": cluster}).
result := f.db.WithContext(ctx).Select("group", "version", "resource",
"namespace", "name", "resource_version", "deleted", "published").
Where(map[string]interface{}{"cluster": cluster, "deleted": false}).
//In case deleted event be losted when synchro manager do a leaderelection or reboot
Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}).
Find(&resources)
if result.Error != nil {
return nil, InterpretDBError(cluster, result.Error)
Expand All @@ -119,7 +123,13 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
if resource.Namespace != "" {
key = resource.Namespace + "/" + resource.Name
}
versions[key] = resource.ResourceVersion
versions[key] = informer.StorageElement{
Version: resource.ResourceVersion,
Deleted: resource.Deleted,
Published: resource.Published,
Name: resource.Name,
Namespace: resource.Namespace,
}
}
return resourceversions, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ func (c *ResourceVersionStorage) Add(obj interface{}) error {
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}

c.cacheStorage.Delete(key)

accessor, err := meta.Accessor(obj)
if err != nil {
return err
}

c.cacheStorage.Add(key, accessor.GetResourceVersion())
c.cacheStorage.Add(key, StorageElement{
Version: accessor.GetResourceVersion(),
Deleted: false,
Published: true,
Name: accessor.GetName(),
Namespace: accessor.GetNamespace(),
})
return nil
}

Expand All @@ -52,7 +61,13 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error {
return err
}

c.cacheStorage.Update(key, accessor.GetResourceVersion())
c.cacheStorage.Update(key, StorageElement{
Version: accessor.GetResourceVersion(),
Deleted: false,
Published: true,
Name: accessor.GetName(),
Namespace: accessor.GetNamespace(),
})
return nil
}

Expand Down
33 changes: 11 additions & 22 deletions pkg/synchromanager/clustersynchro/resource_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

type ResourceSynchroConfig struct {
Expand Down Expand Up @@ -394,22 +395,6 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
if !ok {
if _, ok = event.Object.(cache.DeletedFinalStateUnknown); !ok {
return
} else {
dfs := event.Object.(cache.DeletedFinalStateUnknown)
var se informer.StorageElement
if se, ok = dfs.Obj.(informer.StorageElement); !ok {
return
}
var err error
obj, err = synchro.storage.GetObj(synchro.ctx, synchro.cluster, se.Namespace, se.Name)
if err != nil {
return
}
metaObj, err := meta.Accessor(obj)
if err == nil {
klog.Warning("DeletedFinalStateUnknown, name: ", metaObj.GetName(), ", time: ", metaObj.GetDeletionTimestamp(),
", kind: ", obj.GetObjectKind().GroupVersionKind().Kind, ", cluster: ", synchro.cluster)
}
}
}
key, _ := cache.MetaNamespaceKeyFunc(obj)
Expand Down Expand Up @@ -442,19 +427,23 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
Published: true,
}
synchro.rvsLock.Unlock()
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
if middleware.PublisherEnabled {
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
}
}
}
} else {
handler, callback = synchro.deleteResource, func(_ runtime.Object, eventType watch.EventType) {
synchro.rvsLock.Lock()
delete(synchro.rvs, key)
synchro.rvsLock.Unlock()
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
if middleware.PublisherEnabled {
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
}
}
}
}
Expand Down
Loading

0 comments on commit 05da669

Please sign in to comment.