Skip to content

Commit 18fe598

Browse files
committed
adding raw bytes parser (WIP)
1 parent 4661b68 commit 18fe598

File tree

9 files changed

+298
-115
lines changed

9 files changed

+298
-115
lines changed

.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/main.go

Lines changed: 43 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,22 @@ package main
33
import (
44
"bufio"
55
"bytes"
6-
"encoding/json"
76
"errors"
7+
"flag"
88
"io"
99
"log/slog"
1010
"net"
1111

12-
"github.com/swagftw/rex/types"
12+
"github.com/swagftw/rex"
1313
)
1414

1515
var daemonAddr = "0.0.0.0:8080"
1616

1717
func main() {
18-
defer func() {
19-
if r := recover(); r != nil {
20-
slog.Error("panic: while running client", "err", r)
21-
}
22-
}()
18+
verbose := flag.Bool("verbose", false, "verbose logging")
19+
20+
flag.Parse()
21+
rex.InitLogger(*verbose)
2322

2423
conn, err := net.Dial("tcp", daemonAddr)
2524
if err != nil {
@@ -30,83 +29,70 @@ func main() {
3029

3130
slog.Info("connected to the daemon")
3231

33-
for {
34-
// check if the connection is still alive
35-
_, err = conn.Write([]byte{'\n'})
36-
if err != nil {
37-
slog.Error("connection is not alive", "err", err)
38-
39-
return
40-
}
32+
reader := bufio.NewReader(conn)
4133

42-
reader := bufio.NewReader(conn)
34+
err = registerClient(conn)
35+
if err != nil {
36+
return
37+
}
4338

44-
err = readConnection(conn, reader)
39+
for {
40+
err = readConnection(reader)
4541
if errors.Is(err, io.EOF) {
4642
return
4743
}
4844
}
4945
}
5046

51-
func readConnection(conn net.Conn, reader *bufio.Reader) error {
52-
byteData, err := reader.ReadBytes('\n')
47+
func registerClient(conn net.Conn) error {
48+
err := writeToConn(conn, []byte("REX REGISTER\r\n"))
5349
if err != nil {
54-
if errors.Is(err, io.EOF) {
55-
slog.Error("connection closed", "err", err)
56-
57-
return err
58-
}
59-
60-
slog.Error("failed to read data", "err", err)
61-
6250
return err
6351
}
6452

65-
data := new(types.Data)
66-
err = json.Unmarshal(byteData, data)
67-
if err != nil {
68-
slog.Error("failed to unmarshal data", "err", err)
53+
slog.Info("registered client")
6954

70-
return nil
71-
}
55+
return nil
56+
}
7257

73-
if data.Type != types.HeartbeatPing {
74-
return nil
75-
}
58+
func readConnection(reader *bufio.Reader) error {
59+
for {
60+
byteData, err := reader.ReadBytes('\n')
61+
if err != nil {
62+
if errors.Is(err, io.EOF) {
63+
slog.Debug("connection closed", "err", err)
7664

77-
slog.Info("ping received")
65+
return err
66+
}
7867

79-
data = &types.Data{
68+
slog.Error("failed to read data", "err", err)
8069

81-
Type: types.HeartbeatPong,
82-
}
70+
return err
71+
}
8372

84-
jsonData, err := json.Marshal(data)
85-
if err != nil {
86-
slog.Error("failed to marshal data", "err", err)
73+
// end of the message
74+
if bytes.Equal(byteData, []byte{'\r', '\n', '\r', '\n'}) {
75+
break
76+
}
8777

88-
return nil
78+
// check if the message is "ping"
79+
if bytes.HasPrefix(byteData, []byte("REX PING")) {
80+
continue
81+
}
8982
}
9083

91-
buf := new(bytes.Buffer)
92-
93-
err = json.Compact(buf, jsonData)
94-
if err != nil {
95-
slog.Error("failed to compact data", "err", err)
96-
97-
return nil
98-
}
84+
return nil
85+
}
9986

100-
buf.WriteByte('\n')
87+
func writeToConn(conn net.Conn, data []byte) error {
88+
data = append(data, []byte{'\r', '\n'}...)
10189

102-
_, err = conn.Write(buf.Bytes())
90+
_, err := conn.Write(data)
10391
if err != nil {
10492
slog.Error("failed to write data", "err", err)
10593

106-
return nil
94+
return err
10795
}
10896

109-
slog.Info("pong sent")
110-
11197
return nil
11298
}

daemon/heartbeat/heartbeat.go

Lines changed: 104 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,30 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"io"
910
"log/slog"
1011
"net"
12+
"sync"
1113
"time"
1214

15+
gonanoid "github.com/matoous/go-nanoid"
16+
1317
"github.com/swagftw/rex/types"
1418
)
1519

1620
var port = 8080
1721

22+
var ActiveClients = sync.Map{}
23+
var clientLock = sync.RWMutex{}
24+
1825
func StartListener() (net.Listener, error) {
19-
slog.Info("starting listener...", "port", port)
20-
listener, err := net.Listen("tcp", "0.0.0.0:"+fmt.Sprintf("%d", port))
26+
slog.Info("starting heartbeat...", "port", port)
27+
28+
hostPort := net.JoinHostPort("0.0.0.0", fmt.Sprintf("%d", port))
29+
30+
listener, err := net.Listen("tcp", hostPort)
2131
if err != nil {
22-
slog.Error("failed to start listener", "err", err)
32+
slog.Error("failed to start heartbeat", "err", err)
2333

2434
return nil, err
2535
}
@@ -43,10 +53,12 @@ func StartHeartbeat() error {
4353
continue
4454
}
4555

46-
// write ping messages to the client
47-
ticker := time.NewTicker(time.Second * 2)
56+
go func(conn net.Conn) {
57+
// write ping messages to the client
58+
ticker := time.NewTicker(time.Second * 2)
59+
60+
defer ticker.Stop()
4861

49-
go func(conn net.Conn, ticker *time.Ticker) {
5062
defer func() {
5163
if r := recover(); r != nil {
5264
slog.Error("panic: while sending ping", "err", r)
@@ -62,42 +74,15 @@ func StartHeartbeat() error {
6274

6375
for {
6476
if <-ticker.C; true {
65-
data := types.Data{
66-
Type: types.HeartbeatPing,
67-
}
68-
69-
buf := &bytes.Buffer{}
70-
71-
jsonData, err := json.Marshal(data)
77+
err = PingClient(conn)
7278
if err != nil {
73-
slog.Error("failed to marshal data", "err", err)
74-
75-
panic(err)
76-
}
77-
78-
err = json.Compact(buf, jsonData)
79-
if err != nil {
80-
slog.Error("failed to compact data", "err", err)
81-
82-
panic(err)
83-
}
84-
85-
buf.WriteByte('\n')
86-
87-
_, err = conn.Write(buf.Bytes())
88-
if err != nil {
89-
ticker.Stop()
90-
91-
slog.Error("failed to send ping", "err", err)
92-
9379
return
9480
}
95-
96-
slog.Info("ping sent")
9781
}
9882
}
99-
}(conn, ticker)
83+
}(conn)
10084

85+
// read from connection
10186
go func(conn net.Conn) {
10287
defer func() {
10388
if r := recover(); r != nil {
@@ -119,18 +104,16 @@ func StartHeartbeat() error {
119104

120105
buf, err = reader.ReadBytes('\n')
121106
if err != nil {
122-
slog.Error("failed to read the bytes", "err", err)
123-
124-
if errors.Is(err, net.ErrClosed) {
107+
if errors.Is(err, io.EOF) {
125108
return
126109
}
127110

111+
slog.Error("failed to read the bytes", "err", err)
112+
128113
return
129114
}
130115

131-
if len(buf) < 1 || buf[0] == '\n' {
132-
continue
133-
}
116+
bytes.TrimSuffix(buf, []byte{'\r'})
134117

135118
cd := new(types.Data)
136119

@@ -142,10 +125,11 @@ func StartHeartbeat() error {
142125
}
143126

144127
switch cd.Type {
145-
case types.HeartbeatPong:
146-
slog.Info("pong received")
147128
case types.RegisterClient:
148-
registerClient(cd)
129+
err = registerClient(conn)
130+
if err != nil {
131+
return
132+
}
149133
case types.Close:
150134
return
151135
default:
@@ -156,6 +140,79 @@ func StartHeartbeat() error {
156140
}
157141
}
158142

159-
func registerClient(cd *types.Data) {
160-
slog.Info("client registered", "data", cd)
143+
func PingClient(conn net.Conn) error {
144+
data := types.Data{
145+
Type: types.HeartbeatPing,
146+
}
147+
148+
buf := &bytes.Buffer{}
149+
150+
jsonData, err := json.Marshal(data)
151+
if err != nil {
152+
slog.Error("failed to marshal data", "err", err)
153+
154+
return err
155+
}
156+
157+
err = json.Compact(buf, jsonData)
158+
if err != nil {
159+
slog.Error("failed to compact data", "err", err)
160+
161+
return err
162+
}
163+
164+
buf.WriteByte('\n')
165+
166+
_, err = conn.Write(buf.Bytes())
167+
if err != nil {
168+
if errors.Is(err, net.ErrClosed) {
169+
return err
170+
}
171+
172+
slog.Error("failed to send ping", "err", err)
173+
174+
return err
175+
}
176+
177+
slog.Debug("ping sent")
178+
179+
return nil
180+
}
181+
182+
func registerClient(conn net.Conn) error {
183+
addr := conn.RemoteAddr().String()
184+
185+
slog.Info("client connected", "addr", addr)
186+
187+
id := generateID()
188+
189+
ActiveClients.Store(id, conn)
190+
191+
data := &types.Data{
192+
Type: types.RegisterClient,
193+
}
194+
195+
jsonBytes, err := json.Marshal(data)
196+
if err != nil {
197+
slog.Error("failed to marshal data", "err", err)
198+
199+
return err
200+
}
201+
202+
jsonBytes = append(jsonBytes, []byte{'\r', '\n'}...)
203+
204+
_, err = conn.Write(jsonBytes)
205+
if err != nil {
206+
slog.Error("failed to write data", "err", err)
207+
208+
return err
209+
}
210+
211+
return nil
212+
}
213+
214+
var chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
215+
216+
func generateID() string {
217+
return gonanoid.MustGenerate(chars, 6)
161218
}

0 commit comments

Comments
 (0)