-
Notifications
You must be signed in to change notification settings - Fork 113
/
cluster_scanner.go
90 lines (78 loc) · 1.78 KB
/
cluster_scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package radix
import (
"strings"
)
// clusterScanner is the Scanner returned by Cluster.NewScanner. It works
// through a queue of node addresses, using one per-node Scanner at a time.
type clusterScanner struct {
// cluster is used to look up a client for each node address.
cluster *Cluster
// opts is handed unchanged to every per-node Scanner that gets created.
opts ScanOpts
// addrs is the queue of addresses still to be visited; entries are taken
// from the front.
addrs []string
// currScanner is the Scanner for the node currently being read, or nil
// when there is none.
currScanner Scanner
// lastErr holds the first error recorded during the scan; Close returns it.
lastErr error
}
// NewScanner builds a Scanner which visits every primary node of the cluster
// in turn. Only the "SCAN" command is accepted (the comparison ignores case);
// any other ScanOpts Command makes this method panic. Other scanning
// operations (e.g. "HSCAN", "ZSCAN") should go through the normal NewScanner
// function instead.
//
// The set of primaries is read from the topology once, when this method is
// called. If the cluster topology changes during a scan the Scanner may or may
// not error out due to it, depending on the nature of the change.
func (c *Cluster) NewScanner(o ScanOpts) Scanner {
	if cmd := strings.ToUpper(o.Command); cmd != "SCAN" {
		panic("Cluster.NewScanner can only perform SCAN operations")
	}

	scanner := &clusterScanner{cluster: c, opts: o}
	for _, primary := range c.Topo().Primaries() {
		scanner.addrs = append(scanner.addrs, primary.Addr)
	}

	// Position the scanner on the first primary for which a client can be
	// obtained before handing it to the caller.
	scanner.nextScanner()
	return scanner
}
// closeCurr closes the per-node Scanner currently in use, if there is one, and
// clears it. An error from that Close is stored in lastErr only when no
// earlier error has been recorded, so the first failure is the one kept.
func (cs *clusterScanner) closeCurr() {
	if cs.currScanner == nil {
		return
	}

	closeErr := cs.currScanner.Close()
	if closeErr != nil && cs.lastErr == nil {
		cs.lastErr = closeErr
	}
	cs.currScanner = nil
}
// scannerForAddr tries to make the node at addr the current scan target.
//
// It asks the cluster for a client for addr. When one is available, the
// previous per-node Scanner (if any) is closed and replaced with a new Scanner
// on that client, and true is returned.
//
// When no client is available the node is skipped and false is returned so the
// caller can move on to the next address. The lookup error is not dropped: the
// first such error is stored in lastErr (unless an earlier error is already
// recorded), so that Close reports that the scan did not cover every node
// instead of returning nil for an incomplete result.
func (cs *clusterScanner) scannerForAddr(addr string) bool {
	client, err := cs.cluster.rpool(addr)
	if client == nil {
		// Keep scanning the remaining nodes, but remember why this one was
		// skipped. First error wins, matching closeCurr.
		if err != nil && cs.lastErr == nil {
			cs.lastErr = err
		}
		return false
	}

	cs.closeCurr()
	cs.currScanner = NewScanner(client, cs.opts)
	return true
}
// nextScanner advances to the next queued address for which a per-node Scanner
// can be set up, removing addresses from the front of the queue as it tries
// them. Once the queue is empty the current Scanner is closed and left nil,
// which is the state Next treats as the end of the scan.
func (cs *clusterScanner) nextScanner() {
	for len(cs.addrs) > 0 {
		candidate := cs.addrs[0]
		cs.addrs = cs.addrs[1:]
		if cs.scannerForAddr(candidate) {
			return
		}
	}
	cs.closeCurr()
}
// Next delegates to the current per-node Scanner's Next with res and returns
// true as soon as one of them does. Whenever the current Scanner returns
// false, the next node in the queue is tried. It returns false once no
// per-node Scanner remains.
func (cs *clusterScanner) Next(res *string) bool {
	for cs.currScanner != nil {
		if cs.currScanner.Next(res) {
			return true
		}
		// Current node has nothing more to give; move on to the next one.
		cs.nextScanner()
	}
	return false
}
// Close closes the per-node Scanner still in use, if any, and returns the
// first error recorded over the lifetime of this scanner, or nil if there was
// none.
func (cs *clusterScanner) Close() error {
	// Close first so that an error from this final Close can be reported too.
	cs.closeCurr()

	return cs.lastErr
}