diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index b024cc656df..52346adb1a4 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -161,7 +161,6 @@ type ConnPool[C Connection] struct { // The pool must be ConnPool.Open before it can start giving out connections func NewPool[C Connection](config *Config[C]) *ConnPool[C] { pool := &ConnPool[C]{} - pool.freshSettingsStack.Store(-1) pool.config.maxCapacity = config.Capacity pool.config.maxIdleCount = config.MaxIdleCount pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds()) @@ -202,8 +201,14 @@ func (pool *ConnPool[C]) open() { // The expire worker takes care of removing from the waiter list any clients whose // context has been cancelled. - pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool { - pool.wait.expire(false) + pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool { + maybeStarving := pool.wait.expire(false) + + // Do not allow connections to starve; if there's waiters in the queue + // and connections in the stack, it means we could be starving them. + // Try getting out a connection and handing it over directly + for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ { + } return true }) @@ -416,20 +421,37 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) { } } - if !pool.wait.tryReturnConn(conn) { - if pool.closeOnIdleLimitReached(conn) { - return - } + pool.tryReturnConn(conn) +} + +func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool { + if pool.wait.tryReturnConn(conn) { + return true + } + if pool.closeOnIdleLimitReached(conn) { + return false + } + connSetting := conn.Conn.Setting() + if connSetting == nil { + pool.clean.Push(conn) + } else { + stack := connSetting.bucket & stackMask + pool.settings[stack].Push(conn) + pool.freshSettingsStack.Store(int64(stack)) + } + return false +} - connSetting := conn.Conn.Setting() - if connSetting == nil { - pool.clean.Push(conn) - } else { - stack := connSetting.bucket & stackMask - pool.settings[stack].Push(conn) - pool.freshSettingsStack.Store(int64(stack)) +func (pool *ConnPool[C]) tryReturnAnyConn() bool { + if conn, ok := pool.clean.Pop(); ok { + return pool.tryReturnConn(conn) + } + for u := 0; u <= stackMask; u++ { + if conn, ok := pool.settings[u].Pop(); ok { + return pool.tryReturnConn(conn) } } + return false } // closeOnIdleLimitReached closes a connection if the number of idle connections (active - inuse) in the pool @@ -484,14 +506,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) { } func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] { - fresh := pool.freshSettingsStack.Load() - if fresh < 0 { - return nil - } - var start uint32 if setting == nil { - start = uint32(fresh) + start = uint32(pool.freshSettingsStack.Load()) } else { start = setting.bucket } diff --git a/go/pools/smartconnpool/waitlist.go b/go/pools/smartconnpool/waitlist.go index f16215f4b14..ef1eb1fe997 100644 --- a/go/pools/smartconnpool/waitlist.go +++ b/go/pools/smartconnpool/waitlist.go @@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool // expire removes and wakes any expired waiter in the waitlist. // if force is true, it'll wake and remove all the waiters. -func (wl *waitlist[C]) expire(force bool) { +func (wl *waitlist[C]) expire(force bool) (maybeStarving int) { if wl.list.Len() == 0 { return } @@ -91,6 +91,9 @@ func (wl *waitlist[C]) expire(force bool) { expired = append(expired, e) continue } + if e.Value.age == 0 { + maybeStarving++ + } } // remove the expired waiters from the waitlist after traversing it for _, e := range expired { @@ -102,6 +105,7 @@ func (wl *waitlist[C]) expire(force bool) { for _, e := range expired { e.Value.sema.notify(false) } + return } // tryReturnConn tries handing over a connection to one of the waiters in the pool.