-
Notifications
You must be signed in to change notification settings - Fork 29
/
apns_pool.go
155 lines (133 loc) · 3.09 KB
/
apns_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package apns
import (
"container/list"
"context"
"errors"
"sync"
"time"
)
// ConnPool is a connection pool: it keeps *ApnsConn values in a linked list
// and creates new ones through dialFunc.
type ConnPool struct {
	ctx      context.Context                               // context handed to every dialFunc call
	dialFunc func(ctx context.Context) (*ApnsConn, error) // factory used to open a new connection
	poolSize int                                           // minimum pool size (number of connections dialed at construction)
	pool     *list.List                                    // clients currently in service
	running  bool                                          // true from construction until Shutdown; checked by Get
	cancel   context.CancelFunc                            // cancels ctx; invoked by Shutdown
	mutex    sync.RWMutex                                  // global lock guarding pool
}
// NewConnPool builds a connection pool and eagerly dials poolSize connections.
//
// Parameters:
//   - poolSize: number of connections to open up front.
//   - parentCtx: parent of the pool's own cancellable context, which is passed
//     to every dialFunc call. A nil parentCtx is treated as
//     context.Background().
//   - dialFunc: factory that opens one connection.
//
// It returns an error when dialFunc is nil or when the initial fill fails. On
// a failed fill the pool's context is cancelled and every connection that was
// already opened is destroyed, so nothing is leaked to the caller.
func NewConnPool(poolSize int,
	parentCtx context.Context,
	dialFunc func(ctx context.Context) (*ApnsConn, error)) (*ConnPool, error) {
	if nil == dialFunc {
		return nil, errors.New("POOL_FACTORY|CREATE CONNECTION|INIT|FAIL|dialFunc is nil")
	}
	// context.WithCancel panics on a nil parent.
	if nil == parentCtx {
		parentCtx = context.Background()
	}

	ctx, cancel := context.WithCancel(parentCtx)
	pool := &ConnPool{
		ctx:      ctx,
		cancel:   cancel,
		poolSize: poolSize,
		dialFunc: dialFunc,
		running:  true,
		pool:     list.New()}

	if err := pool.enhancedPool(poolSize); nil != err {
		// The pool is discarded on failure: release the derived context and
		// tear down whatever was opened before the failing dial.
		cancel()
		for e := pool.pool.Front(); nil != e; e = e.Next() {
			if conn, ok := e.Value.(*ApnsConn); ok && nil != conn {
				conn.Destroy()
			}
		}
		return nil, err
	}
	return pool, nil
}
// enhancedPool opens size new connections via dialFunc and appends each one to
// the back of the pool list. Every connection gets up to three dial attempts.
//
// It returns nil when all size connections were created, or an error as soon
// as one connection could not be created after three attempts. Connections
// appended before the failure are left in the list. It does not take the pool
// mutex itself.
func (self *ConnPool) enhancedPool(size int) error {
	for i := 0; i < size; i++ {
		var err error
		var conn *ApnsConn
		for attempt := 0; attempt < 3; attempt++ {
			conn, err = self.dialFunc(self.ctx)
			if nil == err && nil != conn {
				break
			}
			log.Infof("POOL_FACTORY|CREATE CONNECTION|INIT|FAIL|%s", err)
		}

		// The retry loop ends either on success (err == nil, conn != nil) or
		// with the outcome of the last failed attempt.
		if nil != err || nil == conn {
			// dialFunc may report failure as (nil, nil); err.Error() would
			// dereference nil in that case.
			reason := "dial returned nil connection"
			if nil != err {
				reason = err.Error()
			}
			return errors.New("POOL_FACTORY|CREATE CONNECTION|INIT|FAIL|" + reason)
		}
		self.pool.PushBack(conn)
	}
	return nil
}
// Get returns a usable connection from the pool.
//
// The list is scanned from back to front for a connection whose alive flag is
// set; the first hit is moved to the front of the list and returned. When no
// live connection exists, the connection at the back (if any) is destroyed and
// removed, and a replacement is dialed and appended to the back.
//
// It returns an error when the pool has been shut down, when dialFunc fails,
// or when dialFunc yields neither a connection nor an error.
func (self *ConnPool) Get() (*ApnsConn, error) {
	// Fast path so callers fail immediately even while another goroutine
	// holds the mutex.
	// NOTE(review): this read is not synchronized with Shutdown's write;
	// consider making `running` an atomic flag.
	if !self.running {
		return nil, errors.New("POOL_FACTORY|POOL IS SHUTDOWN")
	}

	self.mutex.Lock()
	defer self.mutex.Unlock()

	// Re-check under the lock: Shutdown may have completed while this call
	// was waiting for the mutex, in which case the pooled connections are
	// already destroyed and the dial context is cancelled.
	if !self.running {
		return nil, errors.New("POOL_FACTORY|POOL IS SHUTDOWN")
	}

	// Prefer an existing live connection.
	for e := self.pool.Back(); nil != e; e = e.Prev() {
		if conn := e.Value.(*ApnsConn); nil != conn && conn.alive {
			self.pool.MoveToFront(e)
			return conn, nil
		}
	}

	// No live connection was found: evict the one at the back, if any.
	if e := self.pool.Back(); nil != e {
		tail := e.Value.(*ApnsConn)
		if nil != tail {
			// alive is not guarded by this mutex, so it may have flipped
			// since the scan above.
			if tail.alive {
				return tail, nil
			}
			tail.Destroy()
			self.pool.Remove(e)
		}
	}

	// Dial a replacement (or the very first connection of an empty pool).
	conn, err := self.dialFunc(self.ctx)
	if nil != err {
		return conn, err
	}
	if nil == conn {
		// Never hand (nil, nil) to the caller.
		return nil, errors.New("POOL_FACTORY|CREATE CONNECTION|FAIL|dial returned nil connection")
	}
	self.pool.PushBack(conn)
	return conn, nil
}
// PoolStat reports how many connections are currently held in the pool list.
// The length is read while holding the read lock.
func (self *ConnPool) PoolStat() int {
	lock := &self.mutex
	lock.RLock()
	defer lock.RUnlock()

	size := self.pool.Len()
	return size
}
// Shutdown stops the pool: it marks the pool as not running, cancels the
// pool context, waits up to three rounds of five seconds for the pool list to
// become empty, and then destroys every connection still in the list.
//
// The mutex is not held while sleeping, so Get and PoolStat are not blocked
// for the duration of the wait. Calling Shutdown again after it has run is a
// no-op, and the list is emptied once its connections are destroyed so they
// are never destroyed twice.
func (self *ConnPool) Shutdown() {
	self.mutex.Lock()
	if !self.running {
		// Already shut down (or shutting down); nothing left to do.
		self.mutex.Unlock()
		return
	}
	self.running = false
	self.cancel()
	self.mutex.Unlock()

	// Grace period: wait in five-second steps, at most three times.
	// NOTE(review): nothing visible in this file removes live connections
	// once running is false, so a non-empty pool presumably always waits
	// the full 15 seconds — confirm whether the delay is still wanted.
	for i := 0; i < 3; i++ {
		time.Sleep(5 * time.Second)

		self.mutex.RLock()
		remaining := self.pool.Len()
		self.mutex.RUnlock()

		if remaining <= 0 {
			break
		}
		log.Infof("CONNECTION POOL|CLOSEING|WORK POOL SIZE|:%d", remaining)
	}

	self.mutex.Lock()
	defer self.mutex.Unlock()

	// Close whatever is still pooled.
	for e := self.pool.Front(); e != nil; e = e.Next() {
		if conn := e.Value.(*ApnsConn); nil != conn {
			conn.Destroy()
		}
	}
	// Drop the destroyed connections from the list.
	self.pool.Init()

	log.Info("CONNECTION_POOL|SHUTDOWN")
}