From 8a094cd2a5c903af41b7a70c5c915ecdb6cc8b74 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Thu, 14 Nov 2024 15:56:07 +0100 Subject: [PATCH] [*] refactor `ConsulLeaderChecker` --- checker/consul_leader_checker.go | 49 ++++++++++++-------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/checker/consul_leader_checker.go b/checker/consul_leader_checker.go index a63cf76..9eefe5c 100644 --- a/checker/consul_leader_checker.go +++ b/checker/consul_leader_checker.go @@ -1,7 +1,9 @@ package checker import ( + "cmp" "context" + "fmt" "net/url" "time" @@ -11,51 +13,36 @@ import ( // ConsulLeaderChecker is used to check state of the leader key in Consul type ConsulLeaderChecker struct { - key string - nodename string - apiClient *api.Client + *vipconfig.Config + *api.Client } -// naming this cConf to avoid conflict with conf in etcd_leader_checker.go -var cConf *vipconfig.Config - // NewConsulLeaderChecker returns a new instance -func NewConsulLeaderChecker(con *vipconfig.Config) (*ConsulLeaderChecker, error) { - cConf = con - lc := &ConsulLeaderChecker{ - key: cConf.TriggerKey, - nodename: cConf.TriggerValue, - } +func NewConsulLeaderChecker(con *vipconfig.Config) (lc *ConsulLeaderChecker, err error) { + lc = &ConsulLeaderChecker{Config: con} - url, err := url.Parse(cConf.Endpoints[0]) + url, err := url.Parse(con.Endpoints[0]) if err != nil { return nil, err } - address := url.Hostname() + ":" + url.Port() config := &api.Config{ - Address: address, + Address: fmt.Sprintf("%s:%s", url.Hostname(), url.Port()), Scheme: url.Scheme, WaitTime: time.Second, + Token: cmp.Or(con.ConsulToken, ""), } - if cConf.ConsulToken != "" { - config.Token = cConf.ConsulToken - } - - apiClient, err := api.NewClient(config) - if err != nil { + if lc.Client, err = api.NewClient(config); err != nil { return nil, err } - lc.apiClient = apiClient - return lc, nil } // GetChangeNotificationStream checks the status in the loop func (c *ConsulLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error { - kv := c.apiClient.KV() + kv := c.Client.KV() queryOptions := &api.QueryOptions{ RequireConsistent: true, @@ -63,31 +50,31 @@ func (c *ConsulLeaderChecker) GetChangeNotificationStream(ctx context.Context, o checkLoop: for { - resp, _, err := kv.Get(c.key, queryOptions) + resp, _, err := kv.Get(c.TriggerKey, queryOptions) if err != nil { if ctx.Err() != nil { break checkLoop } - cConf.Logger.Sugar().Error("consul error: ", err) + c.Logger.Sugar().Error("consul error: ", err) out <- false - time.Sleep(time.Duration(cConf.Interval) * time.Millisecond) + time.Sleep(time.Duration(c.Interval) * time.Millisecond) continue } if resp == nil { - cConf.Logger.Sugar().Errorf("Cannot get variable for key %s. Will try again in a second.", c.key) + c.Logger.Sugar().Errorf("Cannot get variable for key %s. Will try again in a second.", c.TriggerKey) out <- false - time.Sleep(time.Duration(cConf.Interval) * time.Millisecond) + time.Sleep(time.Duration(c.Interval) * time.Millisecond) continue } - state := string(resp.Value) == c.nodename + state := string(resp.Value) == c.TriggerValue queryOptions.WaitIndex = resp.ModifyIndex select { case <-ctx.Done(): break checkLoop case out <- state: - time.Sleep(time.Duration(cConf.Interval) * time.Millisecond) + time.Sleep(time.Duration(c.Interval) * time.Millisecond) continue } }