-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathchatroom.go
210 lines (181 loc) · 5.71 KB
/
chatroom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package main
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// ChatRoomBufSize is the number of incoming messages to buffer for each topic.
const ChatRoomBufSize = 128
// Topic used to broadcast browser WebRTC addresses
const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery"
const ChatTopic string = "universal-connectivity"
const ChatFileTopic string = "universal-connectivity-file"
// ChatRoom represents a subscription to a single PubSub topic. Messages
// can be published to the topic with ChatRoom.Publish, and received
// messages are pushed to the Messages channel.
type ChatRoom struct {
// Messages is a channel of messages received from other peers in the chat room
Messages chan *ChatMessage
SysMessages chan *ChatMessage
ctx context.Context
h host.Host
ps *pubsub.PubSub
chatTopic *pubsub.Topic
chatSub *pubsub.Subscription
fileTopic *pubsub.Topic
fileSub *pubsub.Subscription
peerDiscoveryTopic *pubsub.Topic
peerDiscoverySub *pubsub.Subscription
roomName string
nick string
}
// ChatMessage gets converted to/from JSON and sent in the body of pubsub messages.
type ChatMessage struct {
Message string
SenderID string
SenderNick string
}
// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
// a ChatRoom on success.
func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string) (*ChatRoom, error) {
// join the pubsub chatTopic
chatTopic, err := ps.Join(ChatTopic)
if err != nil {
return nil, err
}
// and subscribe to it
chatSub, err := chatTopic.Subscribe()
if err != nil {
return nil, err
}
// join the pubsub fileTopic
fileTopic, err := ps.Join(ChatFileTopic)
if err != nil {
return nil, err
}
// and subscribe to it
fileSub, err := fileTopic.Subscribe()
if err != nil {
return nil, err
}
// join the pubsub peer disovery topic
peerDiscoveryTopic, err := ps.Join(PubSubDiscoveryTopic)
if err != nil {
return nil, err
}
// and subscribe to it
peerDiscoverySub, err := peerDiscoveryTopic.Subscribe()
if err != nil {
return nil, err
}
cr := &ChatRoom{
ctx: ctx,
h: h,
ps: ps,
chatTopic: chatTopic,
chatSub: chatSub,
fileTopic: fileTopic,
fileSub: fileSub,
peerDiscoveryTopic: peerDiscoveryTopic,
peerDiscoverySub: peerDiscoverySub,
nick: nickname,
Messages: make(chan *ChatMessage, ChatRoomBufSize),
SysMessages: make(chan *ChatMessage, ChatRoomBufSize),
}
// start reading messages from the subscription in a loop
go cr.readLoop()
return cr, nil
}
// Publish sends a message to the pubsub topic.
func (cr *ChatRoom) Publish(message string) error {
return cr.chatTopic.Publish(cr.ctx, []byte(message))
}
func (cr *ChatRoom) ListPeers() []peer.ID {
return cr.ps.ListPeers(ChatTopic)
}
// readLoop pulls messages from the pubsub chat/file topic and handles them.
func (cr *ChatRoom) readLoop() {
go cr.readChatLoop()
go cr.readFileLoop()
}
// readChatLoop pulls messages from the pubsub chat topic and pushes them onto the Messages channel.
func (cr *ChatRoom) readChatLoop() {
for {
msg, err := cr.chatSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.h.ID() {
continue
}
cm := new(ChatMessage)
cm.Message = string(msg.Data)
cm.SenderID = msg.ID
cm.SenderNick = string(msg.ID[len(msg.ID)-8])
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}
// readFileLoop pulls messages from the pubsub file topic and handles them.
func (cr *ChatRoom) readFileLoop() {
for {
msg, err := cr.fileSub.Next(cr.ctx)
if err != nil {
close(cr.Messages)
return
}
// only forward messages delivered by others
if msg.ReceivedFrom == cr.h.ID() {
continue
}
fileID := msg.Data
fileBody, err := cr.requestFile(msg.GetFrom(), fileID)
if err != nil {
close(cr.Messages)
return
}
cm := new(ChatMessage)
cm.Message = fmt.Sprintf("File: %s (%v bytes) from %s", string(fileID), len(fileBody), msg.GetFrom().String())
cm.SenderID = msg.ID
cm.SenderNick = string(msg.ID[len(msg.ID)-8])
// send valid messages onto the Messages channel
cr.Messages <- cm
}
}
// requestFile sends a request to the peer to send the file with the given fileID.
func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) {
stream, err := cr.h.NewStream(context.Background(), toPeer, "/universal-connectivity-file/1")
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
defer stream.Close()
reqLen := binary.AppendUvarint([]byte{}, uint64(len(fileID)))
if _, err := stream.Write(reqLen); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if _, err := stream.Write(fileID); err != nil {
return nil, fmt.Errorf("failed to write fileID to the stream: %w", err)
}
if err := stream.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to close write stream: %w", err)
}
reader := bufio.NewReader(stream)
respLen, err := binary.ReadUvarint(reader)
if err != nil {
return nil, fmt.Errorf("failed to read response length prefix: %w", err)
}
fileBody := make([]byte, respLen)
if _, err := reader.Read(fileBody); err != nil {
return nil, fmt.Errorf("failed to read fileBody from the stream: %w", err)
}
if err := stream.CloseRead(); err != nil {
return nil, fmt.Errorf("failed to close read stream: %w", err)
}
return fileBody, nil
}