From 0a87be5a13ee7346b0220a86072248cffe0dd01c Mon Sep 17 00:00:00 2001 From: Iceber Gu Date: Fri, 10 Dec 2021 23:04:18 +0800 Subject: [PATCH] change reflector to watch-relist --- .../clustersynchro/informer/reflector.go | 235 ++++++++++-------- 1 file changed, 129 insertions(+), 106 deletions(-) diff --git a/pkg/synchromanager/clustersynchro/informer/reflector.go b/pkg/synchromanager/clustersynchro/informer/reflector.go index 07b8a62c9..131128557 100644 --- a/pkg/synchromanager/clustersynchro/informer/reflector.go +++ b/pkg/synchromanager/clustersynchro/informer/reflector.go @@ -1,6 +1,10 @@ /* Reference from https://github.com/kubernetes/kubernetes/blob/b695d79d4f967c403a96986f1750a35eb75e75f1/staging/src/k8s.io/client-go/tools/cache/reflector.go + +Changes: + * add isLastSyncResourceVersionExpired. If lastSyncResourceVersion is expired, set true. + * change reflector.ListAndWatch to reflector.WatchAndRelist */ package informer @@ -89,6 +93,8 @@ type Reflector struct { WatchListPageSize int64 // Called whenever the ListAndWatch drops the connection with an error. watchErrorHandler WatchErrorHandler + + isLastSyncResourceVersionExpired bool } // ResourceVersionUpdater is an interface that allows store implementation to @@ -208,7 +214,7 @@ var internalPackages = []string{"client-go/tools/cache/"} func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { - if err := r.ListAndWatch(stopCh); err != nil { + if err := r.WatchAndRelist(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) @@ -238,117 +244,23 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { return t.C(), t.Stop } -// ListAndWatch first lists all items and get the resource version at the moment of call, -// and then use the resource version to watch. +// WatchAndRelist use the resource version to watch, if relistResourceVersion is "", "0" +// or the lastSyncResourceVersion is expired, call relist() to get all items. // It returns error if ListAndWatch didn't even try to initialize watch. -func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { +func (r *Reflector) WatchAndRelist(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) - var resourceVersion string - - options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} - - if err := func() error { - initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name}) - defer initTrace.LogIfLong(10 * time.Second) - var list runtime.Object - var paginatedResult bool - var err error - listCh := make(chan struct{}, 1) - panicCh := make(chan interface{}, 1) - go func() { - defer func() { - if r := recover(); r != nil { - panicCh <- r - } - }() - // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first - // list request will return the full response. - pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { - return r.listerWatcher.List(opts) - })) - switch { - case r.WatchListPageSize != 0: - pager.PageSize = r.WatchListPageSize - case r.paginatedResult: - // We got a paginated result initially. Assume this resource and server honor - // paging requests (i.e. watch cache is probably disabled) and leave the default - // pager size set. - case options.ResourceVersion != "" && options.ResourceVersion != "0": - // User didn't explicitly request pagination. - // - // With ResourceVersion != "", we have a possibility to list from watch cache, - // but we do that (for ResourceVersion != "0") only if Limit is unset. - // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly - // switch off pagination to force listing from watch cache (if enabled). - // With the existing semantic of RV (result is at least as fresh as provided RV), - // this is correct and doesn't lead to going back in time. - // - // We also don't turn off pagination for ResourceVersion="0", since watch cache - // is ignoring Limit in that case anyway, and if watch cache is not enabled - // we don't introduce regression. - pager.PageSize = 0 - } + relistResourceVersion := r.relistResourceVersion() + if relistResourceVersion == "" || relistResourceVersion == "0" || r.isLastSyncResourceVersionExpired { + if err := r.relist(stopCh); err != nil { + return err + } + r.isLastSyncResourceVersionExpired = false - list, paginatedResult, err = pager.List(context.Background(), options) - if isExpiredError(err) || isTooLargeResourceVersionError(err) { - r.setIsLastSyncResourceVersionUnavailable(true) - // Retry immediately if the resource version used to list is unavailable. - // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on - // continuation pages, but the pager might not be enabled, the full list might fail because the - // resource version it is listing at is expired or the cache may not yet be synced to the provided - // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure - // the reflector makes forward progress. - list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) - } - close(listCh) - }() select { case <-stopCh: return nil - case r := <-panicCh: - panic(r) - case <-listCh: - } - if err != nil { - return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) - } - - // We check if the list was paginated and if so set the paginatedResult based on that. - // However, we want to do that only for the initial list (which is the only case - // when we set ResourceVersion="0"). The reasoning behind it is that later, in some - // situations we may force listing directly from etcd (by setting ResourceVersion="") - // which will return paginated result, even if watch cache is enabled. However, in - // that case, we still want to prefer sending requests to watch cache if possible. - // - // Paginated result returned for request with ResourceVersion="0" mean that watch - // cache is disabled and there are a lot of objects of a given type. In such case, - // there is no need to prefer listing from watch cache. - if options.ResourceVersion == "0" && paginatedResult { - r.paginatedResult = true - } - - r.setIsLastSyncResourceVersionUnavailable(false) // list was successful - initTrace.Step("Objects listed") - listMetaInterface, err := meta.ListAccessor(list) - if err != nil { - return fmt.Errorf("unable to understand list result %#v: %v", list, err) - } - resourceVersion = listMetaInterface.GetResourceVersion() - initTrace.Step("Resource version extracted") - items, err := meta.ExtractList(list) - if err != nil { - return fmt.Errorf("unable to understand list result %#v (%v)", list, err) - } - initTrace.Step("Objects extracted") - if err := r.syncWith(items, resourceVersion); err != nil { - return fmt.Errorf("unable to sync list result: %v", err) + default: } - initTrace.Step("SyncWith done") - r.setLastSyncResourceVersion(resourceVersion) - initTrace.Step("Resource version updated") - return nil - }(); err != nil { - return err } resyncerrc := make(chan error, 1) @@ -379,6 +291,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } }() + resourceVersion := r.LastSyncResourceVersion() for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { @@ -388,7 +301,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) - options = metav1.ListOptions{ + options := metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. @@ -412,6 +325,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { <-r.initConnBackoffManager.Backoff().C() continue } + + if isExpiredError(err) { + r.isLastSyncResourceVersionExpired = true + } return err } @@ -423,6 +340,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) + r.isLastSyncResourceVersionExpired = true case apierrors.IsTooManyRequests(err): klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) <-r.initConnBackoffManager.Backoff().C() @@ -436,6 +354,111 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } } +func (r *Reflector) relist(stopCh <-chan struct{}) error { + klog.V(2).Infof("Relist %v from %s", r.expectedTypeName, r.name) + options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} + initTrace := trace.New("Reflector Relist", trace.Field{Key: "name", Value: r.name}) + defer initTrace.LogIfLong(10 * time.Second) + + var list runtime.Object + var paginatedResult bool + var err error + listCh := make(chan struct{}, 1) + panicCh := make(chan interface{}, 1) + go func() { + defer func() { + if r := recover(); r != nil { + panicCh <- r + } + }() + // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first + // list request will return the full response. + pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { + return r.listerWatcher.List(opts) + })) + switch { + case r.WatchListPageSize != 0: + pager.PageSize = r.WatchListPageSize + case r.paginatedResult: + // We got a paginated result initially. Assume this resource and server honor + // paging requests (i.e. watch cache is probably disabled) and leave the default + // pager size set. + case options.ResourceVersion != "" && options.ResourceVersion != "0": + // User didn't explicitly request pagination. + // + // With ResourceVersion != "", we have a possibility to list from watch cache, + // but we do that (for ResourceVersion != "0") only if Limit is unset. + // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly + // switch off pagination to force listing from watch cache (if enabled). + // With the existing semantic of RV (result is at least as fresh as provided RV), + // this is correct and doesn't lead to going back in time. + // + // We also don't turn off pagination for ResourceVersion="0", since watch cache + // is ignoring Limit in that case anyway, and if watch cache is not enabled + // we don't introduce regression. + pager.PageSize = 0 + } + + list, paginatedResult, err = pager.List(context.Background(), options) + if isExpiredError(err) || isTooLargeResourceVersionError(err) { + r.setIsLastSyncResourceVersionUnavailable(true) + // Retry immediately if the resource version used to list is unavailable. + // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on + // continuation pages, but the pager might not be enabled, the full list might fail because the + // resource version it is listing at is expired or the cache may not yet be synced to the provided + // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure + // the reflector makes forward progress. + list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) + } + close(listCh) + }() + select { + case <-stopCh: + return nil + case r := <-panicCh: + panic(r) + case <-listCh: + } + if err != nil { + return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) + } + + // We check if the list was paginated and if so set the paginatedResult based on that. + // However, we want to do that only for the initial list (which is the only case + // when we set ResourceVersion="0"). The reasoning behind it is that later, in some + // situations we may force listing directly from etcd (by setting ResourceVersion="") + // which will return paginated result, even if watch cache is enabled. However, in + // that case, we still want to prefer sending requests to watch cache if possible. + // + // Paginated result returned for request with ResourceVersion="0" mean that watch + // cache is disabled and there are a lot of objects of a given type. In such case, + // there is no need to prefer listing from watch cache. + if options.ResourceVersion == "0" && paginatedResult { + r.paginatedResult = true + } + + r.setIsLastSyncResourceVersionUnavailable(false) // list was successful + initTrace.Step("Objects listed") + listMetaInterface, err := meta.ListAccessor(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v: %v", list, err) + } + resourceVersion := listMetaInterface.GetResourceVersion() + initTrace.Step("Resource version extracted") + items, err := meta.ExtractList(list) + if err != nil { + return fmt.Errorf("unable to understand list result %#v (%v)", list, err) + } + initTrace.Step("Objects extracted") + if err := r.syncWith(items, resourceVersion); err != nil { + return fmt.Errorf("unable to sync list result: %v", err) + } + initTrace.Step("SyncWith done") + r.setLastSyncResourceVersion(resourceVersion) + initTrace.Step("Resource version updated") + return nil +} + // syncWith replaces the store's items with the given list. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items))