-
Notifications
You must be signed in to change notification settings - Fork 86
/
canceller.go
89 lines (73 loc) · 2.34 KB
/
canceller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package worker
import "sync"
// CancellationCommand is the message delivered to subscribers when a job is
// cancelled.
type CancellationCommand struct {
	// JobID selects which job's subscribers receive the command.
	JobID uint64
	// Reason is a free-form string carried along with the command.
	Reason string
}
// CancellationBroadcaster keeps track of who is waiting for a cancellation
// message for which job ID, and lets callers subscribe to and unsubscribe
// from those messages.
type CancellationBroadcaster struct {
	// registryMutex guards every access to registry.
	registryMutex sync.Mutex
	// registry maps a job ID to the channels of its current subscribers.
	registry map[uint64][]chan CancellationCommand
}
// NewCancellationBroadcaster returns a broadcaster that is ready for use: its
// registry is allocated and holds no subscriptions.
func NewCancellationBroadcaster() *CancellationBroadcaster {
	broadcaster := new(CancellationBroadcaster)
	broadcaster.registry = map[uint64][]chan CancellationCommand{}
	return broadcaster
}
// Broadcast delivers command to every channel currently subscribed to
// command.JobID and then closes each of those channels. The job's entry is
// removed from the registry first, so a subscription receives at most one
// command. Broadcasting for a job ID without subscribers does nothing.
func (cb *CancellationBroadcaster) Broadcast(command CancellationCommand) {
	cb.registryMutex.Lock()
	defer cb.registryMutex.Unlock()

	subscribers, found := cb.registry[command.JobID]
	if !found {
		return
	}
	delete(cb.registry, command.JobID)

	for i := range subscribers {
		subscribers[i] <- command
		close(subscribers[i])
	}
}
// Subscribe registers a new subscription for cancellation messages addressed
// to the given job ID and returns its receive-only channel. The channel has a
// one-slot buffer. Broadcast sends the command on it and then closes it.
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan CancellationCommand {
	subscription := make(chan CancellationCommand, 1)

	cb.registryMutex.Lock()
	defer cb.registryMutex.Unlock()

	// append on a missing key starts from a nil slice, so no explicit
	// initialisation of the entry is needed.
	cb.registry[id] = append(cb.registry[id], subscription)
	return subscription
}
// Unsubscribe removes the subscription identified by ch from the given job
// ID. The channel itself is not closed.
//
// Only the exact channel passed in is removed. If ch is not registered for
// id (for example because it was already unsubscribed or broadcast to), the
// call is a no-op and other subscriptions for the same job are left intact.
// When the last subscription for a job is removed, the job's registry entry
// is deleted as well.
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan CancellationCommand) {
	cb.registryMutex.Lock()
	defer cb.registryMutex.Unlock()

	// Indexing a missing key yields a nil slice, which the loop below simply
	// skips, so no separate existence check is required.
	chans := cb.registry[id]

	chanIndex := -1
	for i, registeredChan := range chans {
		if registeredChan == ch {
			chanIndex = i
			break
		}
	}
	if chanIndex == -1 {
		// Channel is not (or no longer) registered for this job.
		return
	}

	// Removing the only subscription: drop the key so the registry does not
	// accumulate empty entries.
	if len(chans) == 1 {
		delete(cb.registry, id)
		return
	}

	// Remove the element by moving the last element into its place and
	// shrinking the slice. Order of subscriptions is not preserved. The
	// vacated tail slot is cleared so the backing array does not keep the
	// removed channel reachable.
	last := len(chans) - 1
	chans[chanIndex] = chans[last]
	chans[last] = nil
	cb.registry[id] = chans[:last]
}