Skip to content

Commit

Permalink
Add MaxConnLifetime
Browse files Browse the repository at this point in the history
  • Loading branch information
Kryvchun committed Jul 15, 2022
1 parent 2d7bb13 commit 97836c3
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 35 deletions.
1 change: 1 addition & 0 deletions sonic/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func newDriversHolder(
opts.PoolMinConnections,
opts.PoolMaxConnections,
opts.PoolPingThreshold,
opts.PoolMaxIdleLifetime,
)
if err != nil {
return nil, err
Expand Down
19 changes: 14 additions & 5 deletions sonic/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ type driver struct {
Port int
Password string

lastUse time.Time
lastPing time.Time
channel Channel

channel Channel
*connection
}

Expand Down Expand Up @@ -67,6 +69,7 @@ func (c *driver) Connect() error {

c.connection, err = newConnection(c)
c.lastPing = time.Now()
c.lastUse = time.Now()

return err
}
Expand Down Expand Up @@ -102,10 +105,16 @@ func (c *driver) Ping() error {
}

// softPing pings the connection if it wasn't pinged for a while.
func (c *driver) softPing(threshold time.Duration) (ok bool) {
if threshold <= 0 || time.Since(c.lastPing) < threshold {
return true
func (c *driver) checkConn(pingThreshold, maxLifetime time.Duration) (ok bool) {
if maxLifetime > 0 && time.Since(c.lastUse) > maxLifetime {
return false
}

c.lastUse = time.Now()

if pingThreshold > 0 && time.Since(c.lastPing) > pingThreshold {
return c.Ping() == nil
}

return c.Ping() == nil
return true
}
33 changes: 22 additions & 11 deletions sonic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package sonic
import "time"

type controllerOptions struct {
Host string
Port int
Password string
PoolMinConnections int
PoolMaxConnections int
PoolPingThreshold time.Duration
Channel Channel
Host string
Port int
Password string
PoolMinConnections int
PoolMaxConnections int
PoolPingThreshold time.Duration
PoolMaxIdleLifetime time.Duration
Channel Channel
}

func (o controllerOptions) With(optionSetters ...OptionSetter) controllerOptions {
Expand All @@ -32,9 +33,10 @@ func defaultOptions(
Password: password,
Channel: channel,

PoolMinConnections: 1,
PoolMaxConnections: 16,
PoolPingThreshold: time.Minute,
PoolMinConnections: 1,
PoolMaxConnections: 16,
PoolMaxIdleLifetime: 5 * time.Minute,
PoolPingThreshold: 0,
}
}

Expand All @@ -60,9 +62,18 @@ func OptionPoolMinIdleConnections(val int) OptionSetter {
// OptionPoolPingThreshold sets a minimum ping interval to ensure that
// the connection is healthy before getting it from the pool.
//
// By default is 1m. For disabling set 0.
// By default is 0s. For disabling set it to 0.
func OptionPoolPingThreshold(val time.Duration) OptionSetter {
return func(o *controllerOptions) {
o.PoolPingThreshold = val
}
}

// OptionPoolMaxIdleLifetime sets a minimum lifetime of idle connection.
//
// By default is 5m. For disabling set it to 0.
func OptionPoolMaxIdleLifetime(val time.Duration) OptionSetter {
return func(o *controllerOptions) {
o.PoolMaxIdleLifetime = val
}
}
55 changes: 36 additions & 19 deletions sonic/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"time"
)

const recursionLimit = 8

type driversPool struct {
driverFactory *driverFactory
drivers chan *driverWrapper
pingThreshold time.Duration
driverFactory *driverFactory
drivers chan *driverWrapper
pingThreshold time.Duration
maxIdleLifetime time.Duration

isPoolClosedMu sync.RWMutex
isPoolClosed bool
Expand All @@ -19,12 +22,14 @@ func newDriversPool(
minIdle int,
maxIdle int,
pingThreshold time.Duration,
maxIdleLifetime time.Duration,
) (*driversPool, error) {
dp := &driversPool{
driverFactory: df,
drivers: make(chan *driverWrapper, maxIdle),

pingThreshold: pingThreshold,
pingThreshold: pingThreshold,
maxIdleLifetime: maxIdleLifetime,

isPoolClosedMu: sync.RWMutex{},
isPoolClosed: false,
Expand Down Expand Up @@ -55,15 +60,15 @@ func newDriversPool(

// put the connection back.
func (p *driversPool) put(dw *driverWrapper) {
if dw.closed {
if dw.driver.closed {
return
}

p.isPoolClosedMu.RLock()
defer p.isPoolClosedMu.RUnlock()

if p.isPoolClosed {
dw.close()
dw.driver.close()

return
}
Expand All @@ -72,8 +77,8 @@ func (p *driversPool) put(dw *driverWrapper) {
case p.drivers <- dw:
default:
// The pool is full.
_ = dw.Quit()
dw.close()
_ = dw.driver.Quit()
dw.driver.close()
}
}

Expand All @@ -89,24 +94,36 @@ func (p *driversPool) Get() (*driverWrapper, error) {
return nil, ErrClosed
}

return p.getNextDriver(0)
}

func (p *driversPool) getNextDriver(depth int) (*driverWrapper, error) {
if depth > recursionLimit {
return p.newDriver()
}

select {
case d := <-p.drivers:
if !d.softPing(p.pingThreshold) {
d.close()
if !d.checkConn(p.pingThreshold, p.maxIdleLifetime) {
d.driver.close()

return p.Get()
return p.getNextDriver(depth + 1)
}

return d, nil
default:
d := p.driverFactory.Build()
return p.newDriver()
}
}

if err := d.Connect(); err != nil {
return nil, err
}
func (p *driversPool) newDriver() (*driverWrapper, error) {
d := p.driverFactory.Build()

return p.wrapDriver(d), nil
if err := d.Connect(); err != nil {
return nil, err
}

return p.wrapDriver(d), nil
}

// Close and quit all connections in the pool.
Expand All @@ -117,10 +134,10 @@ func (p *driversPool) Close() {

close(p.drivers)
for dw := range p.drivers {
if !dw.closed {
_ = dw.Quit()
if !dw.driver.closed {
_ = dw.driver.Quit()

dw.close()
dw.driver.close()
}
}
}
Expand Down

0 comments on commit 97836c3

Please sign in to comment.