diff --git a/sonic/controller.go b/sonic/controller.go index 3e8a58e..5071330 100644 --- a/sonic/controller.go +++ b/sonic/controller.go @@ -20,6 +20,7 @@ func newDriversHolder( opts.PoolMinConnections, opts.PoolMaxConnections, opts.PoolPingThreshold, + opts.PoolMaxIdleLifetime, ) if err != nil { return nil, err diff --git a/sonic/driver.go b/sonic/driver.go index f58437f..0e69275 100644 --- a/sonic/driver.go +++ b/sonic/driver.go @@ -33,8 +33,10 @@ type driver struct { Port int Password string + lastUse time.Time lastPing time.Time - channel Channel + + channel Channel *connection } @@ -67,6 +69,7 @@ func (c *driver) Connect() error { c.connection, err = newConnection(c) c.lastPing = time.Now() + c.lastUse = time.Now() return err } @@ -102,10 +105,16 @@ func (c *driver) Ping() error { } // softPing pings the connection if it wasn't pinged for a while. -func (c *driver) softPing(threshold time.Duration) (ok bool) { - if threshold <= 0 || time.Since(c.lastPing) < threshold { - return true +func (c *driver) checkConn(pingThreshold, maxLifetime time.Duration) (ok bool) { + if maxLifetime > 0 && time.Since(c.lastUse) > maxLifetime { + return false + } + + c.lastUse = time.Now() + + if pingThreshold > 0 && time.Since(c.lastPing) > pingThreshold { + return c.Ping() == nil } - return c.Ping() == nil + return true } diff --git a/sonic/options.go b/sonic/options.go index d16b3e6..bdb392d 100644 --- a/sonic/options.go +++ b/sonic/options.go @@ -3,13 +3,14 @@ package sonic import "time" type controllerOptions struct { - Host string - Port int - Password string - PoolMinConnections int - PoolMaxConnections int - PoolPingThreshold time.Duration - Channel Channel + Host string + Port int + Password string + PoolMinConnections int + PoolMaxConnections int + PoolPingThreshold time.Duration + PoolMaxIdleLifetime time.Duration + Channel Channel } func (o controllerOptions) With(optionSetters ...OptionSetter) controllerOptions { @@ -32,9 +33,10 @@ func defaultOptions( Password: password, Channel: channel, - PoolMinConnections: 1, - PoolMaxConnections: 16, - PoolPingThreshold: time.Minute, + PoolMinConnections: 1, + PoolMaxConnections: 16, + PoolMaxIdleLifetime: 5 * time.Minute, + PoolPingThreshold: 0, } } @@ -60,9 +62,18 @@ func OptionPoolMinIdleConnections(val int) OptionSetter { // OptionPoolPingThreshold sets a minimum ping interval to ensure that // the connection is healthy before getting it from the pool. // -// By default is 1m. For disabling set 0. +// By default is 0s. For disabling set it to 0. func OptionPoolPingThreshold(val time.Duration) OptionSetter { return func(o *controllerOptions) { o.PoolPingThreshold = val } } + +// OptionPoolMaxIdleLifetime sets a minimum lifetime of idle connection. +// +// By default is 5m. For disabling set it to 0. +func OptionPoolMaxIdleLifetime(val time.Duration) OptionSetter { + return func(o *controllerOptions) { + o.PoolMaxIdleLifetime = val + } +} diff --git a/sonic/pool.go b/sonic/pool.go index b0006ed..0991407 100644 --- a/sonic/pool.go +++ b/sonic/pool.go @@ -5,10 +5,13 @@ import ( "time" ) +const recursionLimit = 8 + type driversPool struct { - driverFactory *driverFactory - drivers chan *driverWrapper - pingThreshold time.Duration + driverFactory *driverFactory + drivers chan *driverWrapper + pingThreshold time.Duration + maxIdleLifetime time.Duration isPoolClosedMu sync.RWMutex isPoolClosed bool @@ -19,12 +22,14 @@ func newDriversPool( minIdle int, maxIdle int, pingThreshold time.Duration, + maxIdleLifetime time.Duration, ) (*driversPool, error) { dp := &driversPool{ driverFactory: df, drivers: make(chan *driverWrapper, maxIdle), - pingThreshold: pingThreshold, + pingThreshold: pingThreshold, + maxIdleLifetime: maxIdleLifetime, isPoolClosedMu: sync.RWMutex{}, isPoolClosed: false, @@ -55,7 +60,7 @@ func newDriversPool( // put the connection back. func (p *driversPool) put(dw *driverWrapper) { - if dw.closed { + if dw.driver.closed { return } @@ -63,7 +68,7 @@ func (p *driversPool) put(dw *driverWrapper) { defer p.isPoolClosedMu.RUnlock() if p.isPoolClosed { - dw.close() + dw.driver.close() return } @@ -72,8 +77,8 @@ func (p *driversPool) put(dw *driverWrapper) { case p.drivers <- dw: default: // The pool is full. - _ = dw.Quit() - dw.close() + _ = dw.driver.Quit() + dw.driver.close() } } @@ -89,24 +94,36 @@ func (p *driversPool) Get() (*driverWrapper, error) { return nil, ErrClosed } + return p.getNextDriver(0) +} + +func (p *driversPool) getNextDriver(depth int) (*driverWrapper, error) { + if depth > recursionLimit { + return p.newDriver() + } + select { case d := <-p.drivers: - if !d.softPing(p.pingThreshold) { - d.close() + if !d.checkConn(p.pingThreshold, p.maxIdleLifetime) { + d.driver.close() - return p.Get() + return p.getNextDriver(depth + 1) } return d, nil default: - d := p.driverFactory.Build() + return p.newDriver() + } +} - if err := d.Connect(); err != nil { - return nil, err - } +func (p *driversPool) newDriver() (*driverWrapper, error) { + d := p.driverFactory.Build() - return p.wrapDriver(d), nil + if err := d.Connect(); err != nil { + return nil, err } + + return p.wrapDriver(d), nil } // Close and quit all connections in the pool. @@ -117,10 +134,10 @@ func (p *driversPool) Close() { close(p.drivers) for dw := range p.drivers { - if !dw.closed { - _ = dw.Quit() + if !dw.driver.closed { + _ = dw.driver.Quit() - dw.close() + dw.driver.close() } } }