Skip to content

Commit ccaf148

Browse files
committed
[-] rewrite IPManager with channel logic, fixes #282
1 parent bf9fece commit ccaf148

File tree

4 files changed

+36
-63
lines changed

4 files changed

+36
-63
lines changed

checker/etcd_leader_checker.go

+5-5
Original file line number | Diff line number | Diff line change
@@ -74,28 +74,28 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
7474
func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) {
7575
resp, err := elc.Get(ctx, elc.TriggerKey)
7676
if err != nil {
77-
elc.Logger.Error("etcd error:", zap.Error(err))
77+
elc.Logger.Error("Failed to get etcd value:", zap.Error(err))
7878
out <- false
7979
return
8080
}
8181
for _, kv := range resp.Kvs {
82-
elc.Logger.Sugar().Info("current leader from DCS:", string(kv.Value))
82+
elc.Logger.Sugar().Info("Current leader from DCS:", string(kv.Value))
8383
out <- string(kv.Value) == elc.TriggerValue
8484
}
8585
}
8686

8787
// watch monitors the leader change from etcd
8888
func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error {
89+
elc.Logger.Sugar().Info("Setting WATCH on ", elc.TriggerKey)
8990
watchChan := elc.Watch(ctx, elc.TriggerKey)
90-
elc.Logger.Sugar().Info("set WATCH on ", elc.TriggerKey)
9191
for {
9292
select {
9393
case <-ctx.Done():
9494
return ctx.Err()
9595
case watchResp := <-watchChan:
9696
if watchResp.Canceled {
9797
watchChan = elc.Watch(ctx, elc.TriggerKey)
98-
elc.Logger.Sugar().Info("reset cancelled WATCH on ", elc.TriggerKey)
98+
elc.Logger.Sugar().Info("Resetting cancelled WATCH on ", elc.TriggerKey)
9999
continue
100100
}
101101
if err := watchResp.Err(); err != nil {
@@ -104,7 +104,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
104104
}
105105
for _, event := range watchResp.Events {
106106
out <- string(event.Kv.Value) == elc.TriggerValue
107-
elc.Logger.Sugar().Info("current leader from DCS:", string(event.Kv.Value))
107+
elc.Logger.Sugar().Info("Current leader from DCS: ", string(event.Kv.Value))
108108
}
109109
}
110110
}

ipmanager/basicConfigurer_linux.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ func (c *BasicConfigurer) arpSendGratuitous() error {
151151
time.Sleep(time.Duration(c.RetryAfter) * time.Millisecond)
152152
}
153153
if err != nil {
154-
log.Error("too many retries", err)
154+
log.Error("Too many retries", err)
155155
return err
156156
}
157157

ipmanager/basicConfigurer_windows.go

+3-3
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,12 @@ func (c *BasicConfigurer) configureAddress() bool {
1717
)
1818
iface, err := net.InterfaceByName(c.Iface.Name)
1919
if err != nil {
20-
log.Infof("Got error: %v", err)
20+
log.Error("Failed to access interface: ", err)
2121
return false
2222
}
2323
err = iphlpapi.AddIPAddress(ip, mask, uint32(iface.Index), &c.ntecontext, &nteinstance)
2424
if err != nil {
25-
log.Infof("Got error: %v", err)
25+
log.Error("Failed to add address: ", err)
2626
return false
2727
}
2828
// For now it is safe to say that also working even if a
@@ -37,7 +37,7 @@ func (c *BasicConfigurer) deconfigureAddress() bool {
3737
log.Infof("Removing address %s on %s", c.getCIDR(), c.Iface.Name)
3838
err := iphlpapi.DeleteIPAddress(c.ntecontext)
3939
if err != nil {
40-
log.Error(err)
40+
log.Errorf("Failed to remove address %s: %v", c.getCIDR(), err)
4141
return false
4242
}
4343
return true

ipmanager/ip_manager.go

+27-54
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"net"
66
"net/netip"
7-
"sync"
7+
"sync/atomic"
88
"time"
99

1010
"github.com/cybertec-postgresql/vip-manager/vipconfig"
@@ -25,10 +25,9 @@ var log *zap.SugaredLogger = zap.L().Sugar()
2525
type IPManager struct {
2626
configurer ipConfigurer
2727

28-
states <-chan bool
29-
currentState bool
30-
stateLock sync.Mutex
31-
recheck *sync.Cond
28+
states <-chan bool
29+
shouldSetIPUp atomic.Bool
30+
recheckChan <-chan struct{}
3231
}
3332

3433
func getMask(vip netip.Addr, mask int) net.IPMask {
@@ -69,7 +68,7 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
6968
states: states,
7069
}
7170
log = conf.Logger.Sugar()
72-
m.recheck = sync.NewCond(&m.stateLock)
71+
m.recheckChan = make(chan struct{})
7372
switch conf.HostingType {
7473
case "hetzner":
7574
m.configurer, err = newHetznerConfigurer(ipConf, conf.Verbose)
@@ -86,71 +85,45 @@ func NewIPManager(conf *vipconfig.Config, states <-chan bool) (m *IPManager, err
8685

8786
func (m *IPManager) applyLoop(ctx context.Context) {
8887
strUpDown := map[bool]string{true: "up", false: "down"}
89-
timeout := 0
9088
for {
91-
// Check if we should exit
9289
select {
9390
case <-ctx.Done():
94-
m.configurer.deconfigureAddress()
9591
return
96-
case <-time.After(time.Duration(timeout) * time.Second):
97-
actualState := m.configurer.queryAddress()
98-
m.stateLock.Lock()
99-
desiredState := m.currentState
100-
log.Infof("IP address %s state is %s, must be %s",
101-
m.configurer.getCIDR(),
102-
strUpDown[actualState],
103-
strUpDown[desiredState])
104-
if actualState != desiredState {
105-
m.stateLock.Unlock()
106-
var configureState bool
107-
if desiredState {
108-
configureState = m.configurer.configureAddress()
109-
} else {
110-
configureState = m.configurer.deconfigureAddress()
111-
}
112-
if !configureState {
113-
log.Error("Error while acquiring virtual ip for this machine")
114-
//Sleep a little bit to avoid busy waiting due to the for loop.
115-
timeout = 10
116-
} else {
117-
timeout = 0
118-
}
92+
case <-m.recheckChan: // signal to recheck
93+
case <-time.After(time.Duration(10) * time.Second): // recheck every 10 seconds
94+
}
95+
isIPUp := m.configurer.queryAddress()
96+
shouldSetIPUp := m.shouldSetIPUp.Load()
97+
log.Infof("IP address %s is %s, must be %s",
98+
m.configurer.getCIDR(),
99+
strUpDown[isIPUp],
100+
strUpDown[shouldSetIPUp])
101+
if isIPUp != shouldSetIPUp {
102+
var isOk bool
103+
if shouldSetIPUp {
104+
isOk = m.configurer.configureAddress()
119105
} else {
120-
// Wait for notification
121-
m.recheck.Wait()
122-
// Want to query actual state anyway, so unlock
123-
m.stateLock.Unlock()
106+
isOk = m.configurer.deconfigureAddress()
107+
}
108+
if !isOk {
109+
log.Error("Failed to configure virtual ip for this machine")
124110
}
125111
}
126112
}
127113
}
128114

129115
// SyncStates implements states synchronization
130116
func (m *IPManager) SyncStates(ctx context.Context, states <-chan bool) {
131-
ticker := time.NewTicker(10 * time.Second)
132-
133-
var wg sync.WaitGroup
134-
wg.Add(1)
135-
go func() {
136-
m.applyLoop(ctx)
137-
wg.Done()
138-
}()
139-
117+
go m.applyLoop(ctx)
140118
for {
141119
select {
142120
case newState := <-states:
143-
m.stateLock.Lock()
144-
if m.currentState != newState {
145-
m.currentState = newState
146-
m.recheck.Broadcast()
121+
if m.shouldSetIPUp.Load() != newState {
122+
m.shouldSetIPUp.Store(newState)
123+
<-m.recheckChan
147124
}
148-
m.stateLock.Unlock()
149-
case <-ticker.C:
150-
m.recheck.Broadcast()
151125
case <-ctx.Done():
152-
m.recheck.Broadcast()
153-
wg.Wait()
126+
m.configurer.deconfigureAddress()
154127
m.configurer.cleanupArp()
155128
return
156129
}

0 commit comments

Comments
 (0)