Skip to content

Commit

Permalink
Cherry-pick 30c09f5 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Feb 3, 2025
1 parent 4411ffc commit 78c2b82
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 10 deletions.
50 changes: 41 additions & 9 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
Expand Down Expand Up @@ -195,8 +194,14 @@ func (pool *ConnPool[C]) open() {

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
pool.wait.expire(false)
pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
maybeStarving := pool.wait.expire(false)

// Do not allow connections to starve; if there's waiters in the queue
// and connections in the stack, it means we could be starving them.
// Try getting out a connection and handing it over directly
for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
}
return true
})

Expand Down Expand Up @@ -395,6 +400,7 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}
}

<<<<<<< HEAD

Check failure on line 403 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected }

Check failure on line 403 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected }
if !pool.wait.tryReturnConn(conn) {
connSetting := conn.Conn.Setting()
if connSetting == nil {
Expand All @@ -403,8 +409,39 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
=======
pool.tryReturnConn(conn)
}

func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
if pool.wait.tryReturnConn(conn) {
return true
}
if pool.closeOnIdleLimitReached(conn) {
return false
}
connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
} else {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
}
return false
}

func (pool *ConnPool[C]) tryReturnAnyConn() bool {
if conn, ok := pool.clean.Pop(); ok {
return pool.tryReturnConn(conn)
}
for u := 0; u <= stackMask; u++ {
if conn, ok := pool.settings[u].Pop(); ok {
return pool.tryReturnConn(conn)
>>>>>>> 30c09f56be (smartconnpool: do not allow connections to starve (#17675))

Check failure on line 441 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected >>, expected }

Check failure on line 441 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'

Check failure on line 441 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected >>, expected }

Check failure on line 441 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'
}
}
return false

Check failure on line 444 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: non-declaration statement outside function body

Check failure on line 444 in go/pools/smartconnpool/pool.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: non-declaration statement outside function body
}

func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
Expand Down Expand Up @@ -443,14 +480,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
fresh := pool.freshSettingsStack.Load()
if fresh < 0 {
return nil
}

var start uint32
if setting == nil {
start = uint32(fresh)
start = uint32(pool.freshSettingsStack.Load())
} else {
start = setting.bucket
}
Expand Down
69 changes: 69 additions & 0 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -36,6 +37,7 @@ var (
type TestState struct {
lastID, open, close, reset atomic.Int64
waits []time.Time
mu sync.Mutex

chaos struct {
delayConnect time.Duration
Expand All @@ -45,6 +47,8 @@ type TestState struct {
}

func (ts *TestState) LogWait(start time.Time) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.waits = append(ts.waits, start)
}

Expand Down Expand Up @@ -1080,3 +1084,68 @@ func TestApplySettingsFailure(t *testing.T) {
p.put(r)
}
}

func TestGetSpike(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [10]*Pooled[*TestConn]

// Ensure we have a pool with 5 available resources
for i := 0; i < 5; i++ {
r, err := p.Get(ctx, nil)

require.NoError(t, err)
resources[i] = r
assert.EqualValues(t, 5-i-1, p.Available())
assert.Zero(t, p.Metrics.WaitCount())
assert.Zero(t, len(state.waits))
assert.Zero(t, p.Metrics.WaitTime())
assert.EqualValues(t, i+1, state.lastID.Load())
assert.EqualValues(t, i+1, state.open.Load())
}

for i := 0; i < 5; i++ {
p.put(resources[i])
}

assert.EqualValues(t, 5, p.Available())
assert.EqualValues(t, 5, p.Active())
assert.EqualValues(t, 0, p.InUse())

for i := 0; i < 2000; i++ {
wg := sync.WaitGroup{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

errs := make(chan error, 80)

for j := 0; j < 80; j++ {
wg.Add(1)

go func() {
defer wg.Done()
r, err := p.Get(ctx, nil)
defer p.put(r)

if err != nil {
errs <- err
}
}()
}
wg.Wait()

if len(errs) > 0 {
t.Errorf("Error getting connection: %v", <-errs)
}

close(errs)
}
}
6 changes: 5 additions & 1 deletion go/pools/smartconnpool/waitlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool

// expire removes and wakes any expired waiter in the waitlist.
// if force is true, it'll wake and remove all the waiters.
func (wl *waitlist[C]) expire(force bool) {
func (wl *waitlist[C]) expire(force bool) (maybeStarving int) {
if wl.list.Len() == 0 {
return
}
Expand All @@ -91,6 +91,9 @@ func (wl *waitlist[C]) expire(force bool) {
expired = append(expired, e)
continue
}
if e.Value.age == 0 {
maybeStarving++
}
}
// remove the expired waiters from the waitlist after traversing it
for _, e := range expired {
Expand All @@ -102,6 +105,7 @@ func (wl *waitlist[C]) expire(force bool) {
for _, e := range expired {
e.Value.sema.notify(false)
}
return
}

// tryReturnConn tries handing over a connection to one of the waiters in the pool.
Expand Down

0 comments on commit 78c2b82

Please sign in to comment.