1
1
package checker
2
2
3
3
import (
4
+ "cmp"
4
5
"context"
6
+ "fmt"
5
7
"net/url"
6
8
"time"
7
9
@@ -11,83 +13,68 @@ import (
11
13
12
14
// ConsulLeaderChecker is used to check state of the leader key in Consul
13
15
type ConsulLeaderChecker struct {
14
- key string
15
- nodename string
16
- apiClient * api.Client
16
+ * vipconfig.Config
17
+ * api.Client
17
18
}
18
19
19
- // naming this cConf to avoid conflict with conf in etcd_leader_checker.go
20
- var cConf * vipconfig.Config
21
-
22
20
// NewConsulLeaderChecker returns a new instance
23
- func NewConsulLeaderChecker (con * vipconfig.Config ) (* ConsulLeaderChecker , error ) {
24
- cConf = con
25
- lc := & ConsulLeaderChecker {
26
- key : cConf .TriggerKey ,
27
- nodename : cConf .TriggerValue ,
28
- }
21
+ func NewConsulLeaderChecker (con * vipconfig.Config ) (lc * ConsulLeaderChecker , err error ) {
22
+ lc = & ConsulLeaderChecker {Config : con }
29
23
30
- url , err := url .Parse (cConf .Endpoints [0 ])
24
+ url , err := url .Parse (con .Endpoints [0 ])
31
25
if err != nil {
32
26
return nil , err
33
27
}
34
- address := url .Hostname () + ":" + url .Port ()
35
28
36
29
config := & api.Config {
37
- Address : address ,
30
+ Address : fmt . Sprintf ( "%s:%s" , url . Hostname (), url . Port ()) ,
38
31
Scheme : url .Scheme ,
39
32
WaitTime : time .Second ,
33
+ Token : cmp .Or (con .ConsulToken , "" ),
40
34
}
41
35
42
- if cConf .ConsulToken != "" {
43
- config .Token = cConf .ConsulToken
44
- }
45
-
46
- apiClient , err := api .NewClient (config )
47
- if err != nil {
36
+ if lc .Client , err = api .NewClient (config ); err != nil {
48
37
return nil , err
49
38
}
50
39
51
- lc .apiClient = apiClient
52
-
53
40
return lc , nil
54
41
}
55
42
56
43
// GetChangeNotificationStream checks the status in the loop
57
44
func (c * ConsulLeaderChecker ) GetChangeNotificationStream (ctx context.Context , out chan <- bool ) error {
58
- kv := c .apiClient .KV ()
45
+ kv := c .Client .KV ()
59
46
60
47
queryOptions := & api.QueryOptions {
61
48
RequireConsistent : true ,
62
49
}
63
50
64
51
checkLoop:
65
52
for {
66
- resp , _ , err := kv .Get (c .key , queryOptions )
53
+ resp , _ , err := kv .Get (c .TriggerKey , queryOptions )
67
54
if err != nil {
68
55
if ctx .Err () != nil {
69
56
break checkLoop
70
57
}
71
- cConf .Logger .Sugar ().Error ("consul error: " , err )
58
+ c .Logger .Sugar ().Error ("consul error: " , err )
72
59
out <- false
73
- time .Sleep (time .Duration (cConf .Interval ) * time .Millisecond )
60
+ time .Sleep (time .Duration (c .Interval ) * time .Millisecond )
74
61
continue
75
62
}
76
63
if resp == nil {
77
- cConf .Logger .Sugar ().Errorf ("Cannot get variable for key %s. Will try again in a second." , c .key )
64
+ c .Logger .Sugar ().Errorf ("Cannot get variable for key %s. Will try again in a second." , c .TriggerKey )
78
65
out <- false
79
- time .Sleep (time .Duration (cConf .Interval ) * time .Millisecond )
66
+ time .Sleep (time .Duration (c .Interval ) * time .Millisecond )
80
67
continue
81
68
}
82
69
83
- state := string (resp .Value ) == c .nodename
70
+ state := string (resp .Value ) == c .TriggerValue
84
71
queryOptions .WaitIndex = resp .ModifyIndex
85
72
86
73
select {
87
74
case <- ctx .Done ():
88
75
break checkLoop
89
76
case out <- state :
90
- time .Sleep (time .Duration (cConf .Interval ) * time .Millisecond )
77
+ time .Sleep (time .Duration (c .Interval ) * time .Millisecond )
91
78
continue
92
79
}
93
80
}
0 commit comments