Skip to content

Commit 6ba2001

Browse files
committed
将连接模块与客户端,服务端进一步解耦
1 parent ce243e3 commit 6ba2001

File tree

4 files changed

+68
-128
lines changed

4 files changed

+68
-128
lines changed

ciface/iserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ type IServer interface {
1212

1313
SetOnConnStart(func(IConn))
1414
SetOnConnStop(func(IConn))
15-
CallOnConnStart(conn IConn)
16-
CallOnConnStop(conn IConn)
15+
GetOnConnStart() func(IConn)
16+
GetOnConnStop() func(IConn)
1717
}

cnet/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/Cai-ki/cinx/ciface"
88
)
99

10-
// 客户端接口实现
1110
type Client struct {
1211
Name string
1312
IPVersion string
@@ -31,7 +30,9 @@ func NewClient(name, ipVersion, ip string, port int) ciface.IClient {
3130
IP: ip,
3231
Port: port,
3332

34-
msgHandler: NewMsgHandle(),
33+
msgHandler: NewMsgHandle(),
34+
onConnStart: func(conn ciface.IConn) {},
35+
onConnStop: func(conn ciface.IConn) {},
3536
}
3637
return c
3738
}
@@ -70,9 +71,8 @@ func (c *Client) Start() {
7071
}
7172
func (c *Client) Stop() {
7273
con := c.Conn()
73-
if con != nil {
74-
con.Stop()
75-
}
74+
con.Stop()
75+
c.exitChan <- struct{}{}
7676
}
7777
func (c *Client) Conn() ciface.IConn {
7878
return c.conn

cnet/conn.go

Lines changed: 28 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,16 @@ import (
1313
)
1414

1515
type Connection struct {
16-
//当前Conn属于哪个Server
17-
TcpServer ciface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可
18-
//当前连接的socket TCP套接字
16+
TcpServer ciface.IServer
17+
1918
Conn *net.TCPConn
20-
//当前连接的ID 也可以称作为SessionID,ID全局唯一
19+
2120
ConnID uint32
2221

23-
// 正常退出
2422
IsClosed atomic.Bool
2523

26-
// 异常退出
2724
IsAborted atomic.Bool
2825

29-
//消息管理MsgId和对应处理方法的消息管理模块
3026
MsgHandler ciface.IMsgHandle
3127

3228
ExitBuffChan chan struct{}
@@ -36,43 +32,47 @@ type Connection struct {
3632

3733
property map[string]interface{}
3834
propertyLock sync.RWMutex
35+
36+
onConnStart func(conn ciface.IConn)
37+
onConnStop func(conn ciface.IConn)
3938
}
4039

41-
// 创建连接的方法
4240
func NewConntion(server ciface.IServer, conn *net.TCPConn, connID uint32, msgHandler ciface.IMsgHandle) *Connection {
4341
c := &Connection{
44-
TcpServer: server, //将隶属的server传递进来
42+
TcpServer: server,
4543
Conn: conn,
4644
ConnID: connID,
4745
MsgHandler: msgHandler,
4846
ExitBuffChan: make(chan struct{}, 1),
49-
msgChan: make(chan []byte), //msgChan初始化
47+
msgChan: make(chan []byte),
5048
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
51-
property: make(map[string]interface{}), //对链接属性map初始化
49+
property: make(map[string]interface{}),
50+
onConnStart: server.GetOnConnStart(),
51+
onConnStop: server.GetOnConnStop(),
5252
}
5353
c.IsClosed.Store(false)
5454
c.IsAborted.Store(false)
5555

56-
c.TcpServer.GetConnMgr().Add(c)
56+
server.GetConnMgr().Add(c)
5757
return c
5858
}
5959

6060
func NewClientConn(client ciface.IClient, conn *net.TCPConn) ciface.IConn {
6161
c := &Connection{
62-
TcpServer: NewServer(), // TODO: 临时创建一个server,后续需要修改
62+
TcpServer: NewServer(),
6363
Conn: conn,
64-
ConnID: 0, // ignore
64+
ConnID: 0,
6565
MsgHandler: client.GetMsgHandler(),
6666
ExitBuffChan: make(chan struct{}, 1),
67-
msgChan: make(chan []byte), //msgChan初始化
67+
msgChan: make(chan []byte),
6868
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
69-
property: make(map[string]interface{}), //对链接属性map初始化
69+
property: make(map[string]interface{}),
70+
onConnStart: client.GetOnConnStart(),
71+
onConnStop: client.GetOnConnStop(),
7072
}
7173
c.IsClosed.Store(false)
7274
c.IsAborted.Store(false)
7375

74-
c.TcpServer.SetOnConnStart(client.GetOnConnStart())
75-
c.TcpServer.SetOnConnStop(client.GetOnConnStop())
7676
return c
7777
}
7878

@@ -82,26 +82,23 @@ func (c *Connection) StartReader() {
8282
defer c.Stop()
8383

8484
for {
85-
// 创建拆包解包的对象
85+
8686
dp := NewDataPack()
8787

88-
//读取客户端的Msg head
8988
headData := make([]byte, dp.GetHeadLen())
9089
if _, err := io.ReadFull(c.GetTCPConn(), headData); err != nil {
9190
fmt.Println("read msg head error ", err)
9291
c.IsAborted.Store(true)
9392
break
9493
}
9594

96-
//拆包,得到msgid 和 datalen 放在msg中
9795
msg, err := dp.Unpack(headData)
9896
if err != nil {
9997
fmt.Println("unpack error ", err)
10098
c.IsAborted.Store(true)
10199
break
102100
}
103101

104-
//根据 dataLen 读取 data,放在msg.Data中
105102
var data []byte
106103
if msg.GetDataLen() > 0 {
107104
data = make([]byte, msg.GetDataLen())
@@ -138,7 +135,7 @@ func (c *Connection) StartWriter() {
138135
select {
139136
case data, ok := <-c.msgChan:
140137
if ok {
141-
//有数据要写给客户端
138+
142139
if _, err := c.Conn.Write(data); err != nil {
143140
fmt.Println("Send Data error:, ", err, " Conn Writer exit")
144141
return
@@ -147,10 +144,10 @@ func (c *Connection) StartWriter() {
147144
fmt.Println("msgChan is Closed")
148145
return
149146
}
150-
//针对有缓冲channel需要些的数据处理
147+
151148
case data, ok := <-c.msgBuffChan:
152149
if ok {
153-
//有数据要写给客户端
150+
154151
if _, err := c.Conn.Write(data); err != nil {
155152
fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
156153
return
@@ -163,79 +160,66 @@ func (c *Connection) StartWriter() {
163160
}
164161
}
165162

166-
// 启动连接,让当前连接开始工作
167163
func (c *Connection) Start() {
168-
169-
//1 开启用户从客户端读取数据流程的Goroutine
170164
go c.StartReader()
171-
//2 开启用于写回客户端数据流程的Goroutine
172165
go c.StartWriter()
173166

174-
//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
175-
c.TcpServer.CallOnConnStart(c)
167+
c.onConnStart(c)
176168

177169
for {
178170
select {
179171
case <-c.ExitBuffChan:
180-
// 得到退出消息,不再阻塞
172+
181173
return
182174
}
183175
}
184176
}
185177

186-
// 停止连接,结束当前连接状态
187178
func (c *Connection) Stop() {
188179
if c.IsClosed.Load() {
189180
return
190181
}
191182
c.IsClosed.Store(true)
192183

193-
c.TcpServer.CallOnConnStop(c)
184+
c.onConnStop(c)
194185

195-
// 关闭socket链接
196186
c.Conn.Close()
197187

198188
c.ExitBuffChan <- struct{}{}
199189

200-
// 将链接从连接管理器中删除
201190
c.TcpServer.GetConnMgr().Remove(c)
202191

203192
close(c.ExitBuffChan)
204193
close(c.msgBuffChan)
205194
close(c.msgChan)
206195
}
207196

208-
// 从当前连接获取原始的socket TCPConn
209197
func (c *Connection) GetTCPConn() *net.TCPConn {
210198
return c.Conn
211199
}
212200

213-
// 获取当前连接ID
214201
func (c *Connection) GetConnID() uint32 {
215202
return c.ConnID
216203
}
217204

218-
// 获取远程客户端地址信息
219205
func (c *Connection) RemoteAddr() net.Addr {
220206
return c.Conn.RemoteAddr()
221207
}
222208

223-
// 直接将Message数据发送数据给远程的TCP客户端
224209
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
225210
IsClosed := c.IsClosed.Load()
226211
if IsClosed {
227212
return errors.New("Connection closed when send msg")
228213
}
229-
//将data封包,并且发送
214+
230215
dp := NewDataPack()
231216
msg, err := dp.Pack(NewMsgPackage(msgId, data))
232217
if err != nil {
233218
fmt.Println("Pack error msg id = ", msgId)
234219
return errors.New("Pack error msg ")
235220
}
236221

237-
//写回客户端
238-
c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取
222+
c.msgChan <- msg
239223

240224
return nil
241225
}
@@ -245,29 +229,26 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
245229
if IsClosed {
246230
return errors.New("Connection closed when send buff msg")
247231
}
248-
//将data封包,并且发送
232+
249233
dp := NewDataPack()
250234
msg, err := dp.Pack(NewMsgPackage(msgId, data))
251235
if err != nil {
252236
fmt.Println("Pack error msg id = ", msgId)
253237
return errors.New("Pack error msg ")
254238
}
255239

256-
//写回客户端
257240
c.msgBuffChan <- msg
258241

259242
return nil
260243
}
261244

262-
// 设置链接属性
263245
func (c *Connection) SetProperty(key string, value interface{}) {
264246
c.propertyLock.Lock()
265247
defer c.propertyLock.Unlock()
266248

267249
c.property[key] = value
268250
}
269251

270-
// 获取链接属性
271252
func (c *Connection) GetProperty(key string) (interface{}, bool) {
272253
c.propertyLock.RLock()
273254
defer c.propertyLock.RUnlock()
@@ -279,7 +260,6 @@ func (c *Connection) GetProperty(key string) (interface{}, bool) {
279260
}
280261
}
281262

282-
// 移除链接属性
283263
func (c *Connection) RemoveProperty(key string) {
284264
c.propertyLock.Lock()
285265
defer c.propertyLock.Unlock()

0 commit comments

Comments
 (0)