Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support broadcast to unsubscribe topic #36

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
49 changes: 40 additions & 9 deletions p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"strconv"
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
Expand Down Expand Up @@ -118,8 +119,8 @@ var (
PeerBurst: 500,
}

// ErrNoPeersToBroadcast is a broadcast error when have no peers
ErrNoPeersToBroadcast = fmt.Errorf("no peers to broadcast")
// ErrNoConnectedPeers is a broadcast error when have no peers
ErrNoConnectedPeers = fmt.Errorf("no connected peers")
)

// Option defines the option function to modify the config for a host
Expand Down Expand Up @@ -263,7 +264,8 @@ type Host struct {
kadKey cid.Cid
newPubSub func(ctx context.Context, h core.Host, opts ...pubsub.Option) (*pubsub.PubSub, error)
pubsub *pubsub.PubSub
pubs map[string]*pubsub.Topic
pubs sync.Map
envestcc marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.Mutex
blacklists map[string]*LRUBlacklist
subs map[string]*pubsub.Subscription
close chan interface{}
Expand Down Expand Up @@ -389,7 +391,6 @@ func NewHost(ctx context.Context, options ...Option) (*Host, error) {
kad: kad,
kadKey: cid,
newPubSub: newPubSub,
pubs: make(map[string]*pubsub.Topic),
blacklists: make(map[string]*LRUBlacklist),
subs: make(map[string]*pubsub.Subscription),
close: make(chan interface{}),
Expand Down Expand Up @@ -475,7 +476,7 @@ func (h *Host) AddUnicastPubSub(topic string, callback HandleUnicast) error {
// AddBroadcastPubSub adds a broadcast topic that the host will pay attention to. This need to be called before using
// Connect/JoinOverlay. Otherwise, pubsub may not be aware of the existing overlay topology
func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback HandleBroadcast) error {
if _, ok := h.pubs[topic]; ok {
if _, ok := h.pubs.Load(topic); ok {
return nil
}
blacklist, err := NewLRUBlacklist(h.cfg.BlockListLRUSize)
Expand All @@ -501,7 +502,7 @@ func (h *Host) AddBroadcastPubSub(ctx context.Context, topic string, callback Ha
if err != nil {
return err
}
h.pubs[topic] = top
h.pubs.Store(topic, top)
h.blacklists[topic] = blacklist
h.subs[topic] = sub
Copy link
Member

@dustinxie dustinxie Mar 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make L488 - 491 + 496 a func getOrAddPub(topic string)

go func() {
Expand Down Expand Up @@ -581,11 +582,25 @@ func (h *Host) Connect(ctx context.Context, target core.PeerAddrInfo) error {
}

// Broadcast sends a message to the hosts who subscribe the topic
// it returns ErrNoConnectedPeers if no connected peers who subscribe the topic when send a unsubscribed topic
func (h *Host) Broadcast(ctx context.Context, topic string, data []byte) error {
if len(h.pubsub.ListPeers(topic)) == 0 {
return ErrNoPeersToBroadcast
var (
pub *pubsub.Topic
err error
)
pubVal, ok := h.pubs.Load(topic)
if !ok {
if pub, err = h.addPub(topic); err != nil {
return err
}
} else {
pub = pubVal.(*pubsub.Topic)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub, err := h.getOrAddPubsub(topic)

// fan-out with no connected peers will return ErrNoConnectedPeers
if h.subs[topic] == nil && len(pub.ListPeers()) == 0 {
return ErrNoConnectedPeers
}
return h.pubsub.Publish(topic, data)
return pub.Publish(ctx, data)
}

// Unicast sends a message to a peer on the given address
Expand Down Expand Up @@ -731,6 +746,22 @@ func (h *Host) allowSource(src core.PeerID) (bool, error) {
return limiter.Allow(), nil
}

func (h *Host) addPub(topic string) (*pubsub.Topic, error) {
Copy link
Member

@dustinxie dustinxie Mar 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOrAddPub

h.mutex.Lock()
defer h.mutex.Unlock()

pub, ok := h.pubs.Load(topic)
if ok {
return pub.(*pubsub.Topic), nil
}
t, err := h.pubsub.Join(topic)
if err != nil {
return nil, err
}
h.pubs.Store(topic, t)
return t, nil
}

// generateKeyPair generates the public key and private key by network address
func generateKeyPair(masterKey string) (crypto.PrivKey, crypto.PubKey, error) {
hash := sha1.Sum([]byte(masterKey))
Expand Down
2 changes: 1 addition & 1 deletion p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func TestBroadcastMultipleTopic(t *testing.T) {
t.Run("broadcastUnsubscribedTopicWithNoPeers", func(t *testing.T) {
resetCount()
err := hosts[3].Broadcast(ctx, "unknown", []byte(""))
require.True(errors.Is(err, ErrNoPeersToBroadcast))
require.True(errors.Is(err, ErrNoConnectedPeers))
})
}

Expand Down