Skip to content

Commit 685f943

Browse files
committed
Fix backoff
1 parent 764edfa commit 685f943

File tree

1 file changed

+77
-54
lines changed

1 file changed

+77
-54
lines changed

chain/chain.go

Lines changed: 77 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,13 @@ type BridgeChain struct {
2020
DialerFunc func(dialer bridge.Dialer) bridge.Dialer
2121
proto map[string]bridge.Bridger
2222
defaultProto bridge.Bridger
23-
24-
backoffCount map[string]uint64
25-
mutex sync.Mutex
2623
}
2724

2825
// NewBridgeChain create a new BridgeChain.
2926
func NewBridgeChain() *BridgeChain {
3027
return &BridgeChain{
31-
proto: map[string]bridge.Bridger{},
32-
DialerFunc: NewEnvDialer,
33-
backoffCount: map[string]uint64{},
28+
proto: map[string]bridge.Bridger{},
29+
DialerFunc: NewEnvDialer,
3430
}
3531
}
3632

@@ -40,10 +36,8 @@ func (b *BridgeChain) BridgeChain(ctx context.Context, dialer bridge.Dialer, add
4036
return dialer, nil
4137
}
4238
address := addresses[len(addresses)-1]
43-
d, err := b.multiDial(dialer, strings.Split(address, "|"))
44-
if err != nil {
45-
return nil, err
46-
}
39+
d := b.multiDial(dialer, strings.Split(address, "|"))
40+
4741
addresses = addresses[:len(addresses)-1]
4842
if len(addresses) == 0 {
4943
return d, nil
@@ -70,24 +64,17 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
7064
return dialer, nil
7165
}
7266
address := addresses[len(addresses)-1]
73-
d, err := b.multiDial(dialer, address.LB)
74-
if err != nil {
75-
return nil, err
76-
}
67+
d := b.multiDial(dialer, address.LB)
68+
7769
addresses = addresses[:len(addresses)-1]
7870
if len(addresses) == 0 {
7971
return d, nil
8072
}
8173
return b.bridgeChainWithConfig(ctx, d, addresses...)
8274
}
8375

84-
func (b *BridgeChain) multiDial(dialer bridge.Dialer, addresses []string) (bridge.Dialer, error) {
85-
useCount := &backoffManager{
86-
addresses: addresses,
87-
dialer: dialer,
88-
bc: b,
89-
}
90-
return useCount, nil
76+
func (b *BridgeChain) multiDial(dialer bridge.Dialer, addresses []string) bridge.Dialer {
77+
return newBackoffManager(dialer, b.singleDial, addresses)
9178
}
9279

9380
func (b *BridgeChain) singleDial(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
@@ -118,60 +105,96 @@ func (b *BridgeChain) RegisterDefault(bridger bridge.Bridger) {
118105

119106
type backoffManager struct {
120107
addresses []string
121-
dialer bridge.Dialer
108+
dialers []bridge.Dialer
109+
110+
baseDialer bridge.Dialer
111+
112+
bridgeFunc bridge.BridgeFunc
122113

123-
bc *BridgeChain
114+
backoffCount map[int]uint64
115+
116+
mut sync.Mutex
124117
}
125118

126-
func (u *backoffManager) useLeastAndCount(addresses []string) string {
127-
if len(addresses) == 1 {
128-
return addresses[0]
119+
func newBackoffManager(baseDialer bridge.Dialer, bridgeFunc bridge.BridgeFunc, addresses []string) *backoffManager {
120+
return &backoffManager{
121+
addresses: addresses,
122+
dialers: make([]bridge.Dialer, len(addresses)),
123+
baseDialer: baseDialer,
124+
bridgeFunc: bridgeFunc,
125+
backoffCount: map[int]uint64{},
129126
}
127+
}
128+
129+
func (u *backoffManager) useLeastIndex() int {
130130
min := uint64(math.MaxUint64)
131131

132-
u.bc.mutex.Lock()
133-
defer u.bc.mutex.Unlock()
132+
var index int
133+
for i := range u.addresses {
134+
if u.backoffCount[i] < min {
135+
min = u.backoffCount[i]
136+
index = i
137+
}
138+
}
139+
140+
u.backoffCount[index]++
134141

135-
var minAddress string
136-
for _, address := range addresses {
137-
if u.bc.backoffCount[address] < min {
138-
min = u.bc.backoffCount[address]
139-
minAddress = address
142+
if min > math.MaxInt32 {
143+
for i := range u.backoffCount {
144+
u.backoffCount[i] -= math.MaxInt32
140145
}
141146
}
142-
u.bc.backoffCount[minAddress]++
143-
return minAddress
147+
return index
144148
}
145149

146-
func (u *backoffManager) backoff(address string, count uint64) {
147-
u.bc.mutex.Lock()
148-
defer u.bc.mutex.Unlock()
149-
u.bc.backoffCount[address] += count
150+
func (u *backoffManager) backoff(index int, count uint64) {
151+
u.mut.Lock()
152+
defer u.mut.Unlock()
153+
u.backoffCount[index] += count
150154
}
151155

152-
func (u *backoffManager) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
153-
var errs []error
156+
func (u *backoffManager) dialContext(ctx context.Context, network, address string) (net.Conn, error) {
157+
u.mut.Lock()
158+
index := u.useLeastIndex()
159+
addr := u.addresses[index]
160+
dialer := u.dialers[index]
161+
u.mut.Unlock()
154162

155-
tryTimes := len(u.addresses)
156-
for i := 0; i < tryTimes; i++ {
157-
addr := u.useLeastAndCount(u.addresses)
158-
dialer, err := u.bc.singleDial(ctx, u.dialer, addr)
163+
if dialer == nil {
164+
d, err := u.bridgeFunc(ctx, u.baseDialer, addr)
159165
if err != nil {
160-
errs = append(errs, err)
161166
logger.Std.Warn("failed dial", "err", err, "previous", addr)
162-
u.backoff(addr, 16)
163-
continue
167+
u.backoff(index, 16)
168+
return nil, err
164169
}
165-
conn, err := dialer.DialContext(ctx, network, address)
170+
dialer = d
171+
172+
u.mut.Lock()
173+
u.dialers[index] = d
174+
u.mut.Unlock()
175+
}
176+
177+
conn, err := dialer.DialContext(ctx, network, address)
178+
if err != nil {
179+
logger.Std.Warn("failed dial target", "err", err, "previous", addr, "target", address)
180+
u.backoff(index, 8)
181+
return nil, err
182+
}
183+
184+
logger.Std.Info("success dial target", "previous", addr, "target", address)
185+
return conn, nil
186+
}
187+
188+
func (u *backoffManager) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
189+
var errs []error
190+
tryTimes := len(u.addresses)/2 + 1
191+
for i := 0; i < tryTimes; i++ {
192+
conn, err := u.dialContext(ctx, network, address)
166193
if err != nil {
167194
errs = append(errs, err)
168-
logger.Std.Warn("failed dial target", "err", err, "previous", addr, "target", address)
169-
u.backoff(addr, 8)
170195
continue
171196
}
172-
173-
logger.Std.Info("success dial target", "previous", addr, "target", address)
174197
return conn, nil
175198
}
176-
return nil, fmt.Errorf("all addresses are failed: %w", errors.Join(errs...))
199+
return nil, errors.Join(errs...)
177200
}

0 commit comments

Comments
 (0)