Skip to content

Commit 146a7d3

Browse files
committed
Refactor the polling and log
1 parent 56d5d02 commit 146a7d3

File tree

9 files changed

+96
-51
lines changed

9 files changed

+96
-51
lines changed

chain/bridge.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ func (b *Bridge) bridgeStream(ctx context.Context, listenConfig bridge.ListenCon
181181
raw = idle.NewIdleConn(raw, idleTimeout)
182182
}
183183
backoff = time.Second / 10
184-
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
185184
go b.stepIgnoreErr(ctx, dialer, raw, dials)
186185
}
187186
}(i, l)
@@ -289,7 +288,6 @@ func (b *Bridge) bridgeProxy(ctx context.Context, listenConfig bridge.ListenConf
289288
raw = idle.NewIdleConn(raw, idleTimeout)
290289
}
291290
backoff = time.Second / 10
292-
b.logger.Info("Connect", "remote_address", raw.RemoteAddr().String())
293291
go h.ServeConn(raw)
294292
}
295293
}(i, host)

chain/chain.go

Lines changed: 80 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,35 @@ package chain
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
6-
"math/rand"
7+
"math"
8+
"net"
79
"strings"
10+
"sync"
811

912
"github.com/wzshiming/bridge"
1013
"github.com/wzshiming/bridge/config"
1114
"github.com/wzshiming/bridge/internal/scheme"
12-
"github.com/wzshiming/schedialer"
13-
"github.com/wzshiming/schedialer/plugins/probe"
14-
"github.com/wzshiming/schedialer/plugins/roundrobin"
15+
"github.com/wzshiming/bridge/logger"
1516
)
1617

1718
// BridgeChain is a bridger that supports multiple crossing of bridger.
1819
type BridgeChain struct {
1920
DialerFunc func(dialer bridge.Dialer) bridge.Dialer
2021
proto map[string]bridge.Bridger
2122
defaultProto bridge.Bridger
23+
24+
backoffCount map[string]uint64
25+
mutex sync.Mutex
2226
}
2327

2428
// NewBridgeChain create a new BridgeChain.
2529
func NewBridgeChain() *BridgeChain {
2630
return &BridgeChain{
27-
proto: map[string]bridge.Bridger{},
28-
DialerFunc: NewEnvDialer,
31+
proto: map[string]bridge.Bridger{},
32+
DialerFunc: NewEnvDialer,
33+
backoffCount: map[string]uint64{},
2934
}
3035
}
3136

@@ -35,7 +40,7 @@ func (b *BridgeChain) BridgeChain(ctx context.Context, dialer bridge.Dialer, add
3540
return dialer, nil
3641
}
3742
address := addresses[len(addresses)-1]
38-
d, err := b.Dial(ctx, dialer, strings.Split(address, "|"), "")
43+
d, err := b.multiDial(dialer, strings.Split(address, "|"))
3944
if err != nil {
4045
return nil, err
4146
}
@@ -65,7 +70,7 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
6570
return dialer, nil
6671
}
6772
address := addresses[len(addresses)-1]
68-
d, err := b.Dial(ctx, dialer, address.LB, address.Probe)
73+
d, err := b.multiDial(dialer, address.LB)
6974
if err != nil {
7075
return nil, err
7176
}
@@ -76,32 +81,16 @@ func (b *BridgeChain) bridgeChainWithConfig(ctx context.Context, dialer bridge.D
7681
return b.bridgeChainWithConfig(ctx, d, addresses...)
7782
}
7883

79-
func (b *BridgeChain) Dial(ctx context.Context, dialer bridge.Dialer, addresses []string, probeUrl string) (bridge.Dialer, error) {
80-
if len(addresses) == 1 {
81-
return b.dialOne(ctx, dialer, addresses[0])
82-
}
83-
plugins := []schedialer.Plugin{
84-
roundrobin.NewRoundRobinWithIndex(100, rand.Uint64()%uint64(len(addresses))),
85-
}
86-
if probeUrl != "" {
87-
plugins = append(plugins, probe.NewProbe(100, probeUrl))
88-
}
89-
plugin := schedialer.NewPlugins(plugins...)
90-
for _, address := range addresses {
91-
dial, err := b.dialOne(ctx, dialer, address)
92-
if err != nil {
93-
return nil, err
94-
}
95-
proxy := schedialer.Proxy{
96-
Name: address,
97-
Dialer: dial,
98-
}
99-
plugin.AddProxy(ctx, &proxy)
84+
func (b *BridgeChain) multiDial(dialer bridge.Dialer, addresses []string) (bridge.Dialer, error) {
85+
useCount := &backoffManager{
86+
addresses: addresses,
87+
dialer: dialer,
88+
bc: b,
10089
}
101-
return schedialer.NewSchedialer(plugin), nil
90+
return useCount, nil
10291
}
10392

104-
func (b *BridgeChain) dialOne(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
93+
func (b *BridgeChain) singleDial(ctx context.Context, dialer bridge.Dialer, address string) (bridge.Dialer, error) {
10594
sch, _, ok := scheme.SplitSchemeAddr(address)
10695
if !ok {
10796
return nil, fmt.Errorf("unsupported protocol format %q", address)
@@ -126,3 +115,63 @@ func (b *BridgeChain) Register(name string, bridger bridge.Bridger) error {
126115
func (b *BridgeChain) RegisterDefault(bridger bridge.Bridger) {
127116
b.defaultProto = bridger
128117
}
118+
119+
type backoffManager struct {
120+
addresses []string
121+
dialer bridge.Dialer
122+
123+
bc *BridgeChain
124+
}
125+
126+
func (u *backoffManager) useLeastAndCount(addresses []string) string {
127+
if len(addresses) == 1 {
128+
return addresses[0]
129+
}
130+
min := uint64(math.MaxUint64)
131+
132+
u.bc.mutex.Lock()
133+
defer u.bc.mutex.Unlock()
134+
135+
var minAddress string
136+
for _, address := range addresses {
137+
if u.bc.backoffCount[address] < min {
138+
min = u.bc.backoffCount[address]
139+
minAddress = address
140+
}
141+
}
142+
u.bc.backoffCount[minAddress]++
143+
return minAddress
144+
}
145+
146+
func (u *backoffManager) backoff(address string, count uint64) {
147+
u.bc.mutex.Lock()
148+
defer u.bc.mutex.Unlock()
149+
u.bc.backoffCount[address] += count
150+
}
151+
152+
func (u *backoffManager) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
153+
var errs []error
154+
155+
tryTimes := len(u.addresses)
156+
for i := 0; i < tryTimes; i++ {
157+
addr := u.useLeastAndCount(u.addresses)
158+
dialer, err := u.bc.singleDial(ctx, u.dialer, addr)
159+
if err != nil {
160+
errs = append(errs, err)
161+
logger.Std.Warn("failed dial", "err", err, "previous", addr)
162+
u.backoff(addr, 16)
163+
continue
164+
}
165+
conn, err := dialer.DialContext(ctx, network, address)
166+
if err != nil {
167+
errs = append(errs, err)
168+
logger.Std.Warn("failed dial target", "err", err, "previous", addr, "target", address)
169+
u.backoff(addr, 8)
170+
continue
171+
}
172+
173+
logger.Std.Info("success dial target", "previous", addr, "target", address)
174+
return conn, nil
175+
}
176+
return nil, fmt.Errorf("all addresses are failed: %w", errors.Join(errs...))
177+
}

cmd/bridge/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,14 @@ func main() {
9898
tasks, err = config.LoadConfig(configs...)
9999
if err != nil {
100100
printDefaults()
101-
slog.Error("LoadConfig", "err", err)
101+
logger.Std.Error("LoadConfig", "err", err)
102102
return
103103
}
104104
} else {
105105
tasks, err = config.LoadConfigWithArgs(listens, dials)
106106
if err != nil {
107107
printDefaults()
108-
slog.Error("LoadConfigWithArgs", "err", err)
108+
logger.Std.Error("LoadConfigWithArgs", "err", err)
109109
return
110110
}
111111
}
@@ -136,7 +136,6 @@ func run(ctx context.Context, log *slog.Logger, tasks []config.Chain) {
136136
}
137137
go func(task config.Chain) {
138138
defer wg.Done()
139-
log := log.With("chains", task)
140139
log.Info(chain.ShowChainWithConfig(task))
141140
b := chain.NewBridge(log, dump)
142141
err := b.BridgeWithConfig(ctx, task)

cmd/bridge/main_other.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func runWithReload(ctx context.Context, log *slog.Logger, tasks []config.Chain,
6363
wg.Add(1)
6464
go func(ctx context.Context, task config.Chain) {
6565
defer wg.Done()
66-
log := log.With("chains", task)
6766
log.Info(chain.ShowChainWithConfig(task))
6867
for ctx.Err() == nil {
6968
b := chain.NewBridge(log, dump)

config/config.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ package config
33
import (
44
"encoding/json"
55
"fmt"
6-
"log"
76
"os"
87
"strconv"
98
"strings"
109
"time"
1110

1211
"github.com/wzshiming/bridge/internal/scheme"
12+
"github.com/wzshiming/bridge/logger"
1313
)
1414

1515
func LoadConfigWithArgs(listens []string, dials []string) ([]Chain, error) {
@@ -55,7 +55,7 @@ func LoadConfig(configs ...string) ([]Chain, error) {
5555
for _, confPath := range configs {
5656
data, err := os.ReadFile(confPath)
5757
if err != nil {
58-
log.Println(err)
58+
logger.Std.Error("LoadConfig", "err", err, "path", confPath)
5959
continue
6060
}
6161
conf := Config{}
@@ -100,8 +100,7 @@ func (c Chain) Unique() string {
100100
}
101101

102102
type Node struct {
103-
Probe string `json:"probe"`
104-
LB []string `json:"lb"`
103+
LB []string `json:"lb"`
105104
}
106105

107106
func (m Node) MarshalJSON() ([]byte, error) {

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ require (
1414
github.com/wzshiming/httpproxy v0.5.6
1515
github.com/wzshiming/notify v0.1.1
1616
github.com/wzshiming/permuteproxy v0.0.2
17-
github.com/wzshiming/schedialer v0.6.1
1817
github.com/wzshiming/shadowsocks v0.4.1
1918
github.com/wzshiming/socks4 v0.3.2
2019
github.com/wzshiming/socks5 v0.5.1

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ github.com/wzshiming/notify v0.1.1 h1:rJXoszrkNglhCVyn/IfW500f5cW03q1q7YzL8hsLch
2626
github.com/wzshiming/notify v0.1.1/go.mod h1:SFhsQKZJznzsDcj/Qfo9A65k5IRcpUrpgbLRzZEa/DI=
2727
github.com/wzshiming/permuteproxy v0.0.2 h1:svedMueotlxJk9oJfA0gs8WzRYOdgd0DER9XvKpjwlY=
2828
github.com/wzshiming/permuteproxy v0.0.2/go.mod h1:Ny08A1JbuljB8FeJAOiB7dfvRGCVD8PB9hwrALIvYI8=
29-
github.com/wzshiming/schedialer v0.6.1 h1:4VwtIjVF3uMoWqjbyw3oqYi7WGOEYvDu3L9OYT8sbGY=
30-
github.com/wzshiming/schedialer v0.6.1/go.mod h1:TvVxg4QZIBTJzRfmL/G7g6CzynFQKPmtXtSeJ2c4Lus=
3129
github.com/wzshiming/shadowsocks v0.4.1 h1:tyLYtLSQs90jpMIkD+T8KuZH5foXwOH0ZjxSOb45orI=
3230
github.com/wzshiming/shadowsocks v0.4.1/go.mod h1:CfKm/Keclli2sPGfjskGVH+F3gpF0YPVdcf5t4krypY=
3331
github.com/wzshiming/socks4 v0.3.2 h1:w87nwfgRWteVwIH39nqTur8c+2dcODeLgLrWspcUkSc=

logger/logger.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package logger
22

33
import (
4+
"context"
45
"fmt"
56
"log/slog"
7+
"os"
68
)
79

8-
var Std = slog.Default()
10+
var Std = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
11+
Level: slog.LevelInfo,
12+
}))
913

1014
func Wrap(logger *slog.Logger, name string) *wrap {
1115
return &wrap{
@@ -18,5 +22,5 @@ type wrap struct {
1822
}
1923

2024
func (w wrap) Println(v ...interface{}) {
21-
w.Logger.Info(fmt.Sprintln(v...))
25+
w.Logger.Log(context.Background(), slog.LevelWarn, "print", "message", fmt.Sprint(v...))
2226
}

protocols/local/local.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package local
22

33
import (
44
"context"
5-
"log/slog"
65
"net"
76
"os"
87
"strings"
98

109
"github.com/wzshiming/bridge"
1110
"github.com/wzshiming/bridge/internal/netutils"
11+
"github.com/wzshiming/bridge/logger"
1212
"github.com/wzshiming/commandproxy"
1313
)
1414

@@ -25,17 +25,17 @@ type Local struct {
2525
}
2626

2727
func (l *Local) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
28-
slog.Debug("Dial", "network", network, "address", address)
28+
logger.Std.Debug("Dial", "network", network, "address", address)
2929
return l.Dialer.DialContext(ctx, network, address)
3030
}
3131

3232
func (l *Local) Listen(ctx context.Context, network, address string) (net.Listener, error) {
33-
slog.Debug("Listen", "network", network, "address", address)
33+
logger.Std.Debug("Listen", "network", network, "address", address)
3434
return l.ListenConfig.Listen(ctx, network, address)
3535
}
3636

3737
func (l *Local) CommandDialContext(ctx context.Context, name string, args ...string) (net.Conn, error) {
38-
slog.Debug("CommandDial", "name", name, "args", args)
38+
logger.Std.Debug("CommandDial", "name", name, "args", args)
3939
proxy := commandproxy.ProxyCommand(ctx, name, args...)
4040
proxy.Stderr = os.Stderr
4141
conn, err := proxy.Stdio()
@@ -48,7 +48,7 @@ func (l *Local) CommandDialContext(ctx context.Context, name string, args ...str
4848
}
4949

5050
func (l *Local) CommandListen(ctx context.Context, name string, args ...string) (net.Listener, error) {
51-
slog.Debug("CommandListen", "name", name, "args", args)
51+
logger.Std.Debug("CommandListen", "name", name, "args", args)
5252
proxy := append([]string{name}, args...)
5353
remoteAddr := netutils.NewNetAddr("cmd", strings.Join(proxy, " "))
5454
return netutils.NewCommandListener(ctx, l, l.LocalAddr, remoteAddr, proxy)

0 commit comments

Comments
 (0)