Skip to content

Commit

Permalink
add a control switch for watch middleware
Browse files Browse the repository at this point in the history
Co-authored-by: zhangyongxi <[email protected]>
Co-authored-by: baoyinghai <[email protected]>
Co-authored-by: wuyingjun <[email protected]>
Signed-off-by: zhouhaoA1 <[email protected]>
  • Loading branch information
4 people committed Dec 5, 2023
1 parent b8e1f74 commit b2080cb
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 45 deletions.
15 changes: 9 additions & 6 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

Expand All @@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct {

Storage *storageoptions.StorageOptions

Subscriber *watchoptions.MiddlerwareOptions
Subscriber *watchoptions.MiddlewareOptions
}

func NewServerOptions() *ClusterPediaServerOptions {
Expand Down Expand Up @@ -129,11 +130,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
StorageFactory: storage,
}

err = watcher.NewSubscriber(o.Subscriber)
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)

if err != nil {
return nil, err
middleware.SubscriberEnabled = o.Subscriber.Enabled
if middleware.SubscriberEnabled {
err = watcher.NewSubscriber(o.Subscriber)
if err != nil {
return nil, err
}
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
}

return config, nil
Expand Down
12 changes: 8 additions & 4 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

Expand All @@ -49,7 +50,7 @@ type Options struct {

WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
Publisher *watchoptions.MiddlerwareOptions
Publisher *watchoptions.MiddlewareOptions
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -135,9 +136,12 @@ func (o *Options) Config() (*config.Config, error) {
return nil, err
}

err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
middleware.PublisherEnabled = o.Publisher.Enabled
if middleware.PublisherEnabled {
err = watcher.NewPulisher(o.Publisher)
if err != nil {
return nil, err
}
}

kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
Expand Down
18 changes: 18 additions & 0 deletions cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

func init() {
Expand Down Expand Up @@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
}

if !c.LeaderElection.LeaderElect {
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return err
}
}
synchromanager.Run(c.WorkerNumber, ctx.Done())
return nil
}
Expand Down Expand Up @@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
defer close(done)

stopCh := ctx.Done()
if middleware.PublisherEnabled {
err := middleware.GlobalPublisher.InitPublisher(ctx)
if err != nil {
return
}
}
synchromanager.Run(c.WorkerNumber, stopCh)
},
OnStoppedLeading: func() {
klog.Info("leaderelection lost")
if done != nil {
<-done
}
if middleware.PublisherEnabled {
middleware.GlobalPublisher.StopPublisher()
components.EC.CloseChannels()
}
},
},
})
Expand Down
10 changes: 6 additions & 4 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {

// init event cache pool
eventStop := make(chan struct{})
watchcomponents.InitEventCachePool(eventStop)
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
if err != nil {
return nil, err
if middleware.SubscriberEnabled {
watchcomponents.InitEventCachePool(eventStop)
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
if err != nil {
return nil, err
}
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
Expand Down
6 changes: 6 additions & 0 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions pkg/storage/internalstorage/resource_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
}{
{
"empty",
schema.GroupVersionResource{},
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
"",
"",
"",
expected{
`SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
"SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "deleted" = false AND "name" = '' AND "namespace" = '' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `deleted` = false AND `name` = '' AND `namespace` = '' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
"",
},
},
Expand All @@ -212,8 +212,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
"ns-1",
"resource-1",
expected{
`SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
"SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "name" = 'resource-1' AND "namespace" = 'ns-1' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `name` = 'resource-1' AND `namespace` = 'ns-1' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
"",
},
},
Expand Down Expand Up @@ -257,8 +257,8 @@ func TestResourceStorage_genListObjectQuery(t *testing.T) {
appsv1.SchemeGroupVersion.WithResource("deployments"),
&internal.ListOptions{},
expected{
`SELECT * FROM "resources" WHERE "group" = 'apps' AND "resource" = 'deployments' AND "version" = 'v1'`,
"SELECT * FROM `resources` WHERE `group` = 'apps' AND `resource` = 'deployments' AND `version` = 'v1'",
`SELECT * FROM "apps_v1_deployments"`,
"SELECT * FROM `apps_v1_deployments`",
"",
},
},
Expand Down Expand Up @@ -314,8 +314,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
"",
"",
expected{
`DELETE FROM "resources" WHERE "cluster" = '' AND "group" = 'apps' AND "name" = '' AND "namespace" = '' AND "resource" = 'deployments' AND "version" = 'v1'`,
"DELETE FROM `resources` WHERE `cluster` = '' AND `group` = 'apps' AND `name` = '' AND `namespace` = '' AND `resource` = 'deployments' AND `version` = 'v1'",
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = '' AND "name" = '' AND "namespace" = ''`,
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = '' AND `name` = '' AND `namespace` = ''",
"",
},
},
Expand All @@ -326,8 +326,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
"ns-1",
"resource-1",
expected{
`DELETE FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1'`,
"DELETE FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1'",
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "name" = 'resource-1' AND "namespace" = 'ns-1'`,
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `name` = 'resource-1' AND `namespace` = 'ns-1'",
"",
},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced),
}

// initEventCache is true when Apiserver starts, false when clustersynchro-manager starts
if initEventCache {
// SubscriberEnabled is true when Apiserver starts and middleware enabled
if middleware.SubscriberEnabled {
var cache *watchcomponents.EventCache
buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr)
cachePool := watchcomponents.GetInitEventCachePool()
Expand Down Expand Up @@ -88,7 +88,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi

resourceStorage.buffer = buffer
resourceStorage.eventCache = cache
} else {
} else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled
err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec)
if err != nil {
return nil, err
Expand Down
17 changes: 11 additions & 6 deletions pkg/synchromanager/clustersynchro/resource_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
)

type ResourceSynchroConfig struct {
Expand Down Expand Up @@ -426,19 +427,23 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
Published: true,
}
synchro.rvsLock.Unlock()
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
if middleware.PublisherEnabled {
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
}
}
}
} else {
handler, callback = synchro.deleteResource, func(_ runtime.Object, eventType watch.EventType) {
synchro.rvsLock.Lock()
delete(synchro.rvs, key)
synchro.rvsLock.Unlock()
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
if middleware.PublisherEnabled {
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
if err != nil {
return
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/watcher/middleware/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var GlobalPublisher Publisher
var PublisherEnabled bool = false

type Publisher interface {
InitPublisher(ctx context.Context) error
Expand Down
4 changes: 2 additions & 2 deletions pkg/watcher/middleware/rabbitmq/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const (
SubscribeerName = "rabbitmq"
)

func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
func NewPulisher(mo *options.MiddlewareOptions) (middleware.Publisher, error) {
if mo.MaxConnections <= 0 {
mo.MaxConnections = 3
}
Expand All @@ -34,7 +34,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
return publisher, nil
}

func NewSubscriber(mo *options.MiddlerwareOptions) (middleware.Subscriber, error) {
func NewSubscriber(mo *options.MiddlewareOptions) (middleware.Subscriber, error) {
if mo.MaxConnections <= 0 {
mo.MaxConnections = 3
}
Expand Down
1 change: 1 addition & 0 deletions pkg/watcher/middleware/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

var GlobalSubscriber Subscriber
var SubscriberEnabled bool = false

type Subscriber interface {
InitSubscriber(stopCh <-chan struct{}) error
Expand Down
12 changes: 7 additions & 5 deletions pkg/watcher/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"github.com/spf13/pflag"
)

type MiddlerwareOptions struct {
type MiddlewareOptions struct {
Enabled bool // middleware enabled
Name string
ServerIp string
ServerPort int
Expand All @@ -20,11 +21,11 @@ type MiddlerwareOptions struct {
CacheSize int
}

func NewMiddlerwareOptions() *MiddlerwareOptions {
return &MiddlerwareOptions{Name: "apiserver", CacheSize: 100}
func NewMiddlerwareOptions() *MiddlewareOptions {
return &MiddlewareOptions{Enabled: false, Name: "rabbitmq", CacheSize: 100}
}

func (o *MiddlerwareOptions) Validate() []error {
func (o *MiddlewareOptions) Validate() []error {
if o == nil {
return nil
}
Expand All @@ -45,7 +46,8 @@ func (o *MiddlerwareOptions) Validate() []error {
return errors
}

func (o *MiddlerwareOptions) AddFlags(fs *pflag.FlagSet) {
func (o *MiddlewareOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.Enabled, "middleware-enabled", o.Enabled, "middlerware enabled")
fs.StringVar(&o.Name, "middleware-name", o.Name, "middlerware name")
fs.StringVar(&o.ServerIp, "middleware-serverIp", o.ServerIp, "middlerware server Ip")
fs.IntVar(&o.ServerPort, "middleware-serverPort", o.ServerPort, "middlerware server port")
Expand Down
8 changes: 4 additions & 4 deletions pkg/watcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
)

type NewPublisherFunc func(mo *options.MiddlerwareOptions) (middleware.Publisher, error)
type NewSubscriberFunc func(mo *options.MiddlerwareOptions) (middleware.Subscriber, error)
type NewPublisherFunc func(mo *options.MiddlewareOptions) (middleware.Publisher, error)
type NewSubscriberFunc func(mo *options.MiddlewareOptions) (middleware.Subscriber, error)

var publisherFuncs = make(map[string]NewPublisherFunc)
var subscriberFuncs = make(map[string]NewSubscriberFunc)
Expand All @@ -33,7 +33,7 @@ func RegisterSubscriberFunc(name string, f NewSubscriberFunc) {
subscriberFuncs[name] = f
}

func NewPulisher(mo *options.MiddlerwareOptions) error {
func NewPulisher(mo *options.MiddlewareOptions) error {
provider, ok := publisherFuncs[mo.Name]
if !ok {
return fmt.Errorf("publisher %s is unregistered", mo.Name)
Expand All @@ -48,7 +48,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) error {
return nil
}

func NewSubscriber(mo *options.MiddlerwareOptions) error {
func NewSubscriber(mo *options.MiddlewareOptions) error {
provider, ok := subscriberFuncs[mo.Name]
if !ok {
return fmt.Errorf("publisher %s is unregistered", mo.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type ListOptions struct {
// +optional
Namespaces string `json:"namespaces,omitempty"`

// +optional
ResourcePrefix string `json:"resourcePrefix,omitempty"`

// +optional
OrderBy string `json:"orderby,omitempty"`

Expand Down
Loading

0 comments on commit b2080cb

Please sign in to comment.