smartconnpool: do not allow connections to starve #17675

Merged · 3 commits · Feb 3, 2025

Changes from all commits
go/pools/smartconnpool/pool.go (57 changes: 37 additions & 20 deletions)
@@ -161,7 +161,6 @@ type ConnPool[C Connection] struct {
 // The pool must be ConnPool.Open before it can start giving out connections
 func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
 	pool := &ConnPool[C]{}
-	pool.freshSettingsStack.Store(-1)
[Review comment · Member]
Was -1 being used as some kind of sentinel value? Or was this completely unnecessary?

[Reply · Collaborator, Author]
It was a sentinel value to mark when no Setting connection had been returned to the pool yet, but it was not an interesting optimization, so I removed it.

 	pool.config.maxCapacity = config.Capacity
 	pool.config.maxIdleCount = config.MaxIdleCount
 	pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
@@ -202,8 +201,14 @@ func (pool *ConnPool[C]) open() {

 	// The expire worker takes care of removing from the waiter list any clients whose
 	// context has been cancelled.
-	pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
-		pool.wait.expire(false)
+	pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
+		maybeStarving := pool.wait.expire(false)
+
+		// Do not allow connections to starve; if there are waiters in the queue
+		// and connections in the stack, it means we could be starving them.
+		// Try getting a connection out and handing it over directly.
+		for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
+		}
 		return true
 	})
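A minimal, self-contained sketch of the hand-off pattern this worker implements; idle and waiters below are hypothetical stand-ins for the pool's connection stacks and waitlist, not the real ConnPool internals:

package main

import (
	"fmt"
	"time"
)

func main() {
	idle := []string{"conn-1"}  // stand-in for the pool's idle connection stacks
	w := make(chan string, 1)   // stand-in for one waiter parked in the waitlist
	waiters := []chan string{w}

	// Stand-in for pool.runWorker: a 100ms tick, as in the patch.
	go func() {
		for range time.Tick(100 * time.Millisecond) {
			// wait.expire(false) reports how many waiters may be starving;
			// here we approximate that count with the queue length.
			maybeStarving := len(waiters)
			// Mirror of the new loop: pop an idle connection and hand it
			// directly to a waiter instead of letting it sit in a stack.
			for n := 0; n < maybeStarving && len(idle) > 0; n++ {
				conn := idle[len(idle)-1]
				idle = idle[:len(idle)-1]
				waiters[0] <- conn
				waiters = waiters[1:]
			}
		}
	}()

	fmt.Println("waiter woken with:", <-w) // served by the worker, not starved
}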

@@ -416,20 +421,37 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
 		}
 	}
 
-	if !pool.wait.tryReturnConn(conn) {
-		if pool.closeOnIdleLimitReached(conn) {
-			return
-		}
+	pool.tryReturnConn(conn)
+}
 
+func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
+	if pool.wait.tryReturnConn(conn) {
+		return true
+	}
+	if pool.closeOnIdleLimitReached(conn) {
+		return false
+	}
+	connSetting := conn.Conn.Setting()
+	if connSetting == nil {
+		pool.clean.Push(conn)
+	} else {
+		stack := connSetting.bucket & stackMask
+		pool.settings[stack].Push(conn)
+		pool.freshSettingsStack.Store(int64(stack))
+	}
+	return false
+}
 
-		connSetting := conn.Conn.Setting()
-		if connSetting == nil {
-			pool.clean.Push(conn)
-		} else {
-			stack := connSetting.bucket & stackMask
-			pool.settings[stack].Push(conn)
-			pool.freshSettingsStack.Store(int64(stack))
-		}
-	}
-}
+func (pool *ConnPool[C]) tryReturnAnyConn() bool {
+	if conn, ok := pool.clean.Pop(); ok {
+		return pool.tryReturnConn(conn)
+	}
+	for u := 0; u <= stackMask; u++ {
+		if conn, ok := pool.settings[u].Pop(); ok {
+			return pool.tryReturnConn(conn)
+		}
+	}
+	return false
+}

 // closeOnIdleLimitReached closes a connection if the number of idle connections (active - inuse) in the pool
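Note that tryReturnConn reports true only when a waiter actually took the connection; false means it was parked on a stack or closed by the idle limit, which is what bounds the worker's hand-off loop above. The stack index comes from masking the setting's bucket. A tiny sketch of that mapping, assuming stackMask is 7 (8 stacks) purely for illustration; the real constant lives in the package:

package main

import "fmt"

// stackMask is assumed to be 7 here only for illustration; the real
// constant is defined in the smartconnpool package.
const stackMask = 7

func main() {
	// Any bucket value folds into one of the fixed setting stacks.
	for _, bucket := range []uint32{0, 3, 9, 42} {
		fmt.Printf("setting bucket %2d -> stack %d\n", bucket, bucket&stackMask)
	}
}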
@@ -484,14 +506,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
 }
 
 func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
-	fresh := pool.freshSettingsStack.Load()
-	if fresh < 0 {
-		return nil
-	}
-
 	var start uint32
 	if setting == nil {
-		start = uint32(fresh)
+		start = uint32(pool.freshSettingsStack.Load())
 	} else {
 		start = setting.bucket
 	}
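Dropping the -1 sentinel is safe because the atomic's zero value already points at a valid stack: stack 0 simply stays empty until the first Setting connection is returned, so a probe there misses harmlessly. A tiny sketch of the zero-value behavior, assuming the field is an atomic.Int64 as the Store/Load calls suggest:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var freshSettingsStack atomic.Int64 // zero value is 0; no Store(-1) needed
	start := uint32(freshSettingsStack.Load())
	fmt.Println("start probing at stack", start) // stack 0: valid, just empty
}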
go/pools/smartconnpool/pool_test.go (69 changes: 69 additions & 0 deletions)
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@@ -36,6 +37,7 @@ var (
 type TestState struct {
 	lastID, open, close, reset atomic.Int64
 	waits                      []time.Time
+	mu                         sync.Mutex
 
 	chaos struct {
 		delayConnect time.Duration
@@ -45,6 +47,8 @@ type TestState struct {
 }
 
 func (ts *TestState) LogWait(start time.Time) {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
 	ts.waits = append(ts.waits, start)
 }

@@ -1125,3 +1129,68 @@ func TestApplySettingsFailure(t *testing.T) {
 		p.put(r)
 	}
 }
+
+func TestGetSpike(t *testing.T) {
+	var state TestState
+
+	ctx := context.Background()
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    5,
+		IdleTimeout: time.Second,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	var resources [10]*Pooled[*TestConn]
+
+	// Ensure we have a pool with 5 available resources
+	for i := 0; i < 5; i++ {
+		r, err := p.Get(ctx, nil)
+
+		require.NoError(t, err)
+		resources[i] = r
+		assert.EqualValues(t, 5-i-1, p.Available())
+		assert.Zero(t, p.Metrics.WaitCount())
+		assert.Zero(t, len(state.waits))
+		assert.Zero(t, p.Metrics.WaitTime())
+		assert.EqualValues(t, i+1, state.lastID.Load())
+		assert.EqualValues(t, i+1, state.open.Load())
+	}
+
+	for i := 0; i < 5; i++ {
+		p.put(resources[i])
+	}
+
+	assert.EqualValues(t, 5, p.Available())
+	assert.EqualValues(t, 5, p.Active())
+	assert.EqualValues(t, 0, p.InUse())
+
+	for i := 0; i < 2000; i++ {
+		wg := sync.WaitGroup{}
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+
+		errs := make(chan error, 80)
+
+		for j := 0; j < 80; j++ {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+				r, err := p.Get(ctx, nil)
+				defer p.put(r)
+
+				if err != nil {
+					errs <- err
+				}
+			}()
+		}
+		wg.Wait()
+
+		if len(errs) > 0 {
+			t.Errorf("Error getting connection: %v", <-errs)
+		}
+
+		close(errs)
+	}
+}
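The shape of this test matches the bug: 80 concurrent getters contend for a pool of capacity 5 under a one-second deadline, repeated for 2000 rounds. Without the hand-off in the expire worker, a returned connection could land on a stack while waiters kept sleeping, so some goroutines could hit the timeout; with the fix, every round should drain cleanly and the errs channel should stay empty.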
go/pools/smartconnpool/waitlist.go (6 changes: 5 additions & 1 deletion)
@@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool

 // expire removes and wakes any expired waiter in the waitlist.
 // if force is true, it'll wake and remove all the waiters.
-func (wl *waitlist[C]) expire(force bool) {
+func (wl *waitlist[C]) expire(force bool) (maybeStarving int) {
 	if wl.list.Len() == 0 {
 		return
 	}
@@ -91,6 +91,9 @@ func (wl *waitlist[C]) expire(force bool) {
 			expired = append(expired, e)
 			continue
 		}
+		if e.Value.age == 0 {
+			maybeStarving++
+		}
 	}
 	// remove the expired waiters from the waitlist after traversing it
 	for _, e := range expired {
@@ -102,6 +105,7 @@ func (wl *waitlist[C]) expire(force bool) {
 	for _, e := range expired {
 		e.Value.sema.notify(false)
 	}
+	return
 }

 // tryReturnConn tries handing over a connection to one of the waiters in the pool.
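A minimal sketch of the counting pattern above, with container/list standing in for the waitlist's internal list; the age field here is only a hypothetical stand-in for whatever the real waiter tracks, with zero meaning the waiter has not yet been served:

package main

import (
	"container/list"
	"fmt"
)

// waiter is a stand-in for the real waitlist entry.
type waiter struct {
	expired bool   // context cancelled: remove and wake
	age     uint32 // zero for waiters that may be starving, as in the diff
}

func main() {
	wl := list.New()
	wl.PushBack(&waiter{expired: true})          // to be removed this sweep
	wl.PushBack(&waiter{expired: false, age: 0}) // possibly starving: counted
	wl.PushBack(&waiter{expired: false, age: 2}) // already aged: not counted

	maybeStarving := 0
	var expired []*list.Element
	for e := wl.Front(); e != nil; e = e.Next() {
		w := e.Value.(*waiter)
		if w.expired {
			expired = append(expired, e) // collect first, remove after the walk
			continue
		}
		if w.age == 0 {
			maybeStarving++
		}
	}
	for _, e := range expired {
		wl.Remove(e)
	}
	fmt.Println("maybeStarving:", maybeStarving) // prints 1
}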