Skip to content

Commit 753d50a

Browse files
committed
完成多路由
1 parent 6886f5a commit 753d50a

File tree

13 files changed

+270
-114
lines changed

13 files changed

+270
-114
lines changed

ciface/imsghandler.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package ciface
2+
3+
/*
4+
消息管理抽象层
5+
*/
6+
type IMsgHandle interface {
7+
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
8+
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
9+
}

ciface/iserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ type IServer interface {
99
//开启业务服务方法
1010
Serve()
1111
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
12-
AddRouter(router IRouter)
12+
AddRouter(msgId uint32, router IRouter)
1313
}

cnet/connection.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,20 @@ type Connection struct {
1717
//当前连接的关闭状态
1818
isClosed bool
1919

20-
//该连接的处理方法router
21-
Router ciface.IRouter
20+
//消息管理MsgId和对应处理方法的消息管理模块
21+
MsgHandler ciface.IMsgHandle
2222

2323
//告知该链接已经退出/停止的channel
2424
ExitBuffChan chan bool
2525
}
2626

2727
// 创建连接的方法
28-
func NewConntion(conn *net.TCPConn, connID uint32, router ciface.IRouter) *Connection {
28+
func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ciface.IMsgHandle) *Connection {
2929
c := &Connection{
3030
Conn: conn,
3131
ConnID: connID,
3232
isClosed: false,
33-
Router: router,
33+
MsgHandler: msgHandler,
3434
ExitBuffChan: make(chan bool, 1),
3535
}
3636

@@ -78,13 +78,8 @@ func (c *Connection) StartReader() {
7878
conn: c,
7979
msg: msg, //将之前的buf 改成 msg
8080
}
81-
//从路由Routers 中找到注册绑定Conn的对应Handle
82-
go func(request ciface.IRequest) {
83-
//执行注册的路由方法
84-
c.Router.PreHandle(request)
85-
c.Router.Handle(request)
86-
c.Router.PostHandle(request)
87-
}(&req)
81+
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
82+
go c.MsgHandler.DoMsgHandler(&req)
8883
}
8984
}
9085

cnet/msghandler.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package cnet
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
7+
"github.com/Cai-ki/cinx/ciface"
8+
)
9+
10+
type MsgHandle struct {
11+
Apis map[uint32]ciface.IRouter //存放每个MsgId 所对应的处理方法的map属性
12+
}
13+
14+
func NewMsgHandle() *MsgHandle {
15+
return &MsgHandle{
16+
Apis: make(map[uint32]ciface.IRouter),
17+
}
18+
}
19+
20+
// 马上以非阻塞方式处理消息
21+
func (mh *MsgHandle) DoMsgHandler(request ciface.IRequest) {
22+
handler, ok := mh.Apis[request.GetMsgID()]
23+
if !ok {
24+
fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
25+
return
26+
}
27+
28+
//执行对应处理方法
29+
handler.PreHandle(request)
30+
handler.Handle(request)
31+
handler.PostHandle(request)
32+
}
33+
34+
// 为消息添加具体的处理逻辑
35+
func (mh *MsgHandle) AddRouter(msgId uint32, router ciface.IRouter) {
36+
//1 判断当前msg绑定的API处理方法是否已经存在
37+
if _, ok := mh.Apis[msgId]; ok {
38+
panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
39+
}
40+
//2 添加msg与api的绑定关系
41+
mh.Apis[msgId] = router
42+
fmt.Println("Add api msgId = ", msgId)
43+
}

cnet/server.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type Server struct {
2020
IP string
2121
//服务绑定的端口
2222
Port int
23-
//当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务
24-
Router ciface.IRouter
23+
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
24+
msgHandler ciface.IMsgHandle
2525
}
2626

2727
// ============== 定义当前客户端链接的handle api ===========
@@ -79,7 +79,7 @@ func (s *Server) Start() {
7979
//3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
8080

8181
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
82-
dealConn := NewConntion(conn, cid, s.Router)
82+
dealConn := NewConntion(conn, cid, s.msgHandler)
8383
cid++
8484

8585
//3.4 启动当前链接的处理业务
@@ -106,8 +106,8 @@ func (s *Server) Serve() {
106106
}
107107

108108
// 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
109-
func (s *Server) AddRouter(router ciface.IRouter) {
110-
s.Router = router
109+
func (s *Server) AddRouter(msgId uint32, router ciface.IRouter) {
110+
s.msgHandler.AddRouter(msgId, router)
111111

112112
fmt.Println("Add Router succ! ")
113113
}
@@ -120,11 +120,11 @@ func NewServer() ciface.IServer {
120120
cutils.GlobalObject.Reload()
121121

122122
s := &Server{
123-
Name: cutils.GlobalObject.Name, //从全局参数获取
124-
IPVersion: "tcp4",
125-
IP: cutils.GlobalObject.Host, //从全局参数获取
126-
Port: cutils.GlobalObject.TcpPort, //从全局参数获取
127-
Router: nil,
123+
Name: cutils.GlobalObject.Name, //从全局参数获取
124+
IPVersion: "tcp4",
125+
IP: cutils.GlobalObject.Host, //从全局参数获取
126+
Port: cutils.GlobalObject.TcpPort, //从全局参数获取
127+
msgHandler: NewMsgHandle(), //msgHandler 初始化
128128
}
129129
return s
130130
}

examples/多路由/Client0/main.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net"
7+
"time"
8+
9+
"github.com/Cai-ki/cinx/cnet"
10+
)
11+
12+
/*
13+
模拟客户端
14+
*/
15+
func main() {
16+
17+
fmt.Println("Client Test ... start")
18+
//3秒之后发起测试请求,给服务端开启服务的机会
19+
time.Sleep(3 * time.Second)
20+
21+
conn, err := net.Dial("tcp", "127.0.0.1:7777")
22+
if err != nil {
23+
fmt.Println("client start err, exit!")
24+
return
25+
}
26+
27+
for {
28+
//发封包message消息
29+
dp := cnet.NewDataPack()
30+
msg, _ := dp.Pack(cnet.NewMsgPackage(0, []byte("Zinx V0.6 Client0 Test Message")))
31+
_, err := conn.Write(msg)
32+
if err != nil {
33+
fmt.Println("write error err ", err)
34+
return
35+
}
36+
37+
//先读出流中的head部分
38+
headData := make([]byte, dp.GetHeadLen())
39+
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
40+
if err != nil {
41+
fmt.Println("read head error")
42+
break
43+
}
44+
//将headData字节流 拆包到msg中
45+
msgHead, err := dp.Unpack(headData)
46+
if err != nil {
47+
fmt.Println("server unpack err:", err)
48+
return
49+
}
50+
51+
if msgHead.GetDataLen() > 0 {
52+
//msg 是有data数据的,需要再次读取data数据
53+
msg := msgHead.(*cnet.Message)
54+
msg.Data = make([]byte, msg.GetDataLen())
55+
56+
//根据dataLen从io中读取字节流
57+
_, err := io.ReadFull(conn, msg.Data)
58+
if err != nil {
59+
fmt.Println("server unpack data err:", err)
60+
return
61+
}
62+
63+
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
64+
}
65+
66+
time.Sleep(1 * time.Second)
67+
}
68+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"Name": "demo server",
3+
"Host": "127.0.0.1",
4+
"TcpPort": 7777,
5+
"MaxConn": 3
6+
}

examples/多路由/Client1/main.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net"
7+
"time"
8+
9+
"github.com/Cai-ki/cinx/cnet"
10+
)
11+
12+
/*
13+
模拟客户端
14+
*/
15+
func main() {
16+
17+
fmt.Println("Client Test ... start")
18+
//3秒之后发起测试请求,给服务端开启服务的机会
19+
time.Sleep(3 * time.Second)
20+
21+
conn, err := net.Dial("tcp", "127.0.0.1:7777")
22+
if err != nil {
23+
fmt.Println("client start err, exit!")
24+
return
25+
}
26+
27+
for {
28+
//发封包message消息
29+
dp := cnet.NewDataPack()
30+
msg, _ := dp.Pack(cnet.NewMsgPackage(1, []byte("Zinx V0.6 Client1 Test Message")))
31+
_, err := conn.Write(msg)
32+
if err != nil {
33+
fmt.Println("write error err ", err)
34+
return
35+
}
36+
37+
//先读出流中的head部分
38+
headData := make([]byte, dp.GetHeadLen())
39+
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
40+
if err != nil {
41+
fmt.Println("read head error")
42+
break
43+
}
44+
//将headData字节流 拆包到msg中
45+
msgHead, err := dp.Unpack(headData)
46+
if err != nil {
47+
fmt.Println("server unpack err:", err)
48+
return
49+
}
50+
51+
if msgHead.GetDataLen() > 0 {
52+
//msg 是有data数据的,需要再次读取data数据
53+
msg := msgHead.(*cnet.Message)
54+
msg.Data = make([]byte, msg.GetDataLen())
55+
56+
//根据dataLen从io中读取字节流
57+
_, err := io.ReadFull(conn, msg.Data)
58+
if err != nil {
59+
fmt.Println("server unpack data err:", err)
60+
return
61+
}
62+
63+
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
64+
}
65+
66+
time.Sleep(1 * time.Second)
67+
}
68+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"Name": "demo server",
3+
"Host": "127.0.0.1",
4+
"TcpPort": 7777,
5+
"MaxConn": 3
6+
}

0 commit comments

Comments
 (0)