-
Notifications
You must be signed in to change notification settings - Fork 0
/
trigger.go
174 lines (151 loc) · 4.32 KB
/
trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package emitter
import (
"reflect"
"runtime"
"sync"
"sync/atomic"
)
// Trigger is a simple lightweight process for sending simple one shot notifications on
// multiple channels. There is no event or notification content, just a "wake up" signal
// sent to many channels at the same time. Calling the trigger costs virtually nothing
// (just one atomic integer operation) while listeners can have more complex logic.
//
// Even if there are a lot of listeners and it takes time to deliver notifications, each
// call to Push will translate to one attempt to push something on the listening channels.
// The only case where a channel drops notifications is if it's not ready to listen, but
// this can be solved by adding some capacity to the channels by setting Cap to something
// larger than zero.
type Trigger interface {
Listen() *TriggerListener
ListenCap(c uint) *TriggerListener
Push()
Close() error
}
type triggerImpl struct {
Cap uint // capacity for channels generated by Trigger.Listen
send uint32 // number of pending signals
l sync.RWMutex
c *sync.Cond
closed uint32 // if not zero, means this trigger has been closed
ch map[<-chan struct{}]chan struct{}
chLk sync.RWMutex
}
// TriggerListener represents a listener that will receive notifications when the trigger
// is pushed. Call Release() after using it to close the channel (with a defer l.Release())
type TriggerListener struct {
C <-chan struct{}
t *triggerImpl
}
// NewTrigger returns a new trigger object ready for use. This will also create a goroutine
func NewTrigger() Trigger {
tr := &triggerImpl{
Cap: 1, // 1 by default so we can queue even just 1 pending call
ch: make(map[<-chan struct{}]chan struct{}),
}
tr.c = sync.NewCond(tr.l.RLocker())
go tr.thread()
return tr
}
// Push will wake all the listeners for this trigger
func (t *triggerImpl) Push() {
atomic.AddUint32(&t.send, 1)
t.c.Broadcast()
}
// Close will close all the listeners for this trigger
func (t *triggerImpl) Close() error {
atomic.AddUint32(&t.closed, 1)
t.Push()
return nil
}
// Listen returns a listener object. Remember to release the object after you stop using it.
func (t *triggerImpl) Listen() *TriggerListener {
return t.ListenCap(t.Cap)
}
func (t *triggerImpl) ListenCap(capa uint) *TriggerListener {
c := make(chan struct{}, capa)
res := &TriggerListener{
C: c,
t: t,
}
runtime.SetFinalizer(res, releaseTriggerListener)
t.chLk.Lock()
defer t.chLk.Unlock()
t.ch[c] = c
return res
}
func releaseTriggerListener(o *TriggerListener) {
o.Release()
}
// Release will stop sending data to the channel for this trigger. The channel will not be
// closed howeveras Release() is assumed to be called when exiting the listening loop.
func (tl *TriggerListener) Release() {
t := tl.t
t.chLk.Lock()
defer t.chLk.Unlock()
delete(t.ch, tl.C)
}
var emptyStructVal = reflect.ValueOf(struct{}{})
// emit pushes a struct{}{} on all known channels
func (t *triggerImpl) emit() {
t.chLk.RLock()
defer t.chLk.RUnlock()
if len(t.ch) == 0 {
// do nothing
return
}
cases := make([]reflect.SelectCase, len(t.ch)+1)
cases[0].Dir = reflect.SelectDefault
n := 1
for _, l := range t.ch {
cases[n].Dir = reflect.SelectSend
cases[n].Chan = reflect.ValueOf(l)
cases[n].Send = emptyStructVal
n += 1
}
if len(cases) == 0 {
// do nothing
return
}
cnt := len(cases) - 1 // number of sends we expect, considering cases[0] is reserved for context timeout
for {
// (chosen int, recv Value, recvOK bool)
chosen, _, _ := reflect.Select(cases)
if chosen == 0 {
// default, meaning there is nothing ready to accept anymore
return
}
cnt -= 1
if cnt == 0 {
// all sends completed successfully
return
}
// set to nil & continue
cases[chosen].Chan = reflect.Value{}
}
}
// thread is a thread just running and that's all
func (t *triggerImpl) thread() {
t.l.RLock()
defer t.l.RUnlock()
for {
if atomic.LoadUint32(&t.closed) != 0 {
// closing!
t.chLk.Lock()
defer t.chLk.Unlock()
for _, c := range t.ch {
// close any still not released channel so they know this is the end
close(c)
}
clear(t.ch)
return
}
if atomic.LoadUint32(&t.send) == 0 {
// only wait if there is nothing to send
t.c.Wait()
continue
}
// at this point t.send is guaranteed to be >0
t.emit()
atomic.AddUint32(&t.send, ^uint32(0))
}
}