Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc committed Aug 2, 2024
1 parent 6628384 commit a45755e
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 44 deletions.
6 changes: 3 additions & 3 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (builder *Builder) buildNodeInfoManager() error {
return errors.New("cannot find staking protocol")
}
chain := builder.cs.chain
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.NetworkProxy(CompatibleNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func() []string {
dm := nodeinfo.NewInfoManager(&builder.cfg.NodeInfo, cs.p2pAgent.Subnet(CompatibleNetwork), cs.chain, builder.cfg.Chain.ProducerPrivateKey(), func() []string {
ctx := protocol.WithFeatureCtx(
protocol.WithBlockCtx(
genesis.WithGenesisContext(context.Background(), chain.Genesis()),
Expand Down Expand Up @@ -515,7 +515,7 @@ func (builder *Builder) buildBlockSyncer() error {
return nil
}

p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork)
p2pAgent := builder.cs.p2pAgent.Subnet(CompatibleNetwork)
chain := builder.cs.chain
consens := builder.cs.consensus

Expand Down Expand Up @@ -702,7 +702,7 @@ func (builder *Builder) buildBlockTimeCalculator() (err error) {
}

func (builder *Builder) buildConsensusComponent() error {
p2pAgent := builder.cs.p2pAgent.NetworkProxy(CompatibleNetwork)
p2pAgent := builder.cs.p2pAgent.Subnet(CompatibleNetwork)
copts := []consensus.Option{
consensus.WithBroadcast(func(msg proto.Message) error {
return p2pAgent.BroadcastOutbound(context.Background(), msg)
Expand Down
2 changes: 1 addition & 1 deletion chainservice/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (cs *ChainService) NewAPIServer(cfg api.Config, plugins map[int]interface{}
if cfg.GRPCPort == 0 && cfg.HTTPPort == 0 {
return nil, nil
}
p2pAgent := cs.p2pAgent.NetworkProxy(CompatibleNetwork)
p2pAgent := cs.p2pAgent.Subnet(CompatibleNetwork)
apiServerOptions := []api.Option{
api.WithBroadcastOutbound(func(ctx context.Context, chainID uint32, msg proto.Message) error {
return p2pAgent.BroadcastOutbound(ctx, msg)
Expand Down
8 changes: 4 additions & 4 deletions e2etest/local_actpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func TestLocalActPool(t *testing.T) {
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {

},
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
p2p.JoinSubnet(chainservice.CompatibleNetwork),
).Subnet(chainservice.CompatibleNetwork)
require.NotNil(cli)
require.NoError(cli.Start(ctx))
fmt.Println("p2p agent started")
Expand Down Expand Up @@ -139,8 +139,8 @@ func TestPressureActPool(t *testing.T) {
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {

},
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
p2p.JoinSubnet(chainservice.CompatibleNetwork),
).Subnet(chainservice.CompatibleNetwork)
require.NotNil(cli)
require.NoError(cli.Start(ctx))

Expand Down
8 changes: 4 additions & 4 deletions e2etest/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func TestLocalCommit(t *testing.T) {
},
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {
},
p2p.JoinNetwork(chainservice.CompatibleNetwork),
).NetworkProxy(chainservice.CompatibleNetwork)
p2p.JoinSubnet(chainservice.CompatibleNetwork),
).Subnet(chainservice.CompatibleNetwork)
require.NotNil(p)
require.NoError(p.Start(ctx))
defer func() {
Expand Down Expand Up @@ -330,7 +330,7 @@ func TestLocalSync(t *testing.T) {
hash.ZeroHash256,
func(_ context.Context, _ uint32, _ string, msg proto.Message) {},
func(_ context.Context, _ uint32, _ peer.AddrInfo, _ proto.Message) {},
p2p.JoinNetwork(chainservice.CompatibleNetwork),
p2p.JoinSubnet(chainservice.CompatibleNetwork),
)
require.NoError(bootnode.Start(ctx))
addrs, err := bootnode.Self()
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestLocalSync(t *testing.T) {
}()

err = testutil.WaitUntil(time.Millisecond*100, time.Second*60, func() (bool, error) {
peers, err := svr.P2PAgent().NetworkProxy(chainservice.CompatibleNetwork).ConnectedPeers()
peers, err := svr.P2PAgent().Subnet(chainservice.CompatibleNetwork).ConnectedPeers()
return len(peers) >= 1, err
})
require.NoError(err)
Expand Down
14 changes: 7 additions & 7 deletions p2p/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ type (
Self() ([]multiaddr.Multiaddr, error)
// BlockPeer blocks the peer in p2p layer
BlockPeer(string)
// NetworkProxy returns a network proxy to agent
NetworkProxy(string) NetworkProxy
// Subnet returns a network proxy to agent
Subnet(string) SubnetProxy
// ConnectedPeers returns the connected peers' info
ConnectedPeers() ([]peer.AddrInfo, error)
}
Expand All @@ -133,10 +133,10 @@ type (
}
)

// JoinNetwork choose networks to join.
// JoinSubnet choose networks to join.
// You will only receive messages from the networks you joined.
// "" is a special network name, which means the whole network before introducing message network.
func JoinNetwork(networks ...string) Option {
func JoinSubnet(networks ...string) Option {
return func(a *agent) {
for _, network := range networks {
a.networks[network] = struct{}{}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (*dummyAgent) BuildReport() string {
return ""
}

func (d *dummyAgent) NetworkProxy(n string) NetworkProxy {
func (d *dummyAgent) Subnet(n string) SubnetProxy {
return d
}

Expand Down Expand Up @@ -540,8 +540,8 @@ func (p *agent) BuildReport() string {
return ""
}

func (p *agent) NetworkProxy(network string) NetworkProxy {
return &networkProxy{
func (p *agent) Subnet(network string) SubnetProxy {
return &subnetProxy{
agent: p,
network: network,
}
Expand Down
36 changes: 18 additions & 18 deletions p2p/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

func TestDummyAgent(t *testing.T) {
require := require.New(t)
a := NewDummyAgent().NetworkProxy(_blockNetwork)
a := NewDummyAgent().Subnet(_blockNetwork)
require.NoError(a.Start(nil))
require.NoError(a.Stop(nil))
require.NoError(a.BroadcastOutbound(nil, nil))
Expand Down Expand Up @@ -89,13 +89,13 @@ func TestBroadcast(t *testing.T) {
BootstrapNodes: []string{bootnodeAddr[0].String()},
ReconnectInterval: 150 * time.Second,
MaxMessageSize: p2p.DefaultConfig.MaxMessageSize,
}, 1, hash.ZeroHash256, b, u, JoinNetwork(_blockNetwork))
}, 1, hash.ZeroHash256, b, u, JoinSubnet(_blockNetwork))
agent.Start(ctx)
agents = append(agents, agent)
}

for i := 0; i < n; i++ {
r.NoError(agents[i].NetworkProxy(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
r.NoError(agents[i].Subnet(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(i)},
}))
r.NoError(testutil.WaitUntil(100*time.Millisecond, 20*time.Second, func() (bool, error) {
Expand Down Expand Up @@ -152,10 +152,10 @@ func TestNetworkSeparation(t *testing.T) {
cfg.ReconnectInterval = 150 * time.Second
var agent Agent
if i%2 == 0 {
opt := JoinNetwork(_blockNetwork, _actionNetwork)
opt := JoinSubnet(_blockNetwork, _actionNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt)
} else {
opt := JoinNetwork(_blockNetwork, _consensusNetwork)
opt := JoinSubnet(_blockNetwork, _consensusNetwork)
agent = NewAgent(cfg, 1, hash.ZeroHash256, b(uint8(i)), u, opt)
}
agent.Start(ctx)
Expand All @@ -166,27 +166,27 @@ func TestNetworkSeparation(t *testing.T) {
t.Run("connectedPeers", func(t *testing.T) {
for i := 0; i < n; i++ {
if i%2 == 0 {
peers, err := agents[i].NetworkProxy(_actionNetwork).ConnectedPeers()
peers, err := agents[i].Subnet(_actionNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n/2-1)
peers, err = agents[i].NetworkProxy(_consensusNetwork).ConnectedPeers()
peers, err = agents[i].Subnet(_consensusNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n/2)
peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers()
peers, err = agents[i].Subnet(_blockNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n-1)
} else {
peers, err := agents[i].NetworkProxy(_actionNetwork).ConnectedPeers()
peers, err := agents[i].Subnet(_actionNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n/2)
peers, err = agents[i].NetworkProxy(_consensusNetwork).ConnectedPeers()
peers, err = agents[i].Subnet(_consensusNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n/2-1)
peers, err = agents[i].NetworkProxy(_blockNetwork).ConnectedPeers()
peers, err = agents[i].Subnet(_blockNetwork).ConnectedPeers()
require.NoError(err)
require.Len(peers, n-1)
}
peers, err := agents[i].NetworkProxy("unknown").ConnectedPeers()
peers, err := agents[i].Subnet("unknown").ConnectedPeers()
require.NoError(err)
require.Len(peers, 0)
peers, err = agents[i].ConnectedPeers()
Expand All @@ -198,7 +198,7 @@ func TestNetworkSeparation(t *testing.T) {
t.Run("broadcastSubscribed", func(t *testing.T) {
resetCounts()
for i := 0; i < n; i++ {
require.NoError(agents[i].NetworkProxy(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
require.NoError(agents[i].Subnet(_blockNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(i)},
}))
}
Expand All @@ -214,7 +214,7 @@ func TestNetworkSeparation(t *testing.T) {

t.Run("broadcastUnsubscribed", func(t *testing.T) {
resetCounts()
require.NoError(agents[0].NetworkProxy(_consensusNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
require.NoError(agents[0].Subnet(_consensusNetwork).BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(0)},
}))
for i := 1; i < n; i++ {
Expand All @@ -236,7 +236,7 @@ func TestNetworkSeparation(t *testing.T) {

t.Run("broadcastUnsubscribedWithNoPeers", func(t *testing.T) {
resetCounts()
err := agents[0].NetworkProxy("unknown").BroadcastOutbound(ctx, &testingpb.TestPayload{
err := agents[0].Subnet("unknown").BroadcastOutbound(ctx, &testingpb.TestPayload{
MsgBody: []byte{uint8(0)},
})
require.True(errors.Is(err, p2p.ErrNoConnectedPeers))
Expand Down Expand Up @@ -284,17 +284,17 @@ func TestUnicast(t *testing.T) {
BootstrapNodes: []string{addrs[0].String()},
ReconnectInterval: 150 * time.Second,
MasterKey: strconv.Itoa(i),
}, 2, hash.ZeroHash256, b, u, JoinNetwork(_blockNetwork))
}, 2, hash.ZeroHash256, b, u, JoinSubnet(_blockNetwork))
r.NoError(agent.Start(ctx))
agents = append(agents, agent)
}

for i := 0; i < n; i++ {
neighbors, err := agents[i].NetworkProxy(_blockNetwork).ConnectedPeers()
neighbors, err := agents[i].Subnet(_blockNetwork).ConnectedPeers()
r.NoError(err)
r.True(len(neighbors) >= n/3)
for _, neighbor := range neighbors {
r.NoError(agents[i].NetworkProxy(_blockNetwork).UnicastOutbound(ctx, neighbor, &testingpb.TestPayload{
r.NoError(agents[i].Subnet(_blockNetwork).UnicastOutbound(ctx, neighbor, &testingpb.TestPayload{
MsgBody: []byte{uint8(i)},
}))
}
Expand Down
12 changes: 6 additions & 6 deletions p2p/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ import (
)

type (
// NetworkProxy wrap agent with one specific network
NetworkProxy interface {
// SubnetProxy wrap agent with one specific network
SubnetProxy interface {
Agent
// BroadcastOutbound sends a broadcast message to the network
BroadcastOutbound(ctx context.Context, msg proto.Message) (err error)
// UnicastOutbound sends a unicast message to the given address
UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error)
}

networkProxy struct {
subnetProxy struct {
*agent
network string
}
)

func (ap *networkProxy) BroadcastOutbound(ctx context.Context, msg proto.Message) error {
func (ap *subnetProxy) BroadcastOutbound(ctx context.Context, msg proto.Message) error {
return ap.agent.BroadcastOutbound(ctx, ap.network, msg)
}

func (ap *networkProxy) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) {
func (ap *subnetProxy) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) {
return ap.agent.UnicastOutbound(ctx, peer, ap.network, msg)
}

func (ap *networkProxy) ConnectedPeers() (peers []peer.AddrInfo, err error) {
func (ap *subnetProxy) ConnectedPeers() (peers []peer.AddrInfo, err error) {
return ap.agent.connectedPeersByNetwork(ap.network)
}
2 changes: 1 addition & 1 deletion server/itx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
case config.StandaloneScheme:
p2pAgent = p2p.NewDummyAgent()
default:
p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinNetwork(chainservice.CompatibleNetwork))
p2pAgent = p2p.NewAgent(cfg.Network, cfg.Chain.ID, cfg.Genesis.Hash(), dispatcher.HandleBroadcast, dispatcher.HandleTell, p2p.JoinSubnet(chainservice.CompatibleNetwork))
}
chains := make(map[uint32]*chainservice.ChainService)
apiServers := make(map[uint32]*api.ServerV2)
Expand Down

0 comments on commit a45755e

Please sign in to comment.