-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathsocket.go
195 lines (163 loc) · 4.23 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package ss
import (
"bytes"
"encoding/base64"
"encoding/json"
"github.com/gorilla/websocket"
"github.com/raz-varren/log"
"sync"
)
var (
socketRNG = NewRNG()
)
//Socket represents a websocket connection
type Socket struct {
l *sync.RWMutex
id string
ws *websocket.Conn
closed bool
serv *SocketServer
roomsl *sync.RWMutex
rooms map[string]bool
}
const (
idLen int = 24
typeJSON string = "J"
typeBin = "B"
typeStr = "S"
)
func newSocket(serv *SocketServer, ws *websocket.Conn) *Socket {
s := &Socket{
l: &sync.RWMutex{},
id: newSocketID(),
ws: ws,
closed: false,
serv: serv,
roomsl: &sync.RWMutex{},
rooms: make(map[string]bool),
}
serv.hub.addSocket(s)
return s
}
func newSocketID() string {
idBuf := make([]byte, idLen)
socketRNG.Read(idBuf)
return base64.StdEncoding.EncodeToString(idBuf)
}
func (s *Socket) receive() ([]byte, error) {
_, data, err := s.ws.ReadMessage()
return data, err
}
func (s *Socket) send(msgType int, data []byte) error {
s.l.Lock()
defer s.l.Unlock()
return s.ws.WriteMessage(msgType, data)
}
//InRoom returns true if s is currently a member of roomName
func (s *Socket) InRoom(roomName string) bool {
s.roomsl.RLock()
defer s.roomsl.RUnlock()
inRoom := s.rooms[roomName]
return inRoom
}
//GetRooms returns a list of rooms that s is a member of
func (s *Socket) GetRooms() []string {
s.roomsl.RLock()
defer s.roomsl.RUnlock()
var roomList []string
for room := range s.rooms {
roomList = append(roomList, room)
}
return roomList
}
//Join adds s to the specified room. If the room does
//not exist, it will be created
func (s *Socket) Join(roomName string) {
s.roomsl.Lock()
defer s.roomsl.Unlock()
s.serv.hub.joinRoom(&joinRequest{roomName, s})
s.rooms[roomName] = true
}
//Leave removes s from the specified room. If s
//is not a member of the room, nothing will happen. If the room is
//empty upon removal of s, the room will be closed
func (s *Socket) Leave(roomName string) {
s.roomsl.Lock()
defer s.roomsl.Unlock()
s.serv.hub.leaveRoom(&leaveRequest{roomName, s})
delete(s.rooms, roomName)
}
//Roomcast dispatches an event to all Sockets in the specified room.
func (s *Socket) Roomcast(roomName, eventName string, data interface{}) {
s.serv.hub.roomcast(&RoomMsg{roomName, eventName, data})
}
//Broadcast dispatches an event to all Sockets on the SocketServer.
func (s *Socket) Broadcast(eventName string, data interface{}) {
s.serv.hub.broadcast(&BroadcastMsg{eventName, data})
}
//Socketcast dispatches an event to the specified socket ID.
func (s *Socket) Socketcast(socketID, eventName string, data interface{}) {
s.serv.Roomcast("__socket_id:"+socketID, eventName, data)
}
//Emit dispatches an event to s.
func (s *Socket) Emit(eventName string, data interface{}) error {
d, msgType := emitData(eventName, data)
return s.send(msgType, d)
}
//ID returns the unique ID of s
func (s *Socket) ID() string {
return s.id
}
//emitData combines the eventName and data into a payload that is understood
//by the sac-sock protocol.
func emitData(eventName string, data interface{}) ([]byte, int) {
buf := bytes.NewBuffer(nil)
buf.WriteString(eventName)
buf.WriteByte(startOfHeaderByte)
switch d := data.(type) {
case string:
buf.WriteString(typeStr)
buf.WriteByte(startOfDataByte)
buf.WriteString(d)
return buf.Bytes(), websocket.TextMessage
case []byte:
buf.WriteString(typeBin)
buf.WriteByte(startOfDataByte)
buf.Write(d)
return buf.Bytes(), websocket.BinaryMessage
default:
buf.WriteString(typeJSON)
buf.WriteByte(startOfDataByte)
jsonData, err := json.Marshal(d)
if err != nil {
log.Err.Println(err)
} else {
buf.Write(jsonData)
}
return buf.Bytes(), websocket.TextMessage
}
}
//Close closes the Socket connection and removes the Socket
//from any rooms that it was a member of
func (s *Socket) Close() {
s.l.Lock()
isAlreadyClosed := s.closed
s.closed = true
s.l.Unlock()
if isAlreadyClosed { //can't reclose the socket
return
}
defer log.Debug.Println(s.ID(), "disconnected")
s.ws.Close()
rooms := s.GetRooms()
for _, room := range rooms {
s.Leave(room)
}
s.serv.l.RLock()
event := s.serv.onDisconnectFunc
s.serv.l.RUnlock()
if event != nil {
event(s)
}
s.serv.hub.removeSocket(s)
}