Skip to content

Commit cc706e8

Browse files
committed
修改部分内容
1 parent c23cb9e commit cc706e8

File tree

2 files changed

+49
-117
lines changed

2 files changed

+49
-117
lines changed

cnet/conn.go

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Cai-ki/cinx/ciface"
1112
"github.com/Cai-ki/cinx/cutils"
@@ -18,24 +19,23 @@ type Connection struct {
1819
Conn *net.TCPConn
1920
//当前连接的ID 也可以称作为SessionID,ID全局唯一
2021
ConnID uint32
21-
//当前连接的关闭状态
22-
isClosed bool
22+
23+
// 正常退出
24+
IsClosed atomic.Bool
25+
26+
// 异常退出
27+
IsAborted atomic.Bool
2328

2429
//消息管理MsgId和对应处理方法的消息管理模块
2530
MsgHandler ciface.IMsgHandle
2631

27-
//告知该链接已经退出/停止的channel
28-
ExitBuffChan chan bool
29-
//无缓冲管道,用于读、写两个goroutine之间的消息通信
30-
msgChan chan []byte
31-
//有缓冲管道,用于读、写两个goroutine之间的消息通信
32+
ExitBuffChan chan struct{}
33+
34+
msgChan chan []byte
3235
msgBuffChan chan []byte
33-
// ================================
34-
//链接属性
35-
property map[string]interface{}
36-
//保护链接属性修改的锁
36+
37+
property map[string]interface{}
3738
propertyLock sync.RWMutex
38-
// ================================
3939
}
4040

4141
// 创建连接的方法
@@ -44,15 +44,16 @@ func NewConntion(server ciface.IServer, conn *net.TCPConn, connID uint32, msgHan
4444
TcpServer: server, //将隶属的server传递进来
4545
Conn: conn,
4646
ConnID: connID,
47-
isClosed: false,
4847
MsgHandler: msgHandler,
49-
ExitBuffChan: make(chan bool, 1),
48+
ExitBuffChan: make(chan struct{}, 1),
5049
msgChan: make(chan []byte), //msgChan初始化
5150
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
5251
property: make(map[string]interface{}), //对链接属性map初始化
5352
}
54-
//将新创建的Conn添加到链接管理中
55-
c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中
53+
c.IsClosed.Store(false)
54+
c.IsAborted.Store(false)
55+
56+
c.TcpServer.GetConnMgr().Add(c)
5657
return c
5758
}
5859

@@ -61,13 +62,17 @@ func NewClientConn(client ciface.IClient, conn *net.TCPConn) ciface.IConn {
6162
TcpServer: NewServer(), // TODO: 临时创建一个server,后续需要修改
6263
Conn: conn,
6364
ConnID: 0, // ignore
64-
isClosed: false,
6565
MsgHandler: client.GetMsgHandler(),
66-
ExitBuffChan: make(chan bool, 1),
66+
ExitBuffChan: make(chan struct{}, 1),
6767
msgChan: make(chan []byte), //msgChan初始化
6868
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
6969
property: make(map[string]interface{}), //对链接属性map初始化
7070
}
71+
c.IsClosed.Store(false)
72+
c.IsAborted.Store(false)
73+
74+
c.TcpServer.SetOnConnStart(client.GetOnConnStart())
75+
c.TcpServer.SetOnConnStop(client.GetOnConnStop())
7176
return c
7277
}
7378

@@ -84,13 +89,15 @@ func (c *Connection) StartReader() {
8489
headData := make([]byte, dp.GetHeadLen())
8590
if _, err := io.ReadFull(c.GetTCPConn(), headData); err != nil {
8691
fmt.Println("read msg head error ", err)
92+
c.IsAborted.Store(true)
8793
break
8894
}
8995

9096
//拆包,得到msgid 和 datalen 放在msg中
9197
msg, err := dp.Unpack(headData)
9298
if err != nil {
9399
fmt.Println("unpack error ", err)
100+
c.IsAborted.Store(true)
94101
break
95102
}
96103

@@ -100,23 +107,20 @@ func (c *Connection) StartReader() {
100107
data = make([]byte, msg.GetDataLen())
101108
if _, err := io.ReadFull(c.GetTCPConn(), data); err != nil {
102109
fmt.Println("read msg data error ", err)
103-
// continue
110+
c.IsAborted.Store(true)
104111
break
105112
}
106113
}
107114
msg.SetData(data)
108115

109-
//得到当前客户端请求的Request数据
110116
req := Request{
111117
conn: c,
112-
msg: msg, //将之前的buf 改成 msg
118+
msg: msg,
113119
}
114120

115121
if cutils.GlobalObject.WorkerPoolSize > 0 {
116-
//已经启动工作池机制,将消息交给Worker处理
117122
c.MsgHandler.SendMsgToTaskQueue(&req)
118123
} else {
119-
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
120124
go c.MsgHandler.DoMsgHandler(&req)
121125
}
122126
}
@@ -132,10 +136,15 @@ func (c *Connection) StartWriter() {
132136

133137
for {
134138
select {
135-
case data := <-c.msgChan:
136-
//有数据要写给客户端
137-
if _, err := c.Conn.Write(data); err != nil {
138-
fmt.Println("Send Data error:, ", err, " Conn Writer exit")
139+
case data, ok := <-c.msgChan:
140+
if ok {
141+
//有数据要写给客户端
142+
if _, err := c.Conn.Write(data); err != nil {
143+
fmt.Println("Send Data error:, ", err, " Conn Writer exit")
144+
return
145+
}
146+
} else {
147+
fmt.Println("msgChan is Closed")
139148
return
140149
}
141150
//针对有缓冲channel需要些的数据处理
@@ -148,11 +157,8 @@ func (c *Connection) StartWriter() {
148157
}
149158
} else {
150159
fmt.Println("msgBuffChan is Closed")
151-
break
160+
return
152161
}
153-
case <-c.ExitBuffChan:
154-
//conn已经关闭
155-
return
156162
}
157163
}
158164
}
@@ -171,37 +177,32 @@ func (c *Connection) Start() {
171177
for {
172178
select {
173179
case <-c.ExitBuffChan:
174-
//得到退出消息,不再阻塞
180+
// 得到退出消息,不再阻塞
175181
return
176182
}
177183
}
178184
}
179185

180-
// 停止连接,结束当前连接状态M
186+
// 停止连接,结束当前连接状态
181187
func (c *Connection) Stop() {
182-
//1. 如果当前链接已经关闭
183-
if c.isClosed == true {
188+
if c.IsClosed.Load() {
184189
return
185190
}
186-
c.isClosed = true
187-
188-
//TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
191+
c.IsClosed.Store(true)
189192

190-
//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
191193
c.TcpServer.CallOnConnStop(c)
192194

193195
// 关闭socket链接
194196
c.Conn.Close()
195197

196-
//通知从缓冲队列读数据的业务,该链接已经关闭
197-
c.ExitBuffChan <- true
198+
c.ExitBuffChan <- struct{}{}
198199

199-
//将链接从连接管理器中删除
200-
c.TcpServer.GetConnMgr().Remove(c) //删除conn从ConnManager中
200+
// 将链接从连接管理器中删除
201+
c.TcpServer.GetConnMgr().Remove(c)
201202

202-
//关闭该链接全部管道
203203
close(c.ExitBuffChan)
204204
close(c.msgBuffChan)
205+
close(c.msgChan)
205206
}
206207

207208
// 从当前连接获取原始的socket TCPConn
@@ -221,7 +222,8 @@ func (c *Connection) RemoteAddr() net.Addr {
221222

222223
// 直接将Message数据发送数据给远程的TCP客户端
223224
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
224-
if c.isClosed == true {
225+
IsClosed := c.IsClosed.Load()
226+
if IsClosed {
225227
return errors.New("Connection closed when send msg")
226228
}
227229
//将data封包,并且发送
@@ -239,7 +241,8 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
239241
}
240242

241243
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
242-
if c.isClosed == true {
244+
IsClosed := c.IsClosed.Load()
245+
if IsClosed {
243246
return errors.New("Connection closed when send buff msg")
244247
}
245248
//将data封包,并且发送

examples/客户端/Client/main.go

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -31,75 +31,4 @@ func main() {
3131
client.Conn().SendMsg(0, []byte("hello"))
3232
}
3333

34-
// conn, err := net.Dial("tcp4", "127.0.0.1:7777")
35-
// if err != nil {
36-
// fmt.Println("client start err, exit!")
37-
// return
38-
// }
39-
40-
// //发封包message消息
41-
// dp := cnet.NewDataPack()
42-
// call := func(id uint32, data string) {
43-
// fmt.Println("==> Call Msg: ID=", id, ", data=", data)
44-
45-
// msg, _ := dp.Pack(cnet.NewMsgPackage(id, []byte(data)))
46-
// _, err := conn.Write(msg)
47-
// if err != nil {
48-
// fmt.Println("write error err ", err)
49-
// return
50-
// }
51-
52-
// }
53-
54-
// get := func() (msg *cnet.Message) {
55-
// msg = &cnet.Message{}
56-
// msg.Id = 404
57-
// //先读出流中的head部分
58-
// headData := make([]byte, dp.GetHeadLen())
59-
// _, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
60-
// if err != nil {
61-
// fmt.Println("read head error")
62-
// return
63-
// }
64-
// //将headData字节流 拆包到msg中
65-
// msgHead, err := dp.Unpack(headData)
66-
// if err != nil {
67-
// fmt.Println("server unpack err:", err)
68-
// return
69-
// }
70-
71-
// if msgHead.GetDataLen() > 0 {
72-
// //msg 是有data数据的,需要再次读取data数据
73-
// msg = msgHead.(*cnet.Message)
74-
// msg.Data = make([]byte, msg.GetDataLen())
75-
76-
// //根据dataLen从io中读取字节流
77-
// _, err := io.ReadFull(conn, msg.Data)
78-
// if err != nil {
79-
// fmt.Println("server unpack data err:", err)
80-
// return
81-
// }
82-
83-
// fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
84-
// }
85-
86-
// return
87-
// }
88-
89-
// call(0, "hello")
90-
91-
// for {
92-
// msg := get()
93-
// if msg.Id == 0 {
94-
// call(0, "get: "+string(msg.Data))
95-
// } else if msg.Id == 1001 {
96-
// // call(1002, "get: "+string(msg.Data))
97-
// } else if msg.Id == 1002 {
98-
// call(1001, "get: "+string(msg.Data))
99-
// } else {
100-
// fmt.Println("error")
101-
// break
102-
// }
103-
// time.Sleep(1 * time.Second)
104-
// }
10534
}

0 commit comments

Comments
 (0)