Skip to content

Commit c23cb9e

Browse files
committed
简单实现了客户端模块
1 parent 0648204 commit c23cb9e

File tree

10 files changed

+294
-15
lines changed

10 files changed

+294
-15
lines changed

chook/hheartbeat.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ func StartHeartbeatChecker(conn ciface.IConn) {
4242
// 超过 15 秒未收到消息,判定超时
4343
if time.Since(lastActiveTime.(time.Time)) > 15*time.Second {
4444
fmt.Println("Connection timeout, closing...")
45+
conn.SendBuffMsg(404, []byte("timeout"))
46+
time.Sleep(5 * time.Second)
4547
conn.Stop()
4648
return
4749
}

ciface/iclient.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package ciface
2+
3+
// 客户端接口
4+
type IClient interface {
5+
Restart()
6+
Start()
7+
Stop()
8+
AddRouter(msgId uint32, router IRouter)
9+
Conn() IConn
10+
GetMsgHandler() IMsgHandle
11+
12+
SetOnConnStart(func(IConn))
13+
SetOnConnStop(func(IConn))
14+
GetOnConnStart() func(IConn)
15+
GetOnConnStop() func(IConn)
16+
}

ciface/iserver.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,17 @@ package ciface
22

33
//定义服务器接口
44
type IServer interface {
5-
//启动服务器方法
65
Start()
7-
//停止服务器方法
86
Stop()
9-
//开启业务服务方法
107
Serve()
11-
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
8+
129
AddRouter(msgId uint32, router IRouter)
13-
//得到链接管理
10+
1411
GetConnMgr() IConnManager
15-
//设置该Server的连接创建时Hook函数
12+
GetMsgHandler() IMsgHandle
13+
1614
SetOnConnStart(func(IConn))
17-
//设置该Server的连接断开时的Hook函数
1815
SetOnConnStop(func(IConn))
19-
//调用连接OnConnStart Hook函数
2016
CallOnConnStart(conn IConn)
21-
//调用连接OnConnStop Hook函数
2217
CallOnConnStop(conn IConn)
23-
GetMsgHandler() IMsgHandle // 临时
2418
}

cnet/client.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package cnet
2+
3+
import (
4+
"fmt"
5+
"net"
6+
7+
"github.com/Cai-ki/cinx/ciface"
8+
)
9+
10+
// 客户端接口实现
11+
type Client struct {
12+
Name string
13+
IPVersion string
14+
IP string
15+
Port int
16+
conn ciface.IConn
17+
// connMux sync.Mutex
18+
onConnStart func(conn ciface.IConn)
19+
onConnStop func(conn ciface.IConn)
20+
msgHandler ciface.IMsgHandle
21+
22+
exitChan chan struct{}
23+
}
24+
25+
var _ ciface.IClient = (*Client)(nil)
26+
27+
func NewClient(name, ipVersion, ip string, port int) ciface.IClient {
28+
c := &Client{
29+
Name: name,
30+
IPVersion: ipVersion,
31+
IP: ip,
32+
Port: port,
33+
34+
msgHandler: NewMsgHandle(),
35+
}
36+
return c
37+
}
38+
39+
func (c *Client) Restart() {
40+
c.Stop()
41+
c.Start()
42+
}
43+
44+
func (c *Client) Start() {
45+
c.exitChan = make(chan struct{})
46+
go c.msgHandler.StartWorkerPool()
47+
48+
go func() {
49+
addr, err := net.ResolveTCPAddr(c.IPVersion, fmt.Sprintf("%s:%d", c.IP, c.Port))
50+
if err != nil {
51+
fmt.Println("[Cinx] resolve tcp address err: ", err)
52+
return
53+
}
54+
55+
conn, err := net.DialTCP(c.IPVersion, nil, addr)
56+
if err != nil {
57+
fmt.Println("[Cinx] dial tcp err: ", err)
58+
}
59+
60+
c.conn = NewClientConn(c, conn)
61+
62+
go c.conn.Start()
63+
64+
select {
65+
case <-c.exitChan:
66+
fmt.Println("[Cinx] client exit")
67+
return
68+
}
69+
}()
70+
}
71+
func (c *Client) Stop() {
72+
con := c.Conn()
73+
if con != nil {
74+
con.Stop()
75+
}
76+
}
77+
func (c *Client) Conn() ciface.IConn {
78+
return c.conn
79+
}
80+
func (c *Client) AddRouter(msgId uint32, router ciface.IRouter) {
81+
c.msgHandler.AddRouter(msgId, router)
82+
}
83+
func (c *Client) GetMsgHandler() ciface.IMsgHandle {
84+
return c.msgHandler
85+
}
86+
func (c *Client) SetOnConnStart(f func(ciface.IConn)) {
87+
c.onConnStart = f
88+
}
89+
func (c *Client) SetOnConnStop(f func(ciface.IConn)) {
90+
c.onConnStop = f
91+
}
92+
func (c *Client) GetOnConnStart() func(ciface.IConn) {
93+
return c.onConnStart
94+
}
95+
func (c *Client) GetOnConnStop() func(ciface.IConn) {
96+
return c.onConnStop
97+
}

cnet/conn.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,22 @@ func NewConntion(server ciface.IServer, conn *net.TCPConn, connID uint32, msgHan
5555
c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中
5656
return c
5757
}
58+
59+
func NewClientConn(client ciface.IClient, conn *net.TCPConn) ciface.IConn {
60+
c := &Connection{
61+
TcpServer: NewServer(), // TODO: 临时创建一个server,后续需要修改
62+
Conn: conn,
63+
ConnID: 0, // ignore
64+
isClosed: false,
65+
MsgHandler: client.GetMsgHandler(),
66+
ExitBuffChan: make(chan bool, 1),
67+
msgChan: make(chan []byte), //msgChan初始化
68+
msgBuffChan: make(chan []byte, cutils.GlobalObject.MaxMsgChanLen),
69+
property: make(map[string]interface{}), //对链接属性map初始化
70+
}
71+
return c
72+
}
73+
5874
func (c *Connection) StartReader() {
5975
fmt.Println("Reader Goroutine is running")
6076
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")

cnet/server.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type Server struct {
1313
//服务器的名称
1414
Name string
1515
//tcp4 or other
16-
TcpVersion string
16+
IPVersion string
1717
//服务绑定的 IP 地址
1818
IP string
1919
//服务绑定的端口
@@ -42,16 +42,16 @@ func (s *Server) Start() {
4242
s.msgHandler.StartWorkerPool()
4343

4444
//1 封装 tcp 地址
45-
addr, err := net.ResolveTCPAddr(s.TcpVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
45+
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
4646
if err != nil {
4747
fmt.Println("[Cinx] resolve tcp address err: ", err)
4848
return
4949
}
5050

5151
//2 创建监听 socket
52-
listenner, err := net.ListenTCP(s.TcpVersion, addr)
52+
listenner, err := net.ListenTCP(s.IPVersion, addr)
5353
if err != nil {
54-
fmt.Println("[Cinx] listen", s.TcpVersion, "err", err)
54+
fmt.Println("[Cinx] listen", s.IPVersion, "err", err)
5555
return
5656
}
5757

@@ -149,7 +149,7 @@ func NewServer() ciface.IServer {
149149

150150
s := &Server{
151151
Name: cutils.GlobalObject.Name,
152-
TcpVersion: "tcp4",
152+
IPVersion: "tcp4",
153153
IP: cutils.GlobalObject.Host,
154154
Port: cutils.GlobalObject.TcpPort,
155155
msgHandler: NewMsgHandle(),
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"Name": "demo server",
3+
"Host": "127.0.0.1",
4+
"TcpPort": 7777,
5+
"MaxConn": 3
6+
}

examples/客户端/Client/main.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/Cai-ki/cinx/ciface"
8+
"github.com/Cai-ki/cinx/cnet"
9+
)
10+
11+
type helloRouter struct {
12+
cnet.BaseRouter
13+
}
14+
15+
func (h *helloRouter) Handle(request ciface.IRequest) {
16+
fmt.Println("[Cinx] Received:", string(request.GetData()))
17+
}
18+
19+
func main() {
20+
21+
fmt.Println("Client Test ... start")
22+
//3秒之后发起测试请求,给服务端开启服务的机会
23+
time.Sleep(1 * time.Second)
24+
25+
client := cnet.NewClient("Client", "tcp4", "127.0.0.1", 7777)
26+
client.AddRouter(0, &helloRouter{})
27+
client.Start()
28+
29+
for {
30+
time.Sleep(3 * time.Second)
31+
client.Conn().SendMsg(0, []byte("hello"))
32+
}
33+
34+
// conn, err := net.Dial("tcp4", "127.0.0.1:7777")
35+
// if err != nil {
36+
// fmt.Println("client start err, exit!")
37+
// return
38+
// }
39+
40+
// //发封包message消息
41+
// dp := cnet.NewDataPack()
42+
// call := func(id uint32, data string) {
43+
// fmt.Println("==> Call Msg: ID=", id, ", data=", data)
44+
45+
// msg, _ := dp.Pack(cnet.NewMsgPackage(id, []byte(data)))
46+
// _, err := conn.Write(msg)
47+
// if err != nil {
48+
// fmt.Println("write error err ", err)
49+
// return
50+
// }
51+
52+
// }
53+
54+
// get := func() (msg *cnet.Message) {
55+
// msg = &cnet.Message{}
56+
// msg.Id = 404
57+
// //先读出流中的head部分
58+
// headData := make([]byte, dp.GetHeadLen())
59+
// _, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
60+
// if err != nil {
61+
// fmt.Println("read head error")
62+
// return
63+
// }
64+
// //将headData字节流 拆包到msg中
65+
// msgHead, err := dp.Unpack(headData)
66+
// if err != nil {
67+
// fmt.Println("server unpack err:", err)
68+
// return
69+
// }
70+
71+
// if msgHead.GetDataLen() > 0 {
72+
// //msg 是有data数据的,需要再次读取data数据
73+
// msg = msgHead.(*cnet.Message)
74+
// msg.Data = make([]byte, msg.GetDataLen())
75+
76+
// //根据dataLen从io中读取字节流
77+
// _, err := io.ReadFull(conn, msg.Data)
78+
// if err != nil {
79+
// fmt.Println("server unpack data err:", err)
80+
// return
81+
// }
82+
83+
// fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
84+
// }
85+
86+
// return
87+
// }
88+
89+
// call(0, "hello")
90+
91+
// for {
92+
// msg := get()
93+
// if msg.Id == 0 {
94+
// call(0, "get: "+string(msg.Data))
95+
// } else if msg.Id == 1001 {
96+
// // call(1002, "get: "+string(msg.Data))
97+
// } else if msg.Id == 1002 {
98+
// call(1001, "get: "+string(msg.Data))
99+
// } else {
100+
// fmt.Println("error")
101+
// break
102+
// }
103+
// time.Sleep(1 * time.Second)
104+
// }
105+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"Name": "demo server",
3+
"Host": "127.0.0.1",
4+
"TcpPort": 7777,
5+
"MaxConn": 3
6+
}

examples/客户端/Server/main.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/Cai-ki/cinx/chook"
7+
"github.com/Cai-ki/cinx/ciface"
8+
"github.com/Cai-ki/cinx/cnet"
9+
"github.com/Cai-ki/cinx/crouter"
10+
)
11+
12+
type helloRouter struct {
13+
cnet.BaseRouter
14+
}
15+
16+
func (h *helloRouter) Handle(request ciface.IRequest) {
17+
// 请求,直接回复响应
18+
fmt.Println("[Cinx] Received:", string(request.GetData()))
19+
err := request.GetConn().SendMsg(0, []byte("received"))
20+
if err != nil {
21+
fmt.Println("[Cinx] error:", err)
22+
}
23+
}
24+
25+
func main() {
26+
//创建一个server句柄
27+
s := cnet.NewServer()
28+
s.AddRouter(crouter.MsgIDHeartbeatRequest, &crouter.HeartbeatPingRouter{})
29+
s.AddRouter(crouter.MsgIDHeartbeatResponse, &crouter.HeartbeatPongRouter{})
30+
s.SetOnConnStart(func(conn ciface.IConn) {
31+
go chook.StartHeartbeat(conn)
32+
go chook.StartHeartbeatChecker(conn)
33+
})
34+
s.AddRouter(0, &helloRouter{})
35+
//开启服务
36+
s.Serve()
37+
}

0 commit comments

Comments
 (0)