diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index d49032f34a1..cc3c42f8851 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -156,7 +156,6 @@ type ConnPool[C Connection] struct {
 // The pool must be ConnPool.Open before it can start giving out connections
 func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
 	pool := &ConnPool[C]{}
-	pool.freshSettingsStack.Store(-1)
 	pool.config.maxCapacity = config.Capacity
 	pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
 	pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
@@ -195,8 +194,14 @@ func (pool *ConnPool[C]) open() {
 
 	// The expire worker takes care of removing from the waiter list any clients whose
 	// context has been cancelled.
-	pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
-		pool.wait.expire(false)
+	pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
+		maybeStarving := pool.wait.expire(false)
+
+		// Do not allow connections to starve; if there's waiters in the queue
+		// and connections in the stack, it means we could be starving them.
+		// Try getting out a connection and handing it over directly
+		for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
+		}
 		return true
 	})
 
@@ -395,16 +400,34 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
 		}
 	}
 
-	if !pool.wait.tryReturnConn(conn) {
-		connSetting := conn.Conn.Setting()
-		if connSetting == nil {
-			pool.clean.Push(conn)
-		} else {
-			stack := connSetting.bucket & stackMask
-			pool.settings[stack].Push(conn)
-			pool.freshSettingsStack.Store(int64(stack))
+	pool.tryReturnConn(conn)
+}
+
+func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
+	if pool.wait.tryReturnConn(conn) {
+		return true
+	}
+	connSetting := conn.Conn.Setting()
+	if connSetting == nil {
+		pool.clean.Push(conn)
+	} else {
+		stack := connSetting.bucket & stackMask
+		pool.settings[stack].Push(conn)
+		pool.freshSettingsStack.Store(int64(stack))
+	}
+	return false
+}
+
+func (pool *ConnPool[C]) tryReturnAnyConn() bool {
+	if conn, ok := pool.clean.Pop(); ok {
+		return pool.tryReturnConn(conn)
+	}
+	for u := 0; u <= stackMask; u++ {
+		if conn, ok := pool.settings[u].Pop(); ok {
+			return pool.tryReturnConn(conn)
 		}
 	}
+	return false
 }
 
 func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
@@ -442,14 +465,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
 }
 
 func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
-	fresh := pool.freshSettingsStack.Load()
-	if fresh < 0 {
-		return nil
-	}
-
 	var start uint32
 	if setting == nil {
-		start = uint32(fresh)
+		start = uint32(pool.freshSettingsStack.Load())
 	} else {
 		start = setting.bucket
 	}
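A note on the pool.go hunks above: the faster tick is not the whole fix. Connections can starve when they sit on the settings stacks while waiters queue up, for example when a connection is re-stacked in the window right after the waiter list was checked and found empty. The expire worker therefore doubles as a rescuer: it attempts at most maybeStarving direct hand-offs per tick via tryReturnAnyConn. The standalone sketch below models only that rescue step; it is illustrative, not the pool's real types. The names idle (a buffered channel standing in for a settings stack), waiter (an unbuffered channel standing in for a parked waiter's semaphore) and rescue (standing in for tryReturnAnyConn) are hypothetical.

    package main

    import (
    	"fmt"
    	"time"
    )

    type conn struct{ id int }

    func main() {
    	idle := make(chan *conn, 1) // hypothetical stand-in for a settings stack
    	waiter := make(chan *conn)  // hypothetical stand-in for a parked waiter's semaphore

    	// put() checks the waiter list first; it is empty, so the connection
    	// lands on the idle stack...
    	idle <- &conn{id: 1}

    	// ...and a waiter parks right after the check. Nothing will pop the
    	// stack on its behalf: without help it waits until its context expires.
    	go func() {
    		c := <-waiter
    		fmt.Println("waiter got conn", c.id)
    	}()

    	// The periodic worker (the 100ms ticker in the patch) breaks the tie:
    	// pop any idle connection and hand it to the waiter directly.
    	rescue := func() bool {
    		select {
    		case c := <-idle:
    			select {
    			case waiter <- c:
    				return true
    			default:
    				idle <- c // no parked waiter after all; re-stack it
    				return false
    			}
    		default:
    			return false
    		}
    	}

    	time.Sleep(10 * time.Millisecond) // let the waiter park
    	rescue()
    	time.Sleep(10 * time.Millisecond) // let the print happen
    }

The design point mirrored here is the direct hand-off: the worker moves the connection straight to the waiter instead of re-stacking it, which is what tryReturnConn does for each connection popped by tryReturnAnyConn.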
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 701327005ad..a399bdfb3a4 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -36,6 +37,7 @@ var (
 type TestState struct {
 	lastID, open, close, reset atomic.Int64
 	waits                      []time.Time
+	mu                         sync.Mutex
 
 	chaos struct {
 		delayConnect time.Duration
@@ -45,6 +47,8 @@ type TestState struct {
 }
 
 func (ts *TestState) LogWait(start time.Time) {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
 	ts.waits = append(ts.waits, start)
 }
 
@@ -1080,3 +1084,68 @@ func TestApplySettingsFailure(t *testing.T) {
 		p.put(r)
 	}
 }
+
+func TestGetSpike(t *testing.T) {
+	var state TestState
+
+	ctx := context.Background()
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    5,
+		IdleTimeout: time.Second,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	var resources [10]*Pooled[*TestConn]
+
+	// Ensure we have a pool with 5 available resources
+	for i := 0; i < 5; i++ {
+		r, err := p.Get(ctx, nil)
+
+		require.NoError(t, err)
+		resources[i] = r
+		assert.EqualValues(t, 5-i-1, p.Available())
+		assert.Zero(t, p.Metrics.WaitCount())
+		assert.Zero(t, len(state.waits))
+		assert.Zero(t, p.Metrics.WaitTime())
+		assert.EqualValues(t, i+1, state.lastID.Load())
+		assert.EqualValues(t, i+1, state.open.Load())
+	}
+
+	for i := 0; i < 5; i++ {
+		p.put(resources[i])
+	}
+
+	assert.EqualValues(t, 5, p.Available())
+	assert.EqualValues(t, 5, p.Active())
+	assert.EqualValues(t, 0, p.InUse())
+
+	for i := 0; i < 2000; i++ {
+		wg := sync.WaitGroup{}
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
+
+		errs := make(chan error, 80)
+
+		for j := 0; j < 80; j++ {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+				r, err := p.Get(ctx, nil)
+				defer p.put(r)
+
+				if err != nil {
+					errs <- err
+				}
+			}()
+		}
+		wg.Wait()
+
+		if len(errs) > 0 {
+			t.Errorf("Error getting connection: %v", <-errs)
+		}
+
+		close(errs)
+	}
+}
diff --git a/go/pools/smartconnpool/waitlist.go b/go/pools/smartconnpool/waitlist.go
index f16215f4b14..ef1eb1fe997 100644
--- a/go/pools/smartconnpool/waitlist.go
+++ b/go/pools/smartconnpool/waitlist.go
@@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool
 
 // expire removes and wakes any expired waiter in the waitlist.
 // if force is true, it'll wake and remove all the waiters.
-func (wl *waitlist[C]) expire(force bool) {
+func (wl *waitlist[C]) expire(force bool) (maybeStarving int) {
 	if wl.list.Len() == 0 {
 		return
 	}
@@ -91,6 +91,9 @@
 			expired = append(expired, e)
 			continue
 		}
+		if e.Value.age == 0 {
+			maybeStarving++
+		}
 	}
 	// remove the expired waiters from the waitlist after traversing it
 	for _, e := range expired {
@@ -102,6 +105,7 @@
 	for _, e := range expired {
 		e.Value.sema.notify(false)
 	}
+	return
 }
 
 // tryReturnConn tries handing over a connection to one of the waiters in the pool.
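A note on the waitlist.go hunks: expire keeps its collect-then-unlink-then-notify shape and now also reports how many surviving waiters look starved. How age is maintained is not visible in this hunk, so the sketch below simply adopts the patch's reading of age == 0 as "not served yet". It is a self-contained model, not the real implementation: container/list and a buffered channel stand in for the pool's generic list and per-waiter semaphore, and the waiter type here is hypothetical.

    package main

    import (
    	"container/list"
    	"context"
    	"fmt"
    )

    // Hypothetical, simplified waiter: the real one carries a semaphore;
    // a buffered channel stands in for it here.
    type waiter struct {
    	ctx  context.Context
    	age  uint32
    	woke chan bool
    }

    // expire mirrors the hunk's shape: collect expired elements while
    // traversing, unlink them only after the traversal, notify last, and
    // count the surviving waiters that may be starving (age == 0).
    func expire(wl *list.List, force bool) (maybeStarving int) {
    	if wl.Len() == 0 {
    		return
    	}
    	var expired []*list.Element
    	for e := wl.Front(); e != nil; e = e.Next() {
    		w := e.Value.(*waiter)
    		if force || w.ctx.Err() != nil {
    			expired = append(expired, e)
    			continue
    		}
    		if w.age == 0 {
    			maybeStarving++
    		}
    	}
    	// remove the expired waiters from the waitlist after traversing it
    	for _, e := range expired {
    		wl.Remove(e)
    	}
    	// wake them with "false": no connection, the wait failed
    	for _, e := range expired {
    		e.Value.(*waiter).woke <- false
    	}
    	return
    }

    func main() {
    	wl := list.New()

    	cancelled, cancel := context.WithCancel(context.Background())
    	cancel() // this waiter's context is already dead

    	dead := &waiter{ctx: cancelled, woke: make(chan bool, 1)}
    	fresh := &waiter{ctx: context.Background(), woke: make(chan bool, 1)}
    	wl.PushBack(dead)
    	wl.PushBack(fresh)

    	n := expire(wl, false)
    	fmt.Println("maybeStarving:", n, "dead woken:", <-dead.woke, "remaining:", wl.Len())
    	// prints: maybeStarving: 1 dead woken: false remaining: 1
    }

pool.go then uses the returned count as an upper bound on how many direct hand-offs it attempts per tick.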