Skip to content

Commit f36142f

Browse files
committed
fix https://github.com/kataras/iris/issues/1318 with Server.SyncBroadcaster = true
1 parent e697f7a commit f36142f

File tree

8 files changed

+156
-147
lines changed

8 files changed

+156
-147
lines changed

_examples/example/main.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,12 @@ func server(upgrader neffos.Upgrader) {
172172
Event: "chat",
173173
Body: []byte(fmt.Sprintf("Client [%s] connected too.", c.ID())),
174174
})
175-
c.Server().Broadcast(c, neffos.Message{
176-
Namespace: namespace,
177-
Event: "chat",
178-
Body: []byte(fmt.Sprintf("SECOND ONE Client [%s] connected too.", c.ID())),
179-
})
175+
176+
// c.Server().Broadcast(c, neffos.Message{
177+
// Namespace: namespace,
178+
// Event: "chat",
179+
// Body: []byte(fmt.Sprintf("SECOND ONE")),
180+
// })
180181
}
181182

182183
return nil

_examples/stress-test/broadcasting-1/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ var (
7171
)
7272

7373
func main() {
74-
7574
// connect all and then start cron.
7675
server := startServer()
7776
time.Sleep(200 * time.Millisecond)
@@ -104,6 +103,7 @@ func startServer() *neffos.Server {
104103

105104
return username
106105
}
106+
server.SyncBroadcaster = true
107107

108108
go func() {
109109
log.Fatal(http.ListenAndServe(":8080", server))

broadcaster.go

+14-60
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
package neffos
22

33
import (
4-
"context"
54
"sync"
65
"sync/atomic"
7-
"time"
86
"unsafe"
97
)
108

9+
// async broadcaster, doesn't wait for a publish to complete to all clients before any
10+
// next broadcast call.
1111
type broadcaster struct {
12-
message Message
13-
mu *sync.Mutex
14-
awaiter unsafe.Pointer
12+
messages []Message
13+
mu *sync.Mutex
14+
awaiter unsafe.Pointer
1515
}
1616

1717
func newBroadcaster() *broadcaster {
@@ -28,72 +28,26 @@ func (b *broadcaster) getAwaiter() <-chan struct{} {
2828
return *((*chan struct{})(ptr))
2929
}
3030

31-
func (b *broadcaster) broadcast(msg Message) {
31+
func (b *broadcaster) broadcast(msgs []Message) {
3232
b.mu.Lock()
33-
b.message = msg
33+
b.messages = msgs
3434
b.mu.Unlock()
3535

3636
ch := make(chan struct{})
3737
old := atomic.SwapPointer(&b.awaiter, unsafe.Pointer(&ch))
3838
close(*(*chan struct{})(old))
3939
}
4040

41-
// lock required.
42-
func (b *broadcaster) wait() (msg Message) {
41+
func (b *broadcaster) waitUntilClosed(closeCh <-chan struct{}) (msgs []Message, ok bool) {
4342
ch := b.getAwaiter()
4443
b.mu.Unlock()
45-
<-ch
46-
msg = b.message
47-
b.mu.Lock()
48-
49-
return
50-
}
44+
select {
45+
case <-ch:
46+
msgs = b.messages[:]
47+
ok = true
48+
case <-closeCh:
49+
}
5150

52-
func (b *broadcaster) subscribe(fn func(<-chan struct{})) {
53-
ch := b.getAwaiter()
54-
b.mu.Unlock()
55-
fn(ch)
5651
b.mu.Lock()
57-
}
58-
59-
func (b *broadcaster) waitUntilClosed(closeCh <-chan struct{}) (msg Message, ok bool) {
60-
b.subscribe(func(ch <-chan struct{}) {
61-
select {
62-
case <-ch:
63-
msg = b.message
64-
ok = true
65-
case <-closeCh:
66-
}
67-
})
68-
69-
return
70-
}
71-
72-
func (b *broadcaster) waitUntil(timeout time.Duration) (msg Message, ok bool) {
73-
timer := time.NewTimer(timeout)
74-
defer timer.Stop()
75-
76-
b.subscribe(func(ch <-chan struct{}) {
77-
select {
78-
case <-ch:
79-
msg = b.message
80-
ok = true
81-
case <-timer.C:
82-
}
83-
})
84-
85-
return
86-
}
87-
88-
func (b *broadcaster) waitWithContext(ctx context.Context) (msg Message, err error) {
89-
b.subscribe(func(ch <-chan struct{}) {
90-
select {
91-
case <-ch:
92-
msg = b.message
93-
case <-ctx.Done():
94-
err = ctx.Err()
95-
}
96-
})
97-
9852
return
9953
}

conn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type Conn struct {
8888

8989
// used to fire `conn#Close` once.
9090
closed *uint32
91-
// useful to terminate the broadcaster, see `Server#ServeHTTP.waitMessage`.
91+
// useful to terminate the broadcaster, see `Server#ServeHTTP.waitMessages`.
9292
closeCh chan struct{}
9393
}
9494

server.go

+100-66
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ type Server struct {
4949
IDGenerator IDGenerator
5050
StackExchange StackExchange
5151

52+
// If `StackExchange` is set then this field is ignored.
53+
//
54+
// It overrides the default behavior(when no StackExchange is not used)
55+
// which publishes a message independently.
56+
// In short the default behavior doesn't wait for a message to be published to all clients
57+
// before any next broadcast call.
58+
//
59+
// Therefore, if set to true,
60+
// each broadcast call will publish its own message(s) by order.
61+
SyncBroadcaster bool
62+
5263
mu sync.RWMutex
5364
namespaces Namespaces
5465

@@ -58,14 +69,16 @@ type Server struct {
5869

5970
count uint64
6071

61-
connections map[*Conn]struct{}
62-
connect chan *Conn
63-
disconnect chan *Conn
64-
actions chan action
72+
connections map[*Conn]struct{}
73+
connect chan *Conn
74+
disconnect chan *Conn
75+
actions chan action
76+
broadcastMessages chan []Message
77+
6578
broadcaster *broadcaster
79+
6680
// messages that this server must waits
67-
// for a reply from one of its own connections(see `waitMessage`)
68-
// or TODO: from cloud (see `StackExchange.PublishAndWait`).
81+
// for a reply from one of its own connections(see `waitMessages`).
6982
waitingMessages map[string]chan Message
7083
waitingMessagesMutex sync.RWMutex
7184

@@ -94,22 +107,21 @@ func New(upgrader Upgrader, connHandler ConnHandler) *Server {
94107
readTimeout, writeTimeout := getTimeouts(connHandler)
95108
namespaces := connHandler.GetNamespaces()
96109
s := &Server{
97-
uuid: uuid.Must(uuid.NewV4()).String(),
98-
upgrader: upgrader,
99-
namespaces: namespaces,
100-
readTimeout: readTimeout,
101-
writeTimeout: writeTimeout,
102-
connections: make(map[*Conn]struct{}),
103-
connect: make(chan *Conn, 1),
104-
disconnect: make(chan *Conn),
105-
actions: make(chan action),
106-
broadcaster: newBroadcaster(),
107-
waitingMessages: make(map[string]chan Message),
108-
IDGenerator: DefaultIDGenerator,
110+
uuid: uuid.Must(uuid.NewV4()).String(),
111+
upgrader: upgrader,
112+
namespaces: namespaces,
113+
readTimeout: readTimeout,
114+
writeTimeout: writeTimeout,
115+
connections: make(map[*Conn]struct{}),
116+
connect: make(chan *Conn, 1),
117+
disconnect: make(chan *Conn),
118+
actions: make(chan action),
119+
broadcastMessages: make(chan []Message),
120+
broadcaster: newBroadcaster(),
121+
waitingMessages: make(map[string]chan Message),
122+
IDGenerator: DefaultIDGenerator,
109123
}
110124

111-
// s.broadcastCond = sync.NewCond(&s.broadcastMu)
112-
113125
go s.start()
114126

115127
return s
@@ -171,6 +183,10 @@ func (s *Server) start() {
171183
s.StackExchange.OnDisconnect(c)
172184
}
173185
}
186+
case msgs := <-s.broadcastMessages:
187+
for c := range s.connections {
188+
publishMessages(c, msgs)
189+
}
174190
case act := <-s.actions:
175191
for c := range s.connections {
176192
act.call(c)
@@ -312,9 +328,9 @@ func (s *Server) Upgrade(
312328
c.ReconnectTries, _ = strconv.Atoi(retriesHeaderValue)
313329
}
314330

315-
if !s.usesStackExchange() {
331+
if !s.usesStackExchange() && !s.SyncBroadcaster {
316332
go func(c *Conn) {
317-
for s.waitMessage(c) {
333+
for s.waitMessages(c) {
318334
}
319335
}(c)
320336
}
@@ -380,34 +396,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
380396
s.Upgrade(w, r, nil, nil)
381397
}
382398

383-
func (s *Server) waitMessage(c *Conn) bool {
384-
s.broadcaster.mu.Lock()
385-
defer s.broadcaster.mu.Unlock()
386-
387-
msg, ok := s.broadcaster.waitUntilClosed(c.closeCh)
388-
if !ok {
389-
return false
390-
}
391-
392-
if msg.from == c.ID() {
393-
// if the message is not supposed to return back to any connection with this ID.
394-
return true
395-
}
396-
397-
// if "To" field is given then send to a specific connection.
398-
if msg.To != "" && msg.To != c.ID() {
399-
return true
400-
}
401-
402-
// c.Write may fail if the message is not supposed to end to this client
403-
// but the connection should be still open in order to continue.
404-
if !c.Write(msg) && c.IsClosed() {
405-
return false
406-
}
407-
408-
return true
409-
}
410-
411399
// GetTotalConnections returns the total amount of the connected connections to the server, it's fast
412400
// and can be used as frequently as needed.
413401
func (s *Server) GetTotalConnections() uint64 {
@@ -438,6 +426,41 @@ func (s *Server) Do(fn func(*Conn), async bool) {
438426
}
439427
}
440428

429+
func publishMessages(c *Conn, msgs []Message) bool {
430+
for _, msg := range msgs {
431+
if msg.from == c.ID() {
432+
// if the message is not supposed to return back to any connection with this ID.
433+
434+
return true
435+
}
436+
437+
// if "To" field is given then send to a specific connection.
438+
if msg.To != "" && msg.To != c.ID() {
439+
return true
440+
}
441+
442+
// c.Write may fail if the message is not supposed to end to this client
443+
// but the connection should be still open in order to continue.
444+
if !c.Write(msg) && c.IsClosed() {
445+
return false
446+
}
447+
}
448+
449+
return true
450+
}
451+
452+
func (s *Server) waitMessages(c *Conn) bool {
453+
s.broadcaster.mu.Lock()
454+
defer s.broadcaster.mu.Unlock()
455+
456+
msgs, ok := s.broadcaster.waitUntilClosed(c.closeCh)
457+
if !ok {
458+
return false
459+
}
460+
461+
return publishMessages(c, msgs)
462+
}
463+
441464
type stringerValue struct{ v string }
442465

443466
func (s stringerValue) String() string { return s.v }
@@ -452,7 +475,7 @@ func (s stringerValue) String() string { return s.v }
452475
// neffos.Message{Namespace: "default", Room: "roomName or empty", Event: "chat", Body: [...]})
453476
func Exclude(connID string) fmt.Stringer { return stringerValue{connID} }
454477

455-
// Broadcast method is fast and does not block any new incoming connection,
478+
// Broadcast method is fast and does not block any new incoming connection by-default,
456479
// it can be used as frequently as needed. Use the "msg"'s Namespace, or/and Event or/and Room to broadcast
457480
// to a specific type of connection collectives.
458481
//
@@ -465,34 +488,45 @@ func Exclude(connID string) fmt.Stringer { return stringerValue{connID} }
465488
// nsConn.Conn.Server().Broadcast(
466489
// nsConn OR nil,
467490
// neffos.Message{Namespace: "default", Room: "roomName or empty", Event: "chat", Body: [...]})
468-
func (s *Server) Broadcast(exceptSender fmt.Stringer, msg Message) {
491+
//
492+
// Note that it if `StackExchange` is nil then its default behavior
493+
// doesn't wait for a publish to complete to all clients before any
494+
// next broadcast call. To change that behavior set the `Server.SyncBroadcaster` to true
495+
// before server start.
496+
func (s *Server) Broadcast(exceptSender fmt.Stringer, msgs ...Message) {
497+
469498
if exceptSender != nil {
499+
var fromExplicit, from string
500+
470501
switch c := exceptSender.(type) {
471502
case *Conn:
472-
msg.FromExplicit = c.serverConnID
503+
fromExplicit = c.serverConnID
473504
case *NSConn:
474-
msg.FromExplicit = c.Conn.serverConnID
505+
fromExplicit = c.Conn.serverConnID
475506
default:
476-
msg.from = exceptSender.String()
507+
from = exceptSender.String()
477508
}
478509

479-
// msg.from = exceptSender.String()
510+
for i := range msgs {
511+
if from != "" {
512+
msgs[i].from = from
513+
} else {
514+
msgs[i].FromExplicit = fromExplicit
515+
}
516+
}
480517
}
481518

482-
// s.broadcast <- msg
483-
484-
// s.broadcastMu.Lock()
485-
// s.broadcastMessage = msg
486-
// s.broadcastMu.Unlock()
487-
488-
// s.broadcastCond.Broadcast()
489-
490519
if s.usesStackExchange() {
491-
s.StackExchange.Publish(msg)
520+
s.StackExchange.Publish(msgs)
521+
return
522+
}
523+
524+
if s.SyncBroadcaster {
525+
s.broadcastMessages <- msgs
492526
return
493527
}
494528

495-
s.broadcaster.broadcast(msg)
529+
s.broadcaster.broadcast(msgs)
496530
}
497531

498532
// Ask is like `Broadcast` but it blocks until a response

0 commit comments

Comments
 (0)