Skip to content

Commit

Permalink
[!] improve logging and add verbose support for all components
Browse files Browse the repository at this point in the history
[+] use `go.uber.org/zap` instead of standard `log` package
[*] move net related code from `main` to `ipmanager`
[*] user TriggerKey and TriggerValue names instead of Key and NodeName
  • Loading branch information
pashagolub committed Nov 14, 2024
1 parent 7ecae13 commit 27e4008
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 145 deletions.
11 changes: 5 additions & 6 deletions checker/consul_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package checker

import (
"context"
"log"
"net/url"
"time"

Expand All @@ -17,15 +16,15 @@ type ConsulLeaderChecker struct {
apiClient *api.Client
}

//naming this cConf to avoid conflict with conf in etcd_leader_checker.go
// naming this cConf to avoid conflict with conf in etcd_leader_checker.go
var cConf *vipconfig.Config

// NewConsulLeaderChecker returns a new instance
func NewConsulLeaderChecker(con *vipconfig.Config) (*ConsulLeaderChecker, error) {
cConf = con
lc := &ConsulLeaderChecker{
key: cConf.Key,
nodename: cConf.Nodename,
key: cConf.TriggerKey,
nodename: cConf.TriggerValue,
}

url, err := url.Parse(cConf.Endpoints[0])
Expand Down Expand Up @@ -69,13 +68,13 @@ checkLoop:
if ctx.Err() != nil {
break checkLoop
}
log.Printf("consul error: %s", err)
cConf.Logger.Sugar().Error("consul error: ", err)
out <- false
time.Sleep(time.Duration(cConf.Interval) * time.Millisecond)
continue
}
if resp == nil {
log.Printf("Cannot get variable for key %s. Will try again in a second.", c.key)
cConf.Logger.Sugar().Errorf("Cannot get variable for key %s. Will try again in a second.", c.key)
out <- false
time.Sleep(time.Duration(cConf.Interval) * time.Millisecond)
continue
Expand Down
23 changes: 12 additions & 11 deletions checker/etcd_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
"time"

"github.com/cybertec-postgresql/vip-manager/vipconfig"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// EtcdLeaderChecker is used to check state of the leader key in Etcd
Expand All @@ -32,6 +32,7 @@ func NewEtcdLeaderChecker(conf *vipconfig.Config) (*EtcdLeaderChecker, error) {
DialKeepAliveTime: time.Second,
Username: conf.EtcdUser,
Password: conf.EtcdPassword,
Logger: conf.Logger,
}
c, err := clientv3.New(cfg)
return &EtcdLeaderChecker{conf, c}, err
Expand Down Expand Up @@ -71,39 +72,39 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {

// get gets the current leader from etcd
func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) {
resp, err := elc.Get(ctx, elc.Key)
resp, err := elc.Get(ctx, elc.TriggerKey)
if err != nil {
log.Printf("etcd error: %s", err)
elc.Logger.Error("etcd error:", zap.Error(err))
out <- false
return
}
for _, kv := range resp.Kvs {
log.Printf("current leader from DCS: %s", kv.Value)
out <- string(kv.Value) == elc.Nodename
elc.Logger.Sugar().Info("current leader from DCS:", kv.Value)
out <- string(kv.Value) == elc.TriggerValue
}
}

// watch monitors the leader change from etcd
func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error {
watchChan := elc.Watch(ctx, elc.Key)
log.Println("set WATCH on " + elc.Key)
watchChan := elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("set WATCH on ", elc.TriggerKey)
for {
select {
case <-ctx.Done():
return ctx.Err()
case watchResp := <-watchChan:
if watchResp.Canceled {
watchChan = elc.Watch(ctx, elc.Key)
log.Println("reset cancelled WATCH on " + elc.Key)
watchChan = elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("reset cancelled WATCH on ", elc.TriggerKey)
continue
}
if err := watchResp.Err(); err != nil {
elc.get(ctx, out) // RPC failed, try to get the key directly to be on the safe side
continue
}
for _, event := range watchResp.Events {
out <- string(event.Kv.Value) == elc.Nodename
log.Printf("current leader from DCS: %s", event.Kv.Value)
out <- string(event.Kv.Value) == elc.TriggerValue
elc.Logger.Sugar().Info("current leader from DCS:", event.Kv.Value)
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions checker/patroni_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package checker

import (
"context"
"log"
"strconv"
"time"

Expand Down Expand Up @@ -30,12 +29,12 @@ func (c *PatroniLeaderChecker) GetChangeNotificationStream(ctx context.Context,
case <-ctx.Done():
return nil
case <-time.After(time.Duration(c.Interval) * time.Millisecond):
r, err := http.Get(c.Endpoints[0] + c.Key)
r, err := http.Get(c.Endpoints[0] + c.TriggerKey)
if err != nil {
log.Printf("patroni REST API error: %s", err)
c.Logger.Sugar().Error("patroni REST API error:", err)
continue
}
out <- strconv.Itoa(r.StatusCode) == c.Nodename
out <- strconv.Itoa(r.StatusCode) == c.TriggerValue
}
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
go.etcd.io/etcd/client/v3 v3.5.17
go.uber.org/zap v1.27.0
golang.org/x/sys v0.27.0
)

Expand Down Expand Up @@ -48,7 +49,6 @@ require (
go.etcd.io/etcd/api/v3 v3.5.17 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.17 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
29 changes: 14 additions & 15 deletions ipmanager/basicConfigurer_linux.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ipmanager

import (
"log"
"net"
"os/exec"
"time"
Expand All @@ -22,11 +21,11 @@ var (
func (c *BasicConfigurer) configureAddress() bool {
if c.arpClient == nil {
if err := c.createArpClient(); err != nil {
log.Printf("Couldn't create an Arp client: %s", err)
log.Error("Couldn't create an Arp client:", err)
}
}

log.Printf("Configuring address %s on %s", c.getCIDR(), c.Iface.Name)
log.Infof("Configuring address %s on %s", c.getCIDR(), c.Iface.Name)

result := c.runAddressConfiguration("add")

Expand All @@ -42,7 +41,7 @@ func (c *BasicConfigurer) configureAddress() bool {

// deconfigureAddress drops virtual IP address
func (c *BasicConfigurer) deconfigureAddress() bool {
log.Printf("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
return c.runAddressConfiguration("delete")
}

Expand All @@ -54,12 +53,12 @@ func (c *BasicConfigurer) runAddressConfiguration(action string) bool {

switch err.(type) {
case *exec.ExitError:
log.Printf("Got error %s", output)
log.Infof("Got error %s", output)

return false
}
if err != nil {
log.Printf("Error running ip address %s %s on %s: %s",
log.Infof("Error running ip address %s %s on %s: %s",
action, c.VIP, c.Iface.Name, err)
return false
}
Expand All @@ -71,7 +70,7 @@ func (c *BasicConfigurer) createArpClient() (err error) {
if c.arpClient, err = arp.Dial(&c.Iface); err == nil {
return
}
log.Printf("Problems with producing the arp client: %s", err)
log.Infof("Problems with producing the arp client: %s", err)
time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond)
}
return
Expand All @@ -86,7 +85,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
* https://support.citrix.com/article/CTX112701
*/
if c.arpClient == nil {
log.Println("No arp client available, skip send gratuitous ARP")
log.Info("No arp client available, skip send gratuitous ARP")
return nil
}
gratuitousReplyPackage, err := arp.NewPacket(
Expand All @@ -97,7 +96,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
c.VIP,
)
if err != nil {
log.Printf("Gratuitous arp reply package is malformed: %s", err)
log.Infof("Gratuitous arp reply package is malformed: %s", err)
return err
}

Expand All @@ -121,23 +120,23 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
c.VIP,
)
if err != nil {
log.Printf("Gratuitous arp request package is malformed: %s", err)
log.Infof("Gratuitous arp request package is malformed: %s", err)
return err
}

for i := 0; i < c.RetryNum; i++ {
errReply := c.arpClient.WriteTo(gratuitousReplyPackage, ethernetBroadcast)
if err != nil {
log.Printf("Couldn't write to the arpClient: %s", errReply)
log.Error("Couldn't write to the arpClient:", errReply)
} else {
log.Println("Sent gratuitous ARP reply")
log.Info("Sent gratuitous ARP reply")
}

errRequest := c.arpClient.WriteTo(gratuitousRequestPackage, ethernetBroadcast)
if err != nil {
log.Printf("Couldn't write to the arpClient: %s", errRequest)
log.Error("Couldn't write to the arpClient:", errRequest)
} else {
log.Println("Sent gratuitous ARP request")
log.Info("Sent gratuitous ARP request")
}

if errReply != nil || errRequest != nil {
Expand All @@ -152,7 +151,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond)
}
if err != nil {
log.Print("too many retries")
log.Error("too many retries", err)
return err
}

Expand Down
11 changes: 5 additions & 6 deletions ipmanager/basicConfigurer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@ package ipmanager

import (
"encoding/binary"
"log"
"net"

"github.com/cybertec-postgresql/vip-manager/iphlpapi"
)

// configureAddress assigns virtual IP address
func (c *BasicConfigurer) configureAddress() bool {
log.Printf("Configuring address %s on %s", c.getCIDR(), c.Iface.Name)
log.Infof("Configuring address %s on %s", c.getCIDR(), c.Iface.Name)
var (
ip = binary.LittleEndian.Uint32(c.VIP.AsSlice())
mask = binary.LittleEndian.Uint32(c.Netmask)
nteinstance uint32
)
iface, err := net.InterfaceByName(c.Iface.Name)
if err != nil {
log.Printf("Got error: %v", err)
log.Infof("Got error: %v", err)
return false
}
err = iphlpapi.AddIPAddress(ip, mask, uint32(iface.Index), &c.ntecontext, &nteinstance)
if err != nil {
log.Printf("Got error: %v", err)
log.Infof("Got error: %v", err)
return false
}
// For now it is save to say that also working even if a
Expand All @@ -35,10 +34,10 @@ func (c *BasicConfigurer) configureAddress() bool {

// deconfigureAddress drops virtual IP address
func (c *BasicConfigurer) deconfigureAddress() bool {
log.Printf("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
err := iphlpapi.DeleteIPAddress(c.ntecontext)
if err != nil {
log.Printf("Got error: %v", err)
log.Error(err)
return false
}
return true
Expand Down
Loading

0 comments on commit 27e4008

Please sign in to comment.