-
-
Notifications
You must be signed in to change notification settings - Fork 49
/
stackexchange.go
129 lines (110 loc) · 4.14 KB
/
stackexchange.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package neffos
import (
"context"
)
// StackExchange is an optional interface
// that can be used to change the way neffos
// sends messages to its clients, i.e
// communication between multiple neffos servers.
//
// See the "kataras/neffos/stackexchange" subpackage for more details.
// Real-World example and usage documentation
// can be found at: "kataras/neffos/_examples/redis".
type StackExchange interface {
// OnConnect should prepare the connection's subscriber.
// It's called automatically after the neffos server's OnConnect (if any)
// on incoming client connections.
OnConnect(c *Conn) error
// OnDisconnect should close the connection's subscriber that
// created on the `OnConnect` method.
// It's called automatically when a connection goes offline,
// manually by server or client or by network failure.
OnDisconnect(c *Conn)
// Publish should publish messages through a stackexchange.
// It's called automatically on neffos broadcasting.
Publish(msgs []Message) bool
// Subscribe should subscribe to a specific namespace,
// it's called automatically on neffos namespace connected.
Subscribe(c *Conn, namespace string)
// Unsubscribe should unsubscribe from a specific namespace,
// it's called automatically on neffos namespace disconnect.
Unsubscribe(c *Conn, namespace string) // should close the subscriber.
// Ask should be able to perform a server Ask to a specific client or to all clients
// It blocks until response from a specific client if msg.To is filled,
// otherwise will return on the first responder's reply.
Ask(ctx context.Context, msg Message, token string) (Message, error)
// NotifyAsk should notify and unblock a subscribed connection for this
// specific message, "token" is the neffos wait signal for this message.
NotifyAsk(msg Message, token string) error
}
// StackExchangeInitializer is an optional interface for a `StackExchange`.
// It contains a single `Init` method which accepts
// the registered server namespaces and returns error.
// It does not called on manual `Server.StackExchange` field set,
// use the `Server.UseStackExchange` to make sure that this implementation is respected.
type StackExchangeInitializer interface {
// Init should initialize a stackexchange, it's optional.
Init(Namespaces) error
}
func stackExchangeInit(s StackExchange, namespaces Namespaces) error {
if s != nil {
if sinit, ok := s.(StackExchangeInitializer); ok {
return sinit.Init(namespaces)
}
}
return nil
}
// internal use only when more than one stack exchanges are registered.
type stackExchangeWrapper struct {
// read-only fields.
parent StackExchange
current StackExchange
}
func wrapStackExchanges(existingExc StackExchange, newExc StackExchange) StackExchange {
return &stackExchangeWrapper{
parent: existingExc,
current: newExc,
}
}
func (s *stackExchangeWrapper) OnConnect(c *Conn) error {
// return on first error, do not wrap errors,
// the server should NOT run if at least one is errored.
err := s.parent.OnConnect(c)
if err != nil {
return err
}
return s.current.OnConnect(c)
}
func (s *stackExchangeWrapper) OnDisconnect(c *Conn) {
s.parent.OnDisconnect(c)
s.current.OnDisconnect(c)
}
func (s *stackExchangeWrapper) Publish(msgs []Message) bool {
// keep try on the next but return false on any failure.
okParent := s.parent.Publish(msgs)
okCurrent := s.current.Publish(msgs)
return okParent && okCurrent
}
func (s *stackExchangeWrapper) Ask(ctx context.Context, msg Message, token string) (Message, error) {
// we run Ask and if one is failing then we keep trying for all stackexchanges.
msg, err := s.parent.Ask(ctx, msg, token)
if err != nil {
msg, err = s.current.Ask(ctx, msg, token)
}
return msg, err
}
func (s *stackExchangeWrapper) NotifyAsk(msg Message, token string) error {
err := s.parent.NotifyAsk(msg, token)
if err != nil {
return s.current.NotifyAsk(msg, token)
}
return nil
}
func (s *stackExchangeWrapper) Subscribe(c *Conn, namespace string) {
s.parent.Subscribe(c, namespace)
s.current.Subscribe(c, namespace)
}
func (s *stackExchangeWrapper) Unsubscribe(c *Conn, namespace string) {
s.parent.Unsubscribe(c, namespace)
s.current.Unsubscribe(c, namespace)
}