Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick 2529b667 notReady nodes in scaledown #26

Open
wants to merge 4 commits into
base: datadog-master-3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_local_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package aws

import (
"strings"
)

var volumesPerInstanceFamilly = map[string]int64{
"c1": 1,
"c3": 1,
"c5ad": 1,
"c5d": 1,
"c6gd": 1,
"d2": 1,
"f1": 1,
"g2": 1,
"g4dn": 1,
"h1": 1,
"i2": 1,
"i3": 1,
"i3en": 1,
"i3p": 1,
"m1": 1,
"m2": 1,
"m3": 1,
"m5ad": 1,
"m5d": 1,
"m5dn": 1,
"m6gd": 1,
"p3dn": 1,
"r3": 1,
"r5ad": 1,
"r5d": 1,
"r5dn": 1,
"r6gd": 1,
"x1": 1,
"x1e": 1,
"z1d": 1,
}

func numberOfLocalVolumes(instanceType string) int64 {
familly := strings.Split(instanceType, ".")[0]
if volumes, ok := volumesPerInstanceFamilly[familly]; ok {
return volumes
}
return 0
}
1 change: 1 addition & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*ap
node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI)
node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(instanceMemoryBi, resource.BinarySI)
node.Status.Capacity["storageclass/local-data"] = *resource.NewQuantity(numberOfLocalVolumes(template.InstanceType.InstanceType), resource.DecimalSI)

resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags)
for resourceName, val := range resourcesFromTags {
Expand Down
21 changes: 21 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_local_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package azure

import (
"regexp"
)

// https://github.com/DataDog/k8s-nodegroups/blob/controller-runtime-v1/pkg/cloud/azure/clients/resource_skus_cache.go#L21
var volumesPerInstanceFamilly = map[string]int64{
"Standard_L\\d+s_v2": 1, // standardLSv2Family
"Standard_E\\d+d_v4": 1, // standardEDv4Family
"Standard_E.*ds_v4": 1, // standardEDSv4Family
}

func numberOfLocalVolumes(instanceType string) int64 {
for familly, volumes := range volumesPerInstanceFamilly {
if match, _ := regexp.MatchString(familly, instanceType); match {
return volumes
}
}
return 0
}
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func buildNodeFromTemplate(scaleSetName string, template compute.VirtualMachineS
node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
}

if vmssType.InstanceType != "" {
volumes := numberOfLocalVolumes(vmssType.InstanceType)
node.Status.Capacity["storageclass/local-data"] = *resource.NewQuantity(volumes, resource.DecimalSI)
}

// TODO: set real allocatable.
node.Status.Allocatable = node.Status.Capacity

Expand Down
50 changes: 44 additions & 6 deletions cluster-autoscaler/cloudprovider/gce/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ limitations under the License.
package gce

import (
"context"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/client-go/util/workqueue"

gce "google.golang.org/api/compute/v1"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -222,9 +226,28 @@ func (gc *GceCache) getMigNoLock(migRef GceRef) (mig Mig, found bool) {

// RegenerateInstanceCacheForMig triggers instances cache regeneration for single MIG under lock.
func (gc *GceCache) RegenerateInstanceCacheForMig(migRef GceRef) error {
klog.V(4).Infof("Regenerating MIG information for %s", migRef.String())

instances, err := gc.GceService.FetchMigInstances(migRef)
if err != nil {
klog.V(4).Infof("Failed MIG info request for %s: %v", migRef.String(), err)
return err
}

gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
return gc.regenerateInstanceCacheForMigNoLock(migRef)

// cleanup old entries
gc.removeInstancesForMigs(migRef)

for _, instance := range instances {
instanceRef, err := GceRefFromProviderId(instance.Id)
if err != nil {
return err
}
gc.instanceRefToMigRef[instanceRef] = migRef
}
return nil
}

func (gc *GceCache) regenerateInstanceCacheForMigNoLock(migRef GceRef) error {
Expand All @@ -250,17 +273,32 @@ func (gc *GceCache) regenerateInstanceCacheForMigNoLock(migRef GceRef) error {

// RegenerateInstancesCache triggers instances cache regeneration under lock.
func (gc *GceCache) RegenerateInstancesCache() error {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

gc.instanceRefToMigRef = make(map[GceRef]GceRef)
gc.instancesFromUnknownMigs = make(map[GceRef]struct{})
for _, migRef := range gc.getMigRefs() {
err := gc.regenerateInstanceCacheForMigNoLock(migRef)

concurrency := 5
if value, exists := os.LookupEnv("GCP_MAX_CONCURRENCY"); exists {
if i, err := strconv.Atoi(value); err == nil {
concurrency = i
}
}

migs := gc.getMigRefs()
errors := make([]error, len(migs))
ctx, cancel := context.WithCancel(context.Background())
workqueue.ParallelizeUntil(ctx, concurrency, len(migs), func(piece int) {
errors[piece] = gc.RegenerateInstanceCacheForMig(migs[piece])
if errors[piece] != nil {
cancel()
}
}, workqueue.WithChunkSize(concurrency))

for _, err := range errors {
if err != nil {
return err
}
}

return nil
}

Expand Down
34 changes: 26 additions & 8 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@ limitations under the License.
package gce

import (
"context"
"errors"
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/client-go/util/workqueue"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
provider_gce "k8s.io/legacy-cloud-providers/gce"

"cloud.google.com/go/compute/metadata"
Expand Down Expand Up @@ -359,12 +362,22 @@ func (m *gceManagerImpl) buildMigFromSpec(s *dynamic.NodeGroupSpec) (Mig, error)
// they no longer exist in GCE.
func (m *gceManagerImpl) fetchAutoMigs() error {
exists := make(map[GceRef]bool)
changed := false
var changed int32 = 0

concurrency := 5
if value, exists := os.LookupEnv("GCP_MAX_CONCURRENCY"); exists {
if i, err := strconv.Atoi(value); err == nil {
concurrency = i
}
}

toRegister := make([]Mig, 0)
for _, cfg := range m.migAutoDiscoverySpecs {
links, err := m.findMigsNamed(cfg.Re)
if err != nil {
return fmt.Errorf("cannot autodiscover managed instance groups: %v", err)
}

for _, link := range links {
mig, err := m.buildMigFromAutoCfg(link, cfg)
if err != nil {
Expand All @@ -378,21 +391,26 @@ func (m *gceManagerImpl) fetchAutoMigs() error {
klog.V(3).Infof("Ignoring explicitly configured MIG %s in autodiscovery.", mig.GceRef().String())
continue
}
if m.registerMig(mig) {
klog.V(3).Infof("Autodiscovered MIG %s using regexp %s", mig.GceRef().String(), cfg.Re.String())
changed = true
}
toRegister = append(toRegister, mig)
}
}

workqueue.ParallelizeUntil(context.Background(), concurrency, len(toRegister), func(piece int) {
mig := toRegister[piece]
if m.registerMig(mig) {
klog.V(3).Infof("Autodiscovered MIG %s", mig.GceRef().String())
atomic.StoreInt32(&changed, int32(1))
}
}, workqueue.WithChunkSize(concurrency))

for _, mig := range m.GetMigs() {
if !exists[mig.GceRef()] && !m.explicitlyConfigured[mig.GceRef()] {
m.cache.UnregisterMig(mig)
changed = true
atomic.StoreInt32(&changed, int32(1))
}
}

if changed {
if atomic.LoadInt32(&changed) > 0 {
return m.cache.RegenerateInstancesCache()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ func NewCachingMigInstanceTemplatesProvider(cache *GceCache, gceClient Autoscali
// GetMigInstanceTemplate returns instance template for MIG with given ref
func (p *CachingMigInstanceTemplatesProvider) GetMigInstanceTemplate(migRef GceRef) (*gce.InstanceTemplate, error) {
p.mutex.Lock()
defer p.mutex.Unlock()

if !p.lastRefresh.Add(migInstanceCacheRefreshInterval).After(time.Now()) {
p.cache.InvalidateAllMigInstanceTemplates()
p.lastRefresh = time.Now()
}

instanceTemplate, found := p.cache.GetMigInstanceTemplate(migRef)
p.mutex.Unlock()

if found {
return instanceTemplate, nil
Expand All @@ -69,6 +68,8 @@ func (p *CachingMigInstanceTemplatesProvider) GetMigInstanceTemplate(migRef GceR
if err != nil {
return nil, err
}
p.mutex.Lock()
p.cache.SetMigInstanceTemplate(migRef, instanceTemplate)
p.mutex.Unlock()
return instanceTemplate, nil
}
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, template *gce.Instan
if err != nil {
return nil, err
}
var storage int64 = 0
if len(template.Properties.Disks) > 1 {
storage = 1
}
capacity["storageclass/local-data"] = *resource.NewQuantity(storage, resource.DecimalSI)
node.Status = apiv1.NodeStatus{
Capacity: capacity,
}
Expand Down
Loading