Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[-] rewrite IPManager with channel logic, fixes #282 #283

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions checker/etcd_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,28 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) {
resp, err := elc.Get(ctx, elc.TriggerKey)
if err != nil {
elc.Logger.Error("etcd error:", zap.Error(err))
elc.Logger.Error("Failed to get etcd value:", zap.Error(err))
out <- false
return
}
for _, kv := range resp.Kvs {
elc.Logger.Sugar().Info("current leader from DCS:", string(kv.Value))
elc.Logger.Sugar().Info("Current leader from DCS:", string(kv.Value))
out <- string(kv.Value) == elc.TriggerValue
}
}

// watch monitors the leader change from etcd
func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error {
elc.Logger.Sugar().Info("Setting WATCH on ", elc.TriggerKey)
watchChan := elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("set WATCH on ", elc.TriggerKey)
for {
select {
case <-ctx.Done():
return ctx.Err()
case watchResp := <-watchChan:
if watchResp.Canceled {
watchChan = elc.Watch(ctx, elc.TriggerKey)
elc.Logger.Sugar().Info("reset cancelled WATCH on ", elc.TriggerKey)
elc.Logger.Sugar().Info("Resetting cancelled WATCH on ", elc.TriggerKey)
continue
}
if err := watchResp.Err(); err != nil {
Expand All @@ -104,7 +104,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
}
for _, event := range watchResp.Events {
out <- string(event.Kv.Value) == elc.TriggerValue
elc.Logger.Sugar().Info("current leader from DCS:", string(event.Kv.Value))
elc.Logger.Sugar().Info("Current leader from DCS: ", string(event.Kv.Value))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ipmanager/basicConfigurer_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond)
}
if err != nil {
log.Error("too many retries", err)
log.Error("Too many retries", err)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions ipmanager/basicConfigurer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func (c *BasicConfigurer) configureAddress() bool {
)
iface, err := net.InterfaceByName(c.Iface.Name)
if err != nil {
log.Infof("Got error: %v", err)
log.Error("Failed to access interface: ", err)
return false
}
err = iphlpapi.AddIPAddress(ip, mask, uint32(iface.Index), &c.ntecontext, &nteinstance)
if err != nil {
log.Infof("Got error: %v", err)
log.Error("Failed to add address: ", err)
return false
}
// For now it is safe to say that also working even if a
Expand All @@ -37,7 +37,7 @@ func (c *BasicConfigurer) deconfigureAddress() bool {
log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
err := iphlpapi.DeleteIPAddress(c.ntecontext)
if err != nil {
log.Error(err)
log.Errorf("Failed to remove address %s: %v", c.getCIDR(), err)
return false
}
return true
Expand Down
85 changes: 29 additions & 56 deletions ipmanager/ip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"net"
"net/netip"
"sync"
"sync/atomic"
"time"

"github.com/cybertec-postgresql/vip-manager/vipconfig"
Expand All @@ -25,10 +25,9 @@ var log *zap.SugaredLogger = zap.L().Sugar()
type IPManager struct {
configurer ipConfigurer

states <-chan bool
currentState bool
stateLock sync.Mutex
recheck *sync.Cond
states <-chan bool
shouldSetIPUp atomic.Bool
recheckChan chan struct{}
}

func getMask(vip netip.Addr, mask int) net.IPMask {
Expand Down Expand Up @@ -69,7 +68,7 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
states: states,
}
log = conf.Logger.Sugar()
m.recheck = sync.NewCond(&m.stateLock)
m.recheckChan = make(chan struct{})
switch conf.HostingType {
case "hetzner":
m.configurer, err = newHetznerConfigurer(ipConf, conf.Verbose)
Expand All @@ -86,71 +85,45 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err

func (m *IPManager) applyLoop(ctx context.Context) {
strUpDown := map[bool]string{true: "up", false: "down"}
timeout := 0
for {
// Check if we should exit
isIPUp := m.configurer.queryAddress()
shouldSetIPUp := m.shouldSetIPUp.Load()
log.Infof("IP address %s is %s, must be %s",
m.configurer.getCIDR(),
strUpDown[isIPUp],
strUpDown[shouldSetIPUp])
if isIPUp != shouldSetIPUp {
var isOk bool
if shouldSetIPUp {
isOk = m.configurer.configureAddress()
} else {
isOk = m.configurer.deconfigureAddress()
}
if !isOk {
log.Error("Failed to configure virtual ip for this machine")
}
}
select {
case <-ctx.Done():
m.configurer.deconfigureAddress()
return
case <-time.After(time.Duration(timeout) * time.Second):
actualState := m.configurer.queryAddress()
m.stateLock.Lock()
desiredState := m.currentState
log.Infof("IP address %s state is %s, must be %s",
m.configurer.getCIDR(),
strUpDown[actualState],
strUpDown[desiredState])
if actualState != desiredState {
m.stateLock.Unlock()
var configureState bool
if desiredState {
configureState = m.configurer.configureAddress()
} else {
configureState = m.configurer.deconfigureAddress()
}
if !configureState {
log.Error("Error while acquiring virtual ip for this machine")
//Sleep a little bit to avoid busy waiting due to the for loop.
timeout = 10
} else {
timeout = 0
}
} else {
// Wait for notification
m.recheck.Wait()
// Want to query actual state anyway, so unlock
m.stateLock.Unlock()
}
case <-m.recheckChan: // signal to recheck
case <-time.After(time.Duration(10) * time.Second): // recheck every 10 seconds
}
}
}

// SyncStates implements states synchronization
func (m *IPManager) SyncStates(ctx context.Context, states <-chan bool) {
ticker := time.NewTicker(10 * time.Second)

var wg sync.WaitGroup
wg.Add(1)
go func() {
m.applyLoop(ctx)
wg.Done()
}()

go m.applyLoop(ctx)
for {
select {
case newState := <-states:
m.stateLock.Lock()
if m.currentState != newState {
m.currentState = newState
m.recheck.Broadcast()
if m.shouldSetIPUp.Load() != newState {
m.shouldSetIPUp.Store(newState)
m.recheckChan <- struct{}{}
}
m.stateLock.Unlock()
case <-ticker.C:
m.recheck.Broadcast()
case <-ctx.Done():
m.recheck.Broadcast()
wg.Wait()
m.configurer.deconfigureAddress()
m.configurer.cleanupArp()
return
}
Expand Down
Loading