Skip to content

Commit 5c26e1c

Browse files
committed
完成连接属性功能
1 parent cbc3780 commit 5c26e1c

File tree

5 files changed

+125
-89
lines changed

5 files changed

+125
-89
lines changed

ciface/iconnection.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,10 @@ type IConnection interface {
1818
SendMsg(msgId uint32, data []byte) error
1919
//直接将Message数据发送给远程的TCP客户端(有缓冲)
2020
SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口
21+
//设置链接属性
22+
SetProperty(key string, value interface{})
23+
//获取链接属性
24+
GetProperty(key string) (interface{}, error)
25+
//移除链接属性
26+
RemoveProperty(key string)
2127
}

cnet/connection.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"net"
8+
"sync"
89

910
"github.com/Cai-ki/cinx/ciface"
1011
"github.com/Cai-ki/cinx/cutils"
@@ -29,6 +30,12 @@ type Connection struct {
2930
msgChan chan []byte
3031
//有缓冲管道,用于读、写两个goroutine之间的消息通信
3132
msgBuffChan chan []byte
33+
// ================================
34+
//链接属性
35+
property map[string]interface{}
36+
//保护链接属性修改的锁
37+
propertyLock sync.RWMutex
38+
// ================================
3239
}
3340

3441
// 创建连接的方法
@@ -42,6 +49,7 @@ func NewConntion(server ciface.IServer, conn *net.TCPConn, connID uint32, msgHan
4249
ExitBuffChan: make(chan bool, 1),
4350
msgChan: make(chan []byte), //msgChan初始化
4451
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
52+
property: make(map[string]interface{}), //对链接属性map初始化
4553
}
4654
//将新创建的Conn添加到链接管理中
4755
c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中
@@ -231,3 +239,31 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
231239

232240
return nil
233241
}
242+
243+
// 设置链接属性
244+
func (c *Connection) SetProperty(key string, value interface{}) {
245+
c.propertyLock.Lock()
246+
defer c.propertyLock.Unlock()
247+
248+
c.property[key] = value
249+
}
250+
251+
// 获取链接属性
252+
func (c *Connection) GetProperty(key string) (interface{}, error) {
253+
c.propertyLock.RLock()
254+
defer c.propertyLock.RUnlock()
255+
256+
if value, ok := c.property[key]; ok {
257+
return value, nil
258+
} else {
259+
return nil, errors.New("no property found")
260+
}
261+
}
262+
263+
// 移除链接属性
264+
func (c *Connection) RemoveProperty(key string) {
265+
c.propertyLock.Lock()
266+
defer c.propertyLock.Unlock()
267+
268+
delete(c.property, key)
269+
}

cnet/msghandler.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,21 @@ import (
99
)
1010

1111
type MsgHandle struct {
12-
Apis map[uint32]ciface.IRouter //存放每个MsgId 所对应的处理方法的map属性
13-
WorkerPoolSize uint32 //业务工作Worker池的数量
14-
TaskQueue []chan ciface.IRequest //Worker负责取任务的消息队列
12+
Apis map[uint32]ciface.IRouter // 映射 MsgID 对应的处理方法
13+
WorkerPoolSize uint32 // worker pool 大小
14+
TaskQueues []chan ciface.IRequest // worker 获取任务的消息队列
1515
}
1616

1717
func NewMsgHandle() *MsgHandle {
1818
return &MsgHandle{
1919
Apis: make(map[uint32]ciface.IRouter),
2020
WorkerPoolSize: cutils.GlobalObject.WorkerPoolSize,
21-
//一个worker对应一个queue
22-
TaskQueue: make([]chan ciface.IRequest, cutils.GlobalObject.WorkerPoolSize),
21+
//一个 worker 对应一个 task 队列
22+
TaskQueues: make([]chan ciface.IRequest, cutils.GlobalObject.WorkerPoolSize),
2323
}
2424
}
2525

26-
// 马上以非阻塞方式处理消息
26+
// 非阻塞方式处理消息
2727
func (mh *MsgHandle) DoMsgHandler(request ciface.IRequest) {
2828
handler, ok := mh.Apis[request.GetMsgID()]
2929
if !ok {
@@ -39,48 +39,47 @@ func (mh *MsgHandle) DoMsgHandler(request ciface.IRequest) {
3939

4040
// 为消息添加具体的处理逻辑
4141
func (mh *MsgHandle) AddRouter(msgId uint32, router ciface.IRouter) {
42-
//1 判断当前msg绑定的API处理方法是否已经存在
42+
// 判断当前 msg 绑定的 API 处理方法是否已经存在
4343
if _, ok := mh.Apis[msgId]; ok {
44-
panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
44+
panic("[Cinx] Repeated api , msgId = " + strconv.Itoa(int(msgId)))
4545
}
46-
//2 添加msg与api的绑定关系
46+
// 添加 msg 与 API 的绑定关系
4747
mh.Apis[msgId] = router
48-
fmt.Println("Add api msgId = ", msgId)
48+
fmt.Println("[Cinx] Add api msgId = ", msgId)
4949
}
5050

51-
// 启动一个Worker工作流程
51+
// worker 工作流程
5252
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ciface.IRequest) {
53-
fmt.Println("Worker ID = ", workerID, " is started.")
54-
//不断的等待队列中的消息
55-
for {
56-
select {
57-
//有消息则取出队列的Request,并执行绑定的业务方法
58-
case request := <-taskQueue:
59-
mh.DoMsgHandler(request)
60-
}
53+
// 监听队列中的消息
54+
for request := range taskQueue {
55+
mh.DoMsgHandler(request)
6156
}
57+
// for {
58+
// select {
59+
// case request := <-taskQueue:
60+
// mh.DoMsgHandler(request)
61+
// }
62+
// }
6263
}
6364

64-
// 启动worker工作池
65+
// 启动 worker 工作池
6566
func (mh *MsgHandle) StartWorkerPool() {
66-
//遍历需要启动worker的数量,依此启动
6767
for i := 0; i < int(mh.WorkerPoolSize); i++ {
68-
//一个worker被启动
69-
//给当前worker对应的任务队列开辟空间
70-
mh.TaskQueue[i] = make(chan ciface.IRequest, cutils.GlobalObject.MaxWorkerTaskLen)
71-
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
72-
go mh.StartOneWorker(i, mh.TaskQueue[i])
68+
// 初始化当前 worker 消息队列管道
69+
mh.TaskQueues[i] = make(chan ciface.IRequest, cutils.GlobalObject.MaxWorkerTaskLen)
70+
71+
fmt.Println("[Cinx] Worker ID = ", i, " is started.")
72+
// 创建一个 worker 协程
73+
go mh.StartOneWorker(i, mh.TaskQueues[i])
7374
}
7475
}
7576

76-
// 将消息交给TaskQueue,由worker进行处理
77+
// 分发消息给消息队列处理
7778
func (mh *MsgHandle) SendMsgToTaskQueue(request ciface.IRequest) {
78-
//根据ConnID来分配当前的连接应该由哪个worker负责处理
79-
//轮询的平均分配法则
80-
81-
//得到需要处理此条连接的workerID
79+
// 朴素的任务分配策略
8280
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
83-
fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)
84-
//将请求消息发送给任务队列
85-
mh.TaskQueue[workerID] <- request
81+
fmt.Println("[Cinx] Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)
82+
83+
// 将消息发送给对应的 worker 的消息队列
84+
mh.TaskQueues[workerID] <- request
8685
}

cnet/server.go

Lines changed: 49 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package cnet
33
import (
44
"fmt"
55
"net"
6-
"time"
76

87
"github.com/Cai-ki/cinx/ciface"
98
"github.com/Cai-ki/cinx/cutils"
@@ -14,151 +13,147 @@ type Server struct {
1413
//服务器的名称
1514
Name string
1615
//tcp4 or other
17-
IPVersion string
18-
//服务绑定的IP地址
16+
TcpVersion string
17+
//服务绑定的 IP 地址
1918
IP string
2019
//服务绑定的端口
2120
Port int
22-
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
21+
//当前 server 的消息管理模块,用来绑定 MsgId 和对应的处理方法
2322
msgHandler ciface.IMsgHandle
24-
//当前Server的链接管理器
23+
//当前 server 的链接管理器
2524
ConnMgr ciface.IConnManager
2625

27-
//该Server的连接创建时Hook函数
2826
OnConnStart func(conn ciface.IConnection)
29-
//该Server的连接断开时的Hook函数
30-
OnConnStop func(conn ciface.IConnection)
27+
OnConnStop func(conn ciface.IConnection)
3128
}
3229

33-
//============== 实现 ciface.IServer 里的全部接口方法 ========
34-
35-
// 开启网络服务
30+
// 开启 server 服务(无阻塞)
3631
func (s *Server) Start() {
37-
fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)
32+
// 输出 server 信息
33+
fmt.Println("[Cinx] Server Name:", s.Name, "listenner at IP:", s.IP, " Port:", s.Port)
3834
fmt.Printf("[Cinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",
3935
cutils.GlobalObject.Version,
4036
cutils.GlobalObject.MaxConn,
4137
cutils.GlobalObject.MaxPacketSize)
42-
//开启一个go去做服务端Linster业务
38+
39+
// 创建协程不间断处理链接
4340
go func() {
44-
//0 启动worker工作池机制
41+
//0 开启工作池
4542
s.msgHandler.StartWorkerPool()
4643

47-
//1 获取一个TCP的Addr
48-
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
44+
//1 封装 tcp 地址
45+
addr, err := net.ResolveTCPAddr(s.TcpVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
4946
if err != nil {
50-
fmt.Println("resolve tcp addr err: ", err)
47+
fmt.Println("[Cinx] resolve tcp address err: ", err)
5148
return
5249
}
5350

54-
//2 监听服务器地址
55-
listenner, err := net.ListenTCP(s.IPVersion, addr)
51+
//2 创建监听 socket
52+
listenner, err := net.ListenTCP(s.TcpVersion, addr)
5653
if err != nil {
57-
fmt.Println("listen", s.IPVersion, "err", err)
54+
fmt.Println("[Cinx] listen", s.TcpVersion, "err", err)
5855
return
5956
}
6057

61-
//已经监听成功
62-
fmt.Println("start Cinx server ", s.Name, " succ, now listenning...")
58+
//输出监听成功信息
59+
fmt.Println("[Cinx] start success, now listenning...")
6360

64-
//TODO server.go 应该有一个自动生成ID的方法
65-
var cid uint32
66-
cid = 0
61+
// 简单实现一个自增的连接 ID
62+
var cid uint32 = 0
6763

68-
//3 启动server网络连接业务
64+
//3 持续监听客户端连接
6965
for {
7066
//3.1 阻塞等待客户端建立连接请求
7167
conn, err := listenner.AcceptTCP()
7268
if err != nil {
73-
fmt.Println("Accept err ", err)
69+
fmt.Println("[Cinx] Accept err ", err)
7470
continue
7571
}
7672

77-
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
73+
//3.2 判断当前服务器的连接数是否已经超过最大连接数
7874
if s.ConnMgr.Len() >= cutils.GlobalObject.MaxConn {
7975
conn.Close()
8076
continue
8177
}
82-
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
78+
79+
//3.3 初始化连接模块
8380
dealConn := NewConntion(s, conn, cid, s.msgHandler)
8481
cid++
8582

86-
//3.4 启动当前链接的处理业务
83+
//3.4 启动协程处理当前连接的业务
8784
go dealConn.Start()
8885
}
8986
}()
9087
}
9188

89+
// 停止 server 服务
9290
func (s *Server) Stop() {
93-
fmt.Println("[STOP] Cinx server , name ", s.Name)
91+
fmt.Println("[Cinx] stop server , name ", s.Name)
9492

95-
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
93+
// 通过 ConnManager 清除并停止所有连接
9694
s.ConnMgr.ClearConn()
9795
}
9896

97+
// 开启 server 服务(阻塞)
9998
func (s *Server) Serve() {
10099
s.Start()
101100

102101
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
103102

104-
//阻塞,否则主Go退出, listenner的go将会退出
105-
for {
106-
time.Sleep(10 * time.Second)
107-
}
103+
// 阻塞
104+
select {}
108105
}
109106

110-
// 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
107+
// 为特定消息注册处理函数
111108
func (s *Server) AddRouter(msgId uint32, router ciface.IRouter) {
112109
s.msgHandler.AddRouter(msgId, router)
113110

114-
fmt.Println("Add Router succ! ")
111+
fmt.Println("[Cinx] Add Router success! ")
115112
}
116113

117114
// 得到链接管理
118115
func (s *Server) GetConnMgr() ciface.IConnManager {
119116
return s.ConnMgr
120117
}
121118

122-
// 设置该Server的连接创建时Hook函数
119+
// 设置 server 的连接创建时 hook 函数
123120
func (s *Server) SetOnConnStart(hookFunc func(ciface.IConnection)) {
124121
s.OnConnStart = hookFunc
125122
}
126123

127-
// 设置该Server的连接断开时的Hook函数
124+
// 设置 server 的连接断开时 hook 函数
128125
func (s *Server) SetOnConnStop(hookFunc func(ciface.IConnection)) {
129126
s.OnConnStop = hookFunc
130127
}
131128

132-
// 调用连接OnConnStart Hook函数
129+
// 调用 hook 函数
133130
func (s *Server) CallOnConnStart(conn ciface.IConnection) {
134131
if s.OnConnStart != nil {
135-
fmt.Println("---> CallOnConnStart....")
132+
fmt.Println("[Cinx] CallOnConnStart....")
136133
s.OnConnStart(conn)
137134
}
138135
}
139136

140-
// 调用连接OnConnStop Hook函数
137+
// 调用 hook 函数
141138
func (s *Server) CallOnConnStop(conn ciface.IConnection) {
142139
if s.OnConnStop != nil {
143-
fmt.Println("---> CallOnConnStop....")
140+
fmt.Println("[Cinx] CallOnConnStop....")
144141
s.OnConnStop(conn)
145142
}
146143
}
147144

148-
/*
149-
创建一个服务器句柄
150-
*/
145+
// 创建 server 实例
151146
func NewServer() ciface.IServer {
152-
//先初始化全局配置文件
147+
//初始化全局配置文件
153148
cutils.GlobalObject.Reload()
154149

155150
s := &Server{
156-
Name: cutils.GlobalObject.Name, //从全局参数获取
157-
IPVersion: "tcp4",
158-
IP: cutils.GlobalObject.Host, //从全局参数获取
159-
Port: cutils.GlobalObject.TcpPort, //从全局参数获取
160-
msgHandler: NewMsgHandle(), //msgHandler 初始化
161-
ConnMgr: NewConnManager(), //创建ConnManager
151+
Name: cutils.GlobalObject.Name,
152+
TcpVersion: "tcp4",
153+
IP: cutils.GlobalObject.Host,
154+
Port: cutils.GlobalObject.TcpPort,
155+
msgHandler: NewMsgHandle(),
156+
ConnMgr: NewConnManager(),
162157
}
163158
return s
164159
}

cutils/globalobj.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func init() {
5454
//初始化GlobalObject变量,设置一些默认值
5555
GlobalObject = &GlobalObj{
5656
Name: "CinxServerApp",
57-
Version: "V0.4",
57+
Version: "V0.10",
5858
TcpPort: 7777,
5959
Host: "0.0.0.0",
6060
MaxConn: 12000,

0 commit comments

Comments
 (0)